In [None]:
import psutil
import win32com.client
import time
import pandas as pd
from pymongo import MongoClient
import sys
from dotenv import load_dotenv
import os

load_dotenv()
mongo_client = MongoClient(os.environ.get("MONGO_URI"))
stock_db = mongo_client["stock_db"]
ref_db = mongo_client["ref_db"]

In [None]:
def get_mongo_df(db_collection, df_name, find_query=None, projection=None):
    # Truy cập collection
    collection = db_collection[df_name]
    # Nếu không truyền vào find_query thì mặc định lấy tất cả document
    if find_query is None:
        find_query = {}
    # Nếu không truyền vào projection thì mặc định loại bỏ trường _id
    if projection is None:
        projection = {"_id": 0}
    # Thực hiện lệnh find với điều kiện và projection đã cho
    docs = collection.find(find_query, projection)
    # Chuyển đổi kết quả sang DataFrame và trả về
    df = pd.DataFrame(list(docs))
    return df

def get_open_excel_workbooks():
    excel = win32com.client.GetActiveObject("Excel.Application")
    workbooks = [wb.Name for wb in excel.Workbooks]
    return workbooks

def write_list_to_row(worksheet, row_num, data_list, start_col=1):
    for i, item in enumerate(data_list):
        worksheet.Cells(row_num, start_col + i).Value = item

##### Ghi báo cáo tài chính từ excel vào pandas

In [None]:
all_stock_lists = get_mongo_df(ref_db, 'full_stock_classification')['ticker'].tolist()

open_workbooks = get_open_excel_workbooks()
year = '2024'
quarter = '4'
period_count = '16'
report_list = ['IncomeStatement', 'BalanceSheet', 'CashFlow']

# Khởi tạo dictionary để lưu kết quả
fs_dict = {
    'IncomeStatement': {},
    'BalanceSheet': {},
    'CashFlow': {}
}

# Kết nối với Excel đang mở
excel = win32com.client.GetActiveObject("Excel.Application")
excel.Visible = True  # Đảm bảo Excel hiển thị để kiểm tra

# Duyệt qua danh sách báo cáo
file_name = f"excel.xlsx"

if file_name in open_workbooks:
    workbook = excel.Workbooks(file_name)
    worksheet = workbook.ActiveSheet
    excel.Calculation = win32com.client.constants.xlCalculationManual
    excel.ScreenUpdating = False
    for stock_order, stock in enumerate(all_stock_lists):
        for report_name in report_list:
            # **Xóa dữ liệu cũ hoàn toàn**
            while True:
                try:
                    worksheet.UsedRange.ClearContents()  # Xóa nội dung và công thức
                    worksheet.UsedRange.ClearFormats()   # Xóa định dạng để loại bỏ spill range cũ


                    worksheet.Range("A1").Formula2 = f'=FA.{report_name}.Reports("{stock}",{year},{quarter},{period_count},1000000)'
                    # **Đọc dữ liệu từ UsedRange**
                    used_range = worksheet.UsedRange
                    rows = used_range.Rows.Count
                    cols = used_range.Columns.Count

                    data = []
                    for i in range(1, rows + 1):
                        row_data = []
                        for j in range(1, cols + 1):
                            value = worksheet.Cells(i, j).Value
                            row_data.append(value)
                        data.append(row_data)
                    break

                except Exception as e:
                    time.sleep(0.01)

            # **Tạo DataFrame**
            df = pd.DataFrame(data)
            if rows > 0:
                df.columns = df.iloc[0]  # Dùng hàng đầu làm tiêu đề
                df = df.iloc[1:].reset_index(drop=True)  # Bỏ hàng đầu và reset index
            else:
                df.columns = None

            fs_dict[report_name][stock] = df

            print(f"\r({stock_order+1}) Đang xử lý {report_name}-{stock}{' '*10}", end="", flush=True)

    excel.Calculation = win32com.client.constants.xlCalculationAutomatic  # Khôi phục sau khi xong
    excel.ScreenUpdating = True  # Bật lại sau khi xong

##### Biến đổi báo cáo tài chính về 1 bảng duy nhất

In [None]:
def generate_quarter_list(end_quarter, num_periods):
    quarter_parts = end_quarter.split()
    end_q = int(quarter_parts[0][1])  # Extract the quarter number (1-4)
    end_year = int(quarter_parts[1])  # Extract the year
    quarters = []
    current_q = end_q
    current_year = end_year
    for _ in range(num_periods):
        quarters.insert(0, f"Q{current_q} {current_year}")  # Insert at beginning for chronological order
        current_q -= 1
        if current_q < 1:
            current_q = 4
            current_year -= 1
    return quarters

In [None]:
this_quarter = f'Q{quarter} {year}'
quarter_list = generate_quarter_list(this_quarter, 16)
column_list = ['ticker', 'item'] + quarter_list

fs_df_dict = {}
for report_name in report_list:
    fs_df = pd.DataFrame(columns=column_list)
    for ticker, df in fs_dict[report_name].items():
        temp_df = df.rename(columns={'':'item'})
        temp_columns = list(set(column_list) & set(temp_df.columns))
        temp_df = temp_df[temp_columns]
        temp_df['ticker'] = ticker
        fs_df = pd.concat([fs_df, temp_df], axis=0, ignore_index=True)
    fs_df_dict[report_name] = fs_df

merged_financial_statement_list = []
for report_name, df in fs_df_dict.items():
    temp_df = df.copy()
    temp_df['report_name'] = report_name
    merged_financial_statement_list.append(temp_df)
merged_financial_statement_df = pd.concat(merged_financial_statement_list, axis=0, ignore_index=True)

##### Lưu vào MongoDB

In [None]:
def overwrite_mongo(collection, df):
    # Lấy tên collection hiện tại và database
    collection_name = collection.name
    db = collection.database  # Truy cập database từ collection
    temp_collection_name = f"temp_{collection_name}"
    old_collection_name = f"old_{collection_name}"

    # Reset index của DataFrame
    df = df.reset_index(drop=True)
    records = df.replace({pd.NaT: None}).to_dict(orient='records')

    # 1. Lưu dữ liệu vào collection tạm
    temp_collection = db[temp_collection_name]
    temp_collection.drop()  # Đảm bảo collection tạm sạch trước khi insert
    temp_collection.insert_many(records)

    # 2. Rename collection cũ thành 'old_' (nếu tồn tại)
    if collection_name in db.list_collection_names():
        db[collection_name].rename(old_collection_name, dropTarget=True)

    # 3. Rename collection tạm thành tên chuẩn
    temp_collection.rename(collection_name, dropTarget=True)

    # 4. Xóa collection 'old_' (nếu tồn tại)
    if old_collection_name in db.list_collection_names():
        db[old_collection_name].drop()

overwrite_mongo(ref_db["merged_financial_statement"], merged_financial_statement_df)