## 0. 전반적인 Process
- 현황
    - 샘플 데이터를 활용한 Prompt Engineering 수행 중
- 과제
    - Postgre DB 연계 필요
    - BPI 연계 필요

In [None]:
from IPython.display import Image, display
import base64

def mm(graph: str) -> None:
    """
    Mermaid 그래프를 이미지로 렌더링하여 표시합니다.

    이 함수는 Mermaid 문법으로 작성된 그래프 문자열을 받아
    base64로 인코딩한 후, Mermaid 온라인 렌더링 서비스를 통해
    이미지로 변환하여 Jupyter Notebook이나 유사한 환경에서 표시합니다.

    Args:
        graph (str): Mermaid 문법으로 작성된 그래프 문자열

    Returns:
        None: 이미지를 직접 표시하므로 반환값은 없습니다.

    Example:
        mm('''
        graph TD
        A[Start] --> B{Is it?}
        B -- Yes --> C[OK]
        B -- No --> D[End]
        ''')
    """
    # 그래프 문자열을 UTF-8로 인코딩
    graphbytes = graph.encode("utf-8")
    
    # 인코딩된 바이트를 base64로 변환
    base64_bytes = base64.b64encode(graphbytes)
    
    # base64 바이트를 ASCII 문자열로 디코딩
    base64_string = base64_bytes.decode("ascii")
    
    # Mermaid 온라인 렌더링 서비스 URL과 base64 문자열을 결합하여 이미지 표시
    display(Image(url="https://mermaid.ink/img/" + base64_string))

mm("""
graph LR
    A[API Key 등 협업 / 기획] --> B[DB 연계]
    B --> C[BPI 계산]
    C --> D[강약점 선택]
    D --> E[강약점 데이터전처리]
    E --> F[Prompt Engineering]
    F --> G[완료]
""")

## 1. 필수 라이브러리 임포트 및 환경 설정
- .env 파일에 '자신' 또는 '더존' api key 저장: 
    * OPENAI_API_KEY=sk-proj-.....

In [5]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableParallel, RunnableMap, RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from typing import Dict, Any, Literal
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.base import BaseCallbackHandler
from langchain.cache import SQLiteCache
from operator import itemgetter
from tabulate import tabulate
from langchain.callbacks import get_openai_callback


# env 파일 읽기
load_dotenv()

# OpenAI API 키 가져오기
openai_api_key = os.getenv("OPENAI_API_KEY")

## 2. Sample Data 생성
- 데이터 출력 방식은 협의 필요 (\w 조희성 과장님, 이도은 대리님)
- 현재 출력물
    - df_company_info : 회사명, 업종, 재무/회계 데이터
    - df_partner_info : df_company_info의 회사의 매입/매출 거래처 정보 등
- 향후 : Postgre DB와 연계

In [None]:
import pandas as pd
import numpy as np

# 접근 시점 정의
ACCESS_TIME = '2024-09-01'
access_datetime = pd.to_datetime(ACCESS_TIME)

# 재현 가능성을 위해 난수 생성기의 시드를 설정합니다.
np.random.seed(42)

# 회사 및 업종 정보 생성
num_companies = 100  # 총 100개의 회사 생성
company_names = [f'Company_{i+1}' for i in range(num_companies)]  # 회사명 리스트 생성 (Company_1, Company_2, ..., Company_100)
industries = ['Manufacturing', 'Retail', 'Technology', 'Healthcare', 'Finance']  # 업종 리스트 정의

# 각 회사에 대한 고정된 업종 리스트 생성
company_industries = [np.random.choice(industries) for _ in range(num_companies)]  

# 날짜 생성 (2020-01 ~ ACCESS_TIME 기준 한 달 전까지)
dates = pd.date_range(start="2020-01-01", end=access_datetime - pd.DateOffset(months=1), freq='MS').strftime("%Y-%m").tolist()

# 상위 매출처 및 매입처 정보 생성 함수 정의
sales_partner_pool = list(set(f'SalesPartner_{i+1}' for i in range(200)))  # 매출처 Pool 생성 (200개)
purchase_partner_pool = list(set(f'PurchasePartner_{i+1}' for i in range(200)))  # 매입처 Pool 생성 (200개)

# 각 회사별 기본 인원수를 설정합니다.
company_base_employees = {company: np.random.randint(100, 300) for company in company_names}  # 기업별로 100명 ~ 300명 사이의 기본 인원수 설정

def generate_partner_data():
    """
    상위 매출처 및 매입처 정보를 생성하는 함수입니다.
    
    Returns:
        dict: 생성된 매출처 및 매입처 정보 딕셔너리 리스트
    """
    # 상위 5개 매출처 및 매입처 무작위 선택
    sales_companies = list(np.random.choice(sales_partner_pool, 5, replace=False))
    sales_grades = list(np.random.choice(['AAA', 'AA+', 'AA', 'A+', 'A', 'BBB+', 'BBB', 'BB', 'B', 'CCC-'], 5, replace=False))
    sales_concentration = list(np.round(np.random.dirichlet(np.ones(5), 1)[0] * 100, 2))  # 매출처 집중도

    purchase_companies = list(np.random.choice(purchase_partner_pool, 5, replace=False))
    purchase_grades = list(np.random.choice(['AAA', 'AA+', 'AA', 'A+', 'A', 'BBB+', 'BBB', 'BB', 'B', 'CCC-'], 5, replace=False))
    purchase_concentration = list(np.round(np.random.dirichlet(np.ones(5), 1)[0] * 100, 2))  # 매입처 집중도

    # 매출처 및 매입처 정보 딕셔너리 리스트로 생성
    sales_partners = [
        {
            "매출처_회사명": sales_companies[i],
            "매출처_신용등급": sales_grades[i],
            "매출처_집중도": sales_concentration[i],
            "월별_거래금액": round(np.random.uniform(500, 5000), 0),  # 월별 거래금액 (500만원 ~ 5000만원)
            "월별_회수기일": round(np.random.uniform(30, 90), 0),  # 월별 회수기일 (30일 ~ 90일)
            "월별_회수잔액(예정)": round(np.random.uniform(100, 1000), 0)  # 월별 회수잔액 (100만원 ~ 1000만원)
        }
        for i in range(5)
    ]

    purchase_partners = [
        {
            "매입처_회사명": purchase_companies[i],
            "매입처_신용등급": purchase_grades[i],
            "매입처_집중도": purchase_concentration[i],
            "월별_거래금액": round(np.random.uniform(500, 5000), 0),  # 월별 거래금액 (500만원 ~ 5000만원)
            "월별_회수기일": round(np.random.uniform(30, 90), 0),  # 월별 회수기일 (30일 ~ 90일)
            "월별_회수잔액(예정)": round(np.random.uniform(100, 1000), 0)  # 월별 회수잔액 (100만원 ~ 1000만원)
        }
        for i in range(5)
    ]

    return sales_partners, purchase_partners

# 기본 정보 데이터 생성 함수 정의
def generate_basic_data(company, industry, date):
    """
    각 회사의 특정 날짜에 대한 기본 재무 데이터를 생성하는 함수입니다.
    
    Args:
        company (str): 회사명
        industry (str): 업종명
        date (str): 날짜 (YYYY-MM 형식)
    
    Returns:
        dict: 생성된 회사의 기본 재무 정보 및 상위 매출처, 매입처 정보
    """
    base_asset_value = 30000  # 기본 총자산 값 설정 (3억원)
    total_assets = round(base_asset_value * (1 + np.random.normal(0, 0.05)), 0)  # 기본 총자산 값에서 5% 변동 (정규분포 이용, 소수점 없이)
    revenue = round(np.random.uniform(1000, 10000), 0)  # 매출액은 1천만원 ~ 1억원 사이의 값으로 설정 (소수점 없이)
    operating_profit = round(revenue * np.random.uniform(-0.2, 0.2), 0)  # 영업이익은 매출액의 -20%~20% 범위로 설정 (소수점 없이)
    net_income = round(operating_profit * np.random.uniform(-0.5, 0.8), 0)  # 당기순이익은 영업이익의 -50%~80% 범위로 설정 (소수점 없이)
    short_term_loans = round(np.random.uniform(500, 5000), 0)  # 단기차입금은 500만원 ~ 5000만원 범위로 설정 (소수점 없이)
    long_term_loans = round(np.random.uniform(1000, 10000), 0)  # 장기차입금은 1000만원 ~ 1억원 범위로 설정 (소수점 없이)
    total_loans = short_term_loans + long_term_loans  # 총 차입금 계산
    loan_to_sales = round((total_loans / revenue) * 100, 2)  # 매출대비차입금 비율을 백분율로 계산
    working_capital_turnover = round(np.random.uniform(1, 5), 2)  # 운전자금회전율은 1~5회 사이의 값으로 설정
    operating_cash_flow = round(np.random.uniform(500, 5000), 0)  # 영업활동현금흐름은 500만원 ~ 5000만원 범위로 설정 (소수점 없이)
    net_cash_flow = round(operating_cash_flow - total_loans, 0)  # 순현금흐름 = 영업활동현금흐름 - 총 차입금 (소수점 없이)
    ar_balance = round(np.random.uniform(1000, 5000), 0)  # 매출채권 규모 (1000만원 ~ 5000만원)
    ap_balance = round(np.random.uniform(800, 4000), 0)  # 매입채무 규모 (800만원 ~ 4000만원)
    inventory = round(np.random.uniform(2000, 10000), 0)  # 재고자산 (2000만원 ~ 1억원)

    # 상위 매출처 및 매입처 정보 생성
    sales_partners, purchase_partners = generate_partner_data()

    # 기본 인원수를 기반으로 ±10% 내외의 변동을 줍니다.
    base_employees = company_base_employees[company]
    employees = round(base_employees * (1 + np.random.uniform(-0.1, 0.1)))  # ±10% 내외 변동

    return {
        '기업명': company,
        '업종': industry,
        '날짜': date,
        '매출액증가율': round(np.random.uniform(5, 15), 2),  # 매출액증가율 (5% ~ 15% 사이)
        '총자산증가율': round(np.random.uniform(3, 10), 2),  # 총자산증가율 (3% ~ 10% 사이)
        '총자산': int(total_assets),  # 총자산 (정수형)
        '매출액': int(revenue),  # 매출액 (정수형)
        '영업이익': int(operating_profit),  # 영업이익 (정수형)
        '영업이익률': round(operating_profit / revenue * 100, 2) if revenue != 0 else round(np.random.uniform(-20, 20), 2),  # 영업이익률 (백분율)
        '당기순이익': int(net_income),  # 당기순이익 (정수형)
        '당기순이익률': round(net_income / revenue * 100, 2) if revenue != 0 else round(np.random.uniform(-30, 30), 2),  # 당기순이익률 (백분율)
        '단기차입금': int(short_term_loans),  # 단기차입금 (정수형)
        '장기차입금': int(long_term_loans),  # 장기차입금 (정수형)
        '매출대비차입금': loan_to_sales,  # 매출대비차입금 비율 (백분율)
        '운전자금회전율': working_capital_turnover,  # 운전자금회전율
        '인원수': employees,  # 인원수 (±10% 변동)
        '월평균급여액': round(np.random.uniform(200, 500), 1),  # 월평균 급여액 (200 ~ 500만원)
        '월매출창출액': round(np.random.uniform(5000, 20000), 1),  # 월 매출 창출액 (5000 ~ 20000만원)
        '영업활동현금흐름/매출액': round(np.random.uniform(0.05, 0.2), 2),  # 영업활동현금흐름/매출액 비율 (5% ~ 20%)
        '영업활동현금흐름': int(operating_cash_flow),  # 영업활동현금흐름 (정수형)
        '순현금흐름': int(net_cash_flow),  # 순현금흐름 (정수형)
        '매출채권': int(ar_balance),  # 매출채권 (정수형)
        '매입채무': int(ap_balance),  # 매입채무 (정수형)
        '재고자산': int(inventory),  # 재고자산 (정수형)
        '상위_매출처': sales_partners,  # 상위 매출처 정보
        '상위_매입처': purchase_partners  # 상위 매입처 정보
    }

# 기본 정보 데이터 생성
# 각 회사와 날짜별로 기본 재무 데이터를 생성하여 리스트로 저장
basic_data = [
    generate_basic_data(company, industry, date)
    for company, industry in zip(company_names, company_industries)
    for date in dates
]

# 기본 정보 데이터프레임 생성
df_company_info = pd.DataFrame(basic_data)  # 회사 기본 정보로 데이터프레임 생성

# 데이터프레임 샘플 출력
print(df_company_info[['기업명', '날짜', '상위_매출처', '상위_매입처']].head())


- Company_1에 대한 샘플 데이터

In [None]:
from pprint import pprint

print('#'*10, 'Company_1에 대한 정보', '#'*10)
pprint(df_company_info[df_company_info['기업명']== 'Company_1'])

df_company_info.to_csv("sample.csv", encoding='utf-8-sig', index=False)

## 3. 데이터전처리
- 현황 : 성장성, 수익성 구현
- 전처리 함수
    - preprocess_growth_data : '총자산', '매출액' 등 성장성 관련 Data를 Dict 형태로 변경
    - preprocess_profitability_data : '영업이익', '당기순이익' 등 수익성 관련 Data를 Dict 형태로 변경

### 성장성 (Growth) 전처리

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, Any
import logging

# 재현성을 위한 Random seed 설정
np.random.seed(42)

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 접근 시점 정의
ACCESS_TIME = '2024-09-01'
access_datetime = pd.to_datetime(ACCESS_TIME)

class DataValidationError(Exception):
    """데이터 검증 관련 커스텀 예외"""
    pass

class GrowthMetricsCalculator:
    """성장성 지표 계산을 위한 메인 클래스"""

    REQUIRED_COLUMNS = ['기업명', '날짜', '업종', '총자산', '매출액']
    ASSET_COL, REVENUE_COL = '총자산', '매출액'
    COMPANY_COL, DATE_COL, INDUSTRY_COL = '기업명', '날짜', '업종'

    def __init__(self, df_company_info: pd.DataFrame, target_company_name: str, n_years: int = 3):
        self.df = self._validate_input_data(df_company_info)
        self.target_company_name = target_company_name
        self.n_years = n_years
        self.growth_data = {
            'annual_revenue': {},
            'annual_assets': {},
            'monthly_revenue': {},
            'monthly_growth': {}
        }

    def _validate_input_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """필수 컬럼 및 데이터 유효성 검증 후 데이터 반환"""
        if not all(col in df.columns for col in self.REQUIRED_COLUMNS):
            missing_cols = set(self.REQUIRED_COLUMNS) - set(df.columns)
            raise DataValidationError(f"필수 컬럼 누락: {missing_cols}")
        if df[self.COMPANY_COL].isnull().all():
            raise DataValidationError(f"회사를 찾을 수 없음")
        for col in [self.ASSET_COL, self.REVENUE_COL]:
            if not pd.to_numeric(df[col], errors='coerce').notnull().all():
                raise DataValidationError(f"숫자가 아닌 값이 포함됨: {col}")
        logger.info("데이터 검증 완료")
        return df[self.REQUIRED_COLUMNS].copy()

    def _prepare_data(self):
        """데이터 전처리 및 필요한 필드 추가"""
        self.df['date'] = pd.to_datetime(self.df[self.DATE_COL])
        self.df['year'] = self.df['date'].dt.year
        self.df['month'] = self.df['date'].dt.month
        self.df = self.df.fillna({self.ASSET_COL: 0, self.REVENUE_COL: 0})
        self.df_company = self.df[self.df[self.COMPANY_COL] == self.target_company_name].reset_index(drop=True)
        if self.df_company.empty:
            raise DataValidationError(f"회사 데이터가 없음: {self.target_company_name}")
        self.target_industry = self.df_company[self.INDUSTRY_COL].iloc[0]
        self.df_industry = self.df[self.df[self.INDUSTRY_COL] == self.target_industry].reset_index(drop=True)
        self.latest_date = access_datetime - pd.DateOffset(months=1)
        logger.info(f"데이터 준비 완료 - 기준일자: {self.latest_date.strftime('%Y-%m-%d')}")

    def _calculate_growth_rate(self, current: float, previous: float, threshold: float = 0.0001) -> float:
        """성장률 계산"""
        if abs(previous) < threshold:
            logger.warning(f"이전 값이 임계값보다 작음 (previous: {previous})")
            return 0
        return round((current - previous) / previous * 100, 2)

    def _process_annual_data(self):
        """과거 연도 및 연말 예상 연도 데이터 계산 및 저장"""
        for year_offset in range(1, self.n_years + 1):
            target_year = self.latest_date.year - year_offset
            self._calculate_annual_metrics(target_year, estimate=False)
        self._calculate_annual_metrics(self.latest_date.year, estimate=True)  # 연말 예상 데이터

    def _calculate_annual_metrics(self, year: int, estimate: bool = False):
        """연간 또는 연말 예상 데이터 계산"""
        current_data = self.df_company[self.df_company['year'] == year]
        prev_data = self.df_company[self.df_company['year'] == year - 1]
        current_industry_data = self.df_industry[self.df_industry['year'] == year]
        prev_industry_data = self.df_industry[self.df_industry['year'] == year - 1]

        current_assets = int(current_data[self.ASSET_COL].iloc[-1] if not current_data.empty else 0)
        current_revenue = int(current_data[self.REVENUE_COL].sum())
        prev_assets = int(prev_data[self.ASSET_COL].iloc[-1] if not prev_data.empty else 0)
        prev_revenue = int(prev_data[self.REVENUE_COL].sum() if not prev_data.empty else 0)

        industry_assets = int(current_industry_data[self.ASSET_COL].mean() if not current_industry_data.empty else 0)
        industry_revenue = int(current_industry_data[self.REVENUE_COL].sum())
        prev_industry_assets = int(prev_industry_data[self.ASSET_COL].mean() if not prev_industry_data.empty else 0)
        prev_industry_revenue = int(prev_industry_data[self.REVENUE_COL].sum() if not prev_industry_data.empty else 0)

        key = f"{year}년(E)" if estimate else f"{year}년"
        self.growth_data['annual_revenue'][key] = {
            '매출액': current_revenue,
            '매출액증가율': self._calculate_growth_rate(current_revenue, prev_revenue),
            '업종평균 매출액증가율': self._calculate_growth_rate(industry_revenue, prev_industry_revenue),
        }
        self.growth_data['annual_assets'][key] = {
            '총자산': current_assets,
            '총자산증가율': self._calculate_growth_rate(current_assets, prev_assets),
            '업종평균 총자산증가율': self._calculate_growth_rate(industry_assets, prev_industry_assets)
        }

    def _process_monthly_data(self):
        """최근 12개월 월별 데이터 계산 및 저장"""
        # 2023-08부터 시작하는 최근 12개월 생성
        past_12_months = pd.date_range(end=self.latest_date + pd.DateOffset(months=1), periods=12, freq='ME')
        cumulative_data = {'current': {'revenue': 0}, 'previous': {'revenue': 0}}

        for idx, date in enumerate(past_12_months):
            month_str = date.strftime('%Y-%m')
            current_month = self.df_company[self.df_company['date'].dt.strftime('%Y-%m') == month_str]
            prev_month = self.df_company[self.df_company['date'].dt.strftime('%Y-%m') == (date - pd.DateOffset(years=1)).strftime('%Y-%m')]

            current_revenue = int(current_month[self.REVENUE_COL].sum())
            prev_revenue = int(prev_month[self.REVENUE_COL].sum() if not prev_month.empty else 0)

            cumulative_data['current']['revenue'] += current_revenue
            cumulative_data['previous']['revenue'] += prev_revenue

            self.growth_data['monthly_revenue'][month_str] = {
                '당월매출액': current_revenue,
                '전년동월매출액': prev_revenue
            }
            self.growth_data['monthly_growth'][month_str] = {
                '매출액증가율': self._calculate_growth_rate(current_revenue, prev_revenue),
                '누적매출액증가율': self._calculate_growth_rate(
                    cumulative_data['current']['revenue'],
                    cumulative_data['previous']['revenue']
                )
            }
            logger.debug(f"{month_str} 월별 데이터 처리 완료")


    def calculate_growth_metrics(self) -> Dict[str, pd.DataFrame]:
        """성장성 지표 계산 실행"""
        self._prepare_data()
        self._process_annual_data()
        self._process_monthly_data()

        # DataFrames 생성 및 날짜 순 정렬 (Transpose를 한 번만 사용)
        annual_revenue_df = pd.DataFrame(self.growth_data['annual_revenue']).sort_index(axis=1)
        annual_assets_df = pd.DataFrame(self.growth_data['annual_assets']).sort_index(axis=1)
        monthly_revenue_df = pd.DataFrame(self.growth_data['monthly_revenue']).sort_index(axis=1)
        monthly_growth_df = pd.DataFrame(self.growth_data['monthly_growth']).sort_index(axis=1)

        return {
            'latest_year_month': self.latest_date.strftime('%Y-%m'),
            'annual_revenue': annual_revenue_df,
            'annual_assets': annual_assets_df,
            'monthly_revenue': monthly_revenue_df,
            'monthly_growth': monthly_growth_df
        }


def preprocess_growth_data(df_company_info: pd.DataFrame, target_company_name: str) -> Dict[str, Any]:
    """성장성 지표 데이터를 전처리하여 JSON 형식으로 변환"""
    calculator = GrowthMetricsCalculator(df_company_info, target_company_name)
    return calculator.calculate_growth_metrics()



growth_data = preprocess_growth_data(df_company_info, 'Company_1')
print('#'*10, 'last_year_month', '#'*10)
pprint(growth_data['latest_year_month'])
print()

print('#'*10, 'annual_revenue', '#'*10)
pprint(pd.DataFrame(growth_data['annual_revenue']))
print()

print('#'*10, 'annual_assets', '#'*10)
pprint(pd.DataFrame(growth_data['annual_assets']))
print()

print('#'*10, 'monthly_revenue', '#'*10)
pprint(pd.DataFrame(growth_data['monthly_revenue']))
print()

print('#'*10, 'monthly_growth', '#'*10)
pprint(pd.DataFrame(growth_data['monthly_growth']))

### Profitability (수익성) 전처리

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, Any
import logging

# 재현성을 위한 Random seed 설정
np.random.seed(42)

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 접근 시점 정의
ACCESS_TIME = '2024-09-01'
access_datetime = pd.to_datetime(ACCESS_TIME)

class DataValidationError(Exception):
    """데이터 검증 관련 커스텀 예외"""
    pass

class ProfitabilityMetricsCalculator:
    """수익성 지표 계산을 위한 메인 클래스"""

    REQUIRED_COLUMNS = ['기업명', '날짜', '업종', '영업이익', '당기순이익', '매출액']
    OPERATING_PROFIT_COL, NET_PROFIT_COL, REVENUE_COL = '영업이익', '당기순이익', '매출액'
    COMPANY_COL, DATE_COL, INDUSTRY_COL = '기업명', '날짜', '업종'

    def __init__(self, df_company_info: pd.DataFrame, target_company_name: str, n_years: int = 3):
        self.df = self._validate_input_data(df_company_info)
        self.target_company_name = target_company_name
        self.n_years = n_years
        self.profitability_data = {
            'annual_profit': {},
            'annual_margins': {},
            'monthly_profit': {},
            'monthly_margins': {}
        }

    def _validate_input_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """필수 컬럼 및 데이터 유효성 검증 후 데이터 반환"""
        if not all(col in df.columns for col in self.REQUIRED_COLUMNS):
            missing_cols = set(self.REQUIRED_COLUMNS) - set(df.columns)
            raise DataValidationError(f"필수 컬럼 누락: {missing_cols}")
        if df[self.COMPANY_COL].isnull().all():
            raise DataValidationError(f"회사를 찾을 수 없음")
        for col in [self.OPERATING_PROFIT_COL, self.NET_PROFIT_COL, self.REVENUE_COL]:
            if not pd.to_numeric(df[col], errors='coerce').notnull().all():
                raise DataValidationError(f"숫자가 아닌 값이 포함됨: {col}")
        logger.info("데이터 검증 완료")
        return df[self.REQUIRED_COLUMNS].copy()

    def _prepare_data(self):
        """데이터 전처리 및 필요한 필드 추가"""
        self.df['date'] = pd.to_datetime(self.df[self.DATE_COL])
        self.df['year'] = self.df['date'].dt.year
        self.df['month'] = self.df['date'].dt.month
        self.df = self.df.fillna({self.OPERATING_PROFIT_COL: 0, self.NET_PROFIT_COL: 0, self.REVENUE_COL: 0})
        self.df_company = self.df[self.df[self.COMPANY_COL] == self.target_company_name].reset_index(drop=True)
        if self.df_company.empty:
            raise DataValidationError(f"회사 데이터가 없음: {self.target_company_name}")
        self.target_industry = self.df_company[self.INDUSTRY_COL].iloc[0]
        self.df_industry = self.df[self.df[self.INDUSTRY_COL] == self.target_industry].reset_index(drop=True)
        self.latest_date = access_datetime - pd.DateOffset(months=1)
        self.latest_month = self.latest_date.month
        logger.info(f"데이터 준비 완료 - 기준일자: {self.latest_date.strftime('%Y-%m-%d')}")

    def _calculate_margin_rate(self, profit: float, revenue: float, threshold: float = 0.0001) -> float:
        """이익률 계산"""
        if abs(revenue) < threshold:
            logger.warning(f"매출액이 임계값보다 작음 (revenue: {revenue})")
            return 0
        return round((profit / revenue * 100), 2)

    def _process_annual_data(self):
        """과거 연도 및 연말 예상 연도 데이터 계산 및 저장"""
        for year_offset in range(1, self.n_years + 1):
            target_year = self.latest_date.year - year_offset
            self._calculate_annual_metrics(target_year, estimate=False)
        self._calculate_annual_metrics(self.latest_date.year, estimate=True)  # 연말 예상 데이터

    def _calculate_annual_metrics(self, year: int, estimate: bool = False):
        """연간 또는 연말 예상 데이터 계산"""
        if estimate:
            current_data = self.df_company[(self.df_company['year'] == year) & (self.df_company['month'] <= self.latest_month)]
            annualized_metrics = self._calculate_annualized_metrics(current_data, is_industry=False)
            industry_metrics = self._calculate_annualized_metrics(self.df_industry[self.df_industry['year'] == year], is_industry=True)
        else:
            current_data = self.df_company[self.df_company['year'] == year]
            prev_data = self.df_company[self.df_company['year'] == year - 1]
            industry_data = self.df_industry[self.df_industry['year'] == year]
            annualized_metrics = self._calculate_metrics(current_data, prev_data)
            industry_metrics = self._calculate_metrics(industry_data, None)

        key = f"{year}년(E)" if estimate else f"{year}년"
        self.profitability_data['annual_profit'][key] = {
            '영업이익': annualized_metrics['operating_profit'],
            '당기순이익': annualized_metrics['net_profit']
        }
        self.profitability_data['annual_margins'][key] = {
            '영업이익률': annualized_metrics['operating_profit_margin'],
            '당기순이익률': annualized_metrics['net_profit_margin'],
            '업종평균 영업이익률': industry_metrics['operating_profit_margin'],
            '업종평균 당기순이익률': industry_metrics['net_profit_margin']
        }

    def _calculate_annualized_metrics(self, current_data: pd.DataFrame, is_industry: bool = False) -> Dict[str, float]:
        """연환산 지표 계산"""
        monthly_op = current_data[self.OPERATING_PROFIT_COL].sum()
        monthly_np = current_data[self.NET_PROFIT_COL].sum()
        monthly_rev = current_data[self.REVENUE_COL].sum()

        # 연환산 계산
        annualized_op = (monthly_op / self.latest_month) * 12
        annualized_np = (monthly_np / self.latest_month) * 12
        annualized_rev = (monthly_rev / self.latest_month) * 12

        return {
            'operating_profit': int(round(annualized_op)),
            'net_profit': int(round(annualized_np)),
            'operating_profit_margin': self._calculate_margin_rate(annualized_op, annualized_rev),
            'net_profit_margin': self._calculate_margin_rate(annualized_np, annualized_rev)
        }

    def _calculate_metrics(self, current_data: pd.DataFrame, prev_data: pd.DataFrame = None) -> Dict[str, float]:
        """연간 수익성 지표 계산"""
        current_op = current_data[self.OPERATING_PROFIT_COL].sum()
        current_np = current_data[self.NET_PROFIT_COL].sum()
        current_rev = current_data[self.REVENUE_COL].sum()

        return {
            'operating_profit': int(round(current_op)),
            'net_profit': int(round(current_np)),
            'operating_profit_margin': self._calculate_margin_rate(current_op, current_rev),
            'net_profit_margin': self._calculate_margin_rate(current_np, current_rev)
        }

    def _process_monthly_data(self):
        """최근 12개월 월별 데이터 계산 및 저장"""
        past_12_months = pd.date_range(end=self.latest_date + pd.DateOffset(months=1), periods=12, freq='ME')
        cumulative_data = {'current': {'operating_profit': 0, 'net_profit': 0, 'revenue': 0}}

        for idx, date in enumerate(past_12_months):
            month_str = date.strftime('%Y-%m')
            current_month = self.df_company[self.df_company['date'].dt.strftime('%Y-%m') == month_str]
            prev_month = self.df_company[self.df_company['date'].dt.strftime('%Y-%m') == (date - pd.DateOffset(years=1)).strftime('%Y-%m')]

            current_op = int(current_month[self.OPERATING_PROFIT_COL].sum())
            current_np = int(current_month[self.NET_PROFIT_COL].sum())
            current_rev = int(current_month[self.REVENUE_COL].sum())
            prev_op = int(prev_month[self.OPERATING_PROFIT_COL].sum()) if not prev_month.empty else 0
            prev_np = int(prev_month[self.NET_PROFIT_COL].sum()) if not prev_month.empty else 0

            cumulative_data['current']['operating_profit'] += current_op
            cumulative_data['current']['net_profit'] += current_np
            cumulative_data['current']['revenue'] += current_rev

            self.profitability_data['monthly_profit'][month_str] = {
                '당월영업이익': current_op,
                '전년동월영업이익': prev_op,
                '당월당기순이익': current_np,
                '전년동월당기순이익': prev_np
            }
            self.profitability_data['monthly_margins'][month_str] = {
                '영업이익률': self._calculate_margin_rate(current_op, current_rev),
                '누적영업이익률': self._calculate_margin_rate(cumulative_data['current']['operating_profit'], cumulative_data['current']['revenue']),
                '당기순이익률': self._calculate_margin_rate(current_np, current_rev),
                '누적순이익률': self._calculate_margin_rate(cumulative_data['current']['net_profit'], cumulative_data['current']['revenue'])
            }

    def calculate_profitability_metrics(self) -> Dict[str, pd.DataFrame]:
        """수익성 지표 계산 실행"""
        self._prepare_data()
        self._process_annual_data()
        self._process_monthly_data()

        # DataFrames 생성 및 날짜 순 정렬
        annual_profit_df = pd.DataFrame(self.profitability_data['annual_profit']).sort_index(axis=1)
        annual_margins_df = pd.DataFrame(self.profitability_data['annual_margins']).sort_index(axis=1)
        monthly_profit_df = pd.DataFrame(self.profitability_data['monthly_profit']).sort_index(axis=1)
        monthly_margins_df = pd.DataFrame(self.profitability_data['monthly_margins']).sort_index(axis=1)

        return {
            'latest_year_month': self.latest_date.strftime('%Y-%m'),
            'annual_profit': annual_profit_df,
            'annual_margins': annual_margins_df,
            'monthly_profit': monthly_profit_df,
            'monthly_margins': monthly_margins_df
        }

def preprocess_profitability_data(df_company_info: pd.DataFrame, target_company_name: str) -> Dict[str, Any]:
    """수익성 지표 데이터를 전처리하여 JSON 형식으로 변환"""
    calculator = ProfitabilityMetricsCalculator(df_company_info, target_company_name)
    return calculator.calculate_profitability_metrics()



profitability_data = preprocess_profitability_data(df_company_info, 'Company_1')

print('#'*10, 'last_year_month', '#'*10)
pprint(profitability_data['latest_year_month'])
print()

print('#'*10, 'annual_revenue', '#'*10)
pprint(pd.DataFrame(profitability_data['annual_profit']))
print()

print('#'*10, 'annual_assets', '#'*10)
pprint(pd.DataFrame(profitability_data['annual_margins']))
print()

print('#'*10, 'monthly_revenue', '#'*10)
pprint(pd.DataFrame(profitability_data['monthly_profit']))
print()

print('#'*10, 'monthly_growth', '#'*10)
pprint(pd.DataFrame(profitability_data['monthly_margins']))

## 4. Langchain을 활용한 분석

- cache 설정 : 비용 절감 목적
- streaming을 위한 custom handler 설정
- common_llm : 데이터 기반 해석만 하는 자유도가 낮은 gpt-4o-mini
- load_template : template 읽기
- determine_strength_weakness : **BPI 계산법 통해 연계 필요**
- create_analysis_chain : 분석 chain 생성
- run_analysis : 강/약점을 활용해 분석 시작

In [152]:
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, Any
import logging
from pprint import pprint
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnableParallel, RunnableLambda
from operator import itemgetter
from tabulate import tabulate
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# OpenAI GPT-4 Mini 모델 설정
common_llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0,
    max_tokens=300,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# 디렉토리 설정
CURRENT_DIR = os.getcwd()
PROMPT_PATH = os.path.join(CURRENT_DIR, "prompts")

def load_prompt(file_name: str) -> str:
    file_path = os.path.join(PROMPT_PATH, file_name)
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        print(f"Error: Prompt file {file_name} not found.")
        return ""

def determine_strength_weakness(data: pd.DataFrame, company_name: str) -> Dict[str, str]:
    logging.disable(logging.CRITICAL)  # 로깅 임시 비활성화
    growth_data = preprocess_growth_data(data[data['기업명'] == company_name], company_name)
    profitability_data = preprocess_profitability_data(data[data['기업명'] == company_name], company_name)
    
    growth_score = float(growth_data['monthly_growth'].loc['누적매출액증가율'].iloc[-1])
    profit_score = float(profitability_data['monthly_margins'].loc['누적영업이익률'].iloc[-1])
    
    # print(f"Growth Score: {growth_score}, Profit Score: {profit_score}")
    
    if growth_score > profit_score:
        return {'strength': 'growth', 'weakness': 'profitability'}
    else:
        return {'strength': 'profitability', 'weakness': 'growth'}

def create_analysis_chain(indicator: str, is_strength: bool, llm: ChatOpenAI, df: pd.DataFrame, company_name: str):
    base_template = load_prompt(f"{indicator}_template.txt")
    additional_prompt = "Perform analysis on positive side." if is_strength else "Perform analysis on negative side."
    full_template = f"{base_template}\n\n{additional_prompt}"
    
    prompt = PromptTemplate.from_template(full_template)
    analysis_chain = prompt | llm | StrOutputParser()
    
    if indicator == 'growth':
        preprocess_func = preprocess_growth_data
        data_keys = ['latest_year_month', 'annual_revenue', 'annual_assets', 'monthly_revenue', 'monthly_growth']
    elif indicator == 'profitability':
        preprocess_func = preprocess_profitability_data
        data_keys = ['latest_year_month', 'annual_profit', 'annual_margins', 'monthly_profit', 'monthly_margins']
    else:
        raise ValueError(f"Unknown indicator: {indicator}")
        
    return RunnableLambda(lambda df: analysis_chain.invoke({
        **{key: preprocess_func(df, company_name)[key] for key in data_keys}
    }))

def run_analysis(df: pd.DataFrame, company_name: str) -> Dict[str, Any]:
    strength_weakness = determine_strength_weakness(df, company_name)
    strength = strength_weakness['strength']
    weakness = strength_weakness['weakness']
    
    strength_chain = create_analysis_chain(strength, True, common_llm, df, company_name)
    weakness_chain = create_analysis_chain(weakness, False, common_llm, df, company_name)
    
    parallel_chain = RunnableParallel({
        'strength_analysis': strength_chain,
        'weakness_analysis': weakness_chain
    })
    
    return {
        'parallel': parallel_chain,
        'strength': strength_chain,
        'weakness': weakness_chain,
        'strength_name': strength,
        'weakness_name': weakness
    }

from tabulate import tabulate

def print_preprocessed_data(df: pd.DataFrame, metric: str) -> None:
    """전처리된 데이터를 출력"""
    preprocess_func = globals()[f"preprocess_{metric}_data"]
    preprocessed_data = preprocess_func(df, df['기업명'].iloc[0])
    latest_year_month = preprocessed_data['latest_year_month']
    print(f"\n{metric.capitalize()} Data ({latest_year_month} 기준):")
    
    if metric == 'growth':
        # Annual Revenue
        print("\nAnnual Revenue:")
        annual_revenue_df = pd.DataFrame(preprocessed_data['annual_revenue'])
        print(tabulate(annual_revenue_df, headers='keys', tablefmt='pretty'))

        # Annual Assets
        print("\nAnnual Assets:")
        annual_assets_df = pd.DataFrame(preprocessed_data['annual_assets'])
        print(tabulate(annual_assets_df, headers='keys', tablefmt='pretty'))

        # Monthly Revenue
        print("\nMonthly Revenue:")
        monthly_revenue_df = pd.DataFrame(preprocessed_data['monthly_revenue'])
        print(tabulate(monthly_revenue_df, headers='keys', tablefmt='pretty'))

        # Monthly Growth
        print("\nMonthly Growth:")
        monthly_growth_df = pd.DataFrame(preprocessed_data['monthly_growth'])
        print(tabulate(monthly_growth_df, headers='keys', tablefmt='pretty'))

    elif metric == 'profitability':
        # Annual Profit
        print("\nAnnual Profit:")
        annual_profit_df = pd.DataFrame(preprocessed_data['annual_profit'])
        print(tabulate(annual_profit_df, headers='keys', tablefmt='pretty'))

        # Annual Margins
        print("\nAnnual Margins:")
        annual_margins_df = pd.DataFrame(preprocessed_data['annual_margins'])
        print(tabulate(annual_margins_df, headers='keys', tablefmt='pretty'))

        # Monthly Profit
        print("\nMonthly Profit:")
        monthly_profit_df = pd.DataFrame(preprocessed_data['monthly_profit'])
        print(tabulate(monthly_profit_df, headers='keys', tablefmt='pretty'))

        # Monthly Margins
        print("\nMonthly Margins:")
        monthly_margins_df = pd.DataFrame(preprocessed_data['monthly_margins'])
        print(tabulate(monthly_margins_df, headers='keys', tablefmt='pretty'))

    else:
        print(f"Unknown metric: {metric}")

def merge_analysis_results(strength_result: str, weakness_result: str) -> str:
    """
    강점 분석 결과와 약점 분석 결과를 하나의 문자열로 병합합니다.
    """
    return '\n\n'.join([
        "Strength Analysis:",
        strength_result,
        "Weakness Analysis:",
        weakness_result
    ])

## 5. 경영제언 Session
- gpt-4o로 고품질 모델 활용
- 창의도를 결정하는 temperature=1.2로 설정
- max_tokens=500으로 조금 더 긴 text 작성하도록 설정
- final_chain : 앞에서 분석한 strength와 weakness를 합쳐서 경영제언을 하도록 chain으로 연결
    - 1안 : 앞에서 streaming으로 분석한 strength_result와 weakness_result를 묶어서 수행 (토큰 비용 아낌)
    - 2안 : End-to-End로 강단점 분석부터 시작해서 경영 제언까지 수행 (토큰 비용 더 발생) 

In [155]:
insight_llm = ChatOpenAI(
    model_name="gpt-4o",
    temperature=1.2,
    max_tokens=500,
    streaming=True,
    # cache=langchain_cache,
    callbacks=[StreamingStdOutCallbackHandler()]
)

In [None]:
# 분석 실행
company_list = df_company_info['기업명'].unique()[:3]
for firm in company_list:
    firm_data = df_company_info[df_company_info['기업명'] == firm]
    chains = run_analysis(df_company_info, firm)
    
    strength_chain = chains['strength']
    weakness_chain = chains['weakness']
    strength_name = chains['strength_name']
    weakness_name = chains['weakness_name']
    
    print(f"\n\n{'#'*10} Preprocessing Strength ({strength_name}) Data for {firm} {'#'*10}")
    print_preprocessed_data(firm_data, strength_name)
    
    print(f"\n\n{'#'*10} Analyzing Strength ({strength_name}) for {firm} {'#'*10}")
    strength_result = strength_chain.invoke(firm_data)
    # print(strength_result)
    
    print(f"\n\n{'#'*10} Preprocessing Weakness ({weakness_name}) Data for {firm} {'#'*10}")
    print_preprocessed_data(firm_data, weakness_name)
        
    print(f"\n\n{'#'*10} Analyzing Weakness ({weakness_name}) for {firm} {'#'*10}")
    weakness_result = weakness_chain.invoke(firm_data)
    # print(weakness_result)
    
    print("\n\n",'#'*10, f'Final Comment for {firm}', '#'*10)    
    final_chain = (
        RunnableParallel({
            'strength_result': itemgetter('strength'),
            'weakness_result': itemgetter('weakness')
        })
        | RunnableLambda(lambda x: {'info': merge_analysis_results(x['strength_result'], x['weakness_result'])})
        | PromptTemplate.from_template(
        """
        Translate and analyze the provided company's strengths and weaknesses:

        {info}

        - Analyze the company's strengths and weaknesses based on the provided data, focusing on key business management insights.
        - Provide specific and actionable recommendations, grounded in the analysis, to capitalize on strengths and address weaknesses.
        - Highlight critical trends and figures as evidence to support your recommendations.
        - Ensure that the recommendations are concise (limited to 5 lines) and translated into Korean in a professional tone.

        # Output Format
        - The response should be in Korean, using natural and connected language.
        - Limit the response to 5 lines or less, with a clear and cohesive structure.
        - Write in a continuous paragraph format that flows smoothly between data points, avoiding lists or bullet points.

        # Guidelines
        1. Base your response objectively on the given data without speculation.
        2. Use specific figures and trends to support your analysis.
        3. Provide practical recommendations that are directly actionable.
        4. Maintain clarity and professionalism in both the analysis and the translation to Korean.
        """
        ) 
        | insight_llm | StrOutputParser()
    )
    result = final_chain.invoke({
        'strength': strength_result,
        'weakness': weakness_result
    })
    
    # print(result)


In [None]:


print(tabulate(pd.DataFrame(preprocess_growth_data(firm_data)['year_level_data']), headers = 'keys', tablefmt='psql'))

# pd.DataFrame(preprocess_growth_data(firm_data)['year_level_data'])

In [None]:
# run_analysis 함수의 결과를 사용하여 final_chain 구성
def create_final_chain(df: pd.DataFrame):
    chains = run_analysis(df)
    parallel_chain = chains['parallel']

    final_chain = (
        parallel_chain
        | RunnableLambda(lambda x: {
            'info': merge_analysis_results(
                x['strength_analysis'],
                x['weakness_analysis']
            ),
            'strength_name': chains['strength_name'],
            'weakness_name': chains['weakness_name']
        })
        | PromptTemplate.from_template(
            """
            Translate and analyze the provided company's strengths ({strength_name}) and weaknesses ({weakness_name}):

            {info}

            - Analyze the company's strengths and weaknesses based on the provided data, focusing on key business management insights.
            - Provide specific and actionable recommendations, grounded in the analysis, to capitalize on strengths and address weaknesses.
            - Highlight critical trends and figures as evidence to support your recommendations.
            - Ensure that the recommendations are concise (limited to 5 lines) and translated into Korean in a professional tone.

            # Output Format
            - The response should be in Korean.
            - Limit the response to 5 lines or less, with a clear and cohesive structure.
            - Write in a continuous paragraph format that flows smoothly between data points, avoiding lists or bullet points.

            # Guidelines
            1. Base your response objectively on the given data without speculation.
            2. Use specific figures and trends to support your analysis.
            3. Provide practical recommendations that are directly actionable.
            4. Maintain clarity and professionalism in both the analysis and the translation to Korean.
            """
        ) 
        | insight_llm 
        | StrOutputParser()
    )

    return final_chain

# 실행

# 단일 회사 정보만을 필터링하여 사용 (예: 첫 번째 회사)
company_list = df_company_info['기업명'].unique()[:3]
for firm in company_list:
    firm_data = df_company_info[df_company_info['기업명'] == firm]
    final_chain = create_final_chain(firm_data)
    print("\n", '#'*10, f'Final Comment for {firm}', '#'*10)
    result = final_chain.invoke(firm_data)
    print('\n')


In [None]:
print(result)