In [None]:
import pandas as pd
import numpy as np
import os
from tqdm.notebook import tqdm
from dotenv import load_dotenv
import warnings
from pathlib import Path
# LangChain 관련
from langchain.chat_models import ChatOpenAI
from tqdm.notebook import tqdm

# 커스텀 모듈
from utils.vector_handler import load_vectorstore

# 에러 처리
import traceback

# ✅ 환경 변수 로드 및 설정
pd.options.display.float_format = '{:.3f}'.format
warnings.filterwarnings('ignore')

# 상수 정의
PAGE_NAME = "analysis"
VECTOR_DB_ANSS_PATH = f"./vectordb/{PAGE_NAME}"

vectorstore = load_vectorstore(db_path=VECTOR_DB_ANSS_PATH)

def summarize_data(df, df_name):
    # 주어진 데이터프레임(df)에 대한 EDA(탐색적 데이터 분석) 결과를 정리하여 반환하는 함수.
    # "컬럼 개요"에 범주형 변수 분포 정보 포함
    # 연속형 변수의 인스턴스(예제) 제외 (토큰 수 절감 목적)

    # ✅ 1. 이상치 탐지 (IQR 방식)
    outliers_info = {}
    for col in df.select_dtypes(include=[np.number]).columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        outlier_count = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()

        if outlier_count > 0:
            outliers_info[col] = int(outlier_count)

    # ✅ 2. 컬럼 개요 (기본 통계 및 결측 정보 + 인스턴스 예제 + 범주형 분포 추가)
    columns_info = []
    for col in df.columns:
        col_dtype = df[col].dtype

        col_info = {
            "데이터프레임명": df_name,
            "컬럼명": col,
            "데이터 타입": col_dtype,
        }

        # ✅ 범주형 변수만 인스턴스(예제) 포함 (연속형 변수 제외하여 토큰 수 절감)
        if col_dtype in ["object", "category"]:
            unique_vals = df[col].dropna().unique()
            # 유니크 값 샘플링 (20개 이상이면 10개만 출력 + '...')
            if len(unique_vals) > 20:
                sample_display = list(unique_vals[:10]) + ['...']
            else:
                sample_display = unique_vals.tolist()
            col_info["인스턴스(예제)"] = sample_display

            # ✅ 범주형 변수일 경우, 분포 정보 추가 (최대 10개만 저장하여 토큰 절감)
            value_counts = df[col].value_counts(normalize=True) * 100
            value_counts = value_counts[:10]  # 최대 10개만 유지
            category_distribution = {val: round(pct, 2) for val, pct in value_counts.items()}
            col_info["범주형 분포"] = category_distribution

        columns_info.append(col_info)

    columns_info_df = pd.DataFrame(columns_info)

    return columns_info_df

def search_column_descriptions(stage1_results_dict, vectorstore):
    """RAG를 사용하여 컬럼 설명을 검색하는 함수"""
    data = []
    
    # 전체 처리해야 할 행 수 계산
    total_rows = sum(len(df) for df in stage1_results_dict.values())
    
    with tqdm(total=total_rows, desc="🔍 컬럼 설명 검색 중") as pbar:
        for df_name, df in stage1_results_dict.items():
            df_after_llm = df[['데이터프레임명', '컬럼명']]
            
            for i, row in df_after_llm.iterrows():
                table_name = row['데이터프레임명']
                col = row['컬럼명']
                
                search_query = f"테이블명 : {table_name} | 컬럼명 : {col}"
                docs = vectorstore.similarity_search(search_query, k=1)
                
                if docs:
                    best_match = docs[0].page_content
                    data.append({
                        '데이터프레임명': table_name,
                        '컬럼명': col,
                        '컬럼설명': best_match.split('\n')[3].split('설명')[1].strip()
                    })
                else:
                    data.append({
                        '데이터프레임명': table_name,
                        '컬럼명': col,
                        '컬럼설명': "설명 없음"
                    })
                pbar.update(1)  # 진행바 업데이트
    
    return pd.DataFrame(data)


def generate_dataframe_info(df, vectorstore=None):
    """
    데이터프레임의 정보를 생성하는 통합 함수
    
    Args:
        df (pd.DataFrame): 분석할 데이터프레임
        vectorstore: RAG 검색을 위한 벡터스토어 (선택사항)
    
    Returns:
        dict: 생성된 모든 정보를 포함하는 딕셔너리
    """
    print("📊 데이터프레임 기본 정보 생성 중...")
    df_name = str(df.name) if hasattr(df, 'name') else 'Sheet'
    basic_info = summarize_data(df, df_name)
    
    # RAG 기반 컬럼 설명 검색 (vectorstore가 제공된 경우에만)
    if vectorstore is not None:
        print("🔍 RAG 기반 컬럼 설명 검색 중...")
        column_descriptions = search_column_descriptions({df_name: basic_info}, vectorstore)
    else:
        column_descriptions = None
    
    # 결과 통합
    result = {
        "basic_info": basic_info,
        "column_descriptions": column_descriptions,
        "df_name": df_name
    }
    
    print("✅ 모든 정보 생성이 완료되었습니다.")
    return result

def save_results_to_excel(results_dict, output_path="../output/stage1/eda_summary.xlsx"):
    """
    생성된 모든 결과를 하나의 시트로 통합하여 엑셀 파일로 저장하는 함수
    
    Args:
        results_dict (dict): generate_dataframe_info 함수의 결과
        output_path (str): 저장할 엑셀 파일 경로
    """
    print("💾 결과 저장 중...")
    try:
        # 기본 정보와 컬럼 설명을 컬럼명 기준으로 통합
        merged_df = results_dict["basic_info"].copy()
        
        # 컬럼 설명이 있는 경우 통합
        if results_dict["column_descriptions"] is not None:
            merged_df = merged_df.merge(
                results_dict["column_descriptions"][['데이터프레임명', '컬럼명', '컬럼설명']],
                on=['데이터프레임명', '컬럼명'],
                how='left'
            )
        
        # 엑셀 파일로 저장
        with pd.ExcelWriter(output_path) as writer:
            merged_df.to_excel(writer, sheet_name=results_dict["df_name"], index=False)
            print(f"✅ [LOG] {results_dict['df_name']} 분석 결과가 저장되었습니다.")
            
        print(f"✅ [LOG] 모든 결과가 성공적으로 저장되었습니다: {output_path}")
    except Exception as e:
        print(f"❌ [LOG] 파일 저장 실패: {output_path}")
        print(f"❌ [LOG] 오류 내용: {e}")

# 사용 예시:
df = pd.read_pickle('../data/datamart_20250226_093329.pkl')
result = generate_dataframe_info(df, vectorstore)  # vectorstore는 선택사항
save_results_to_excel(result, output_path="../output/stage1/eda_summary.xlsx")  # 결과 저장

📊 데이터프레임 기본 정보 생성 중...
🔍 RAG 기반 컬럼 설명 검색 중...


🔍 컬럼 설명 검색 중:   0%|          | 0/9 [00:00<?, ?it/s]

✅ 모든 정보 생성이 완료되었습니다.
💾 결과 저장 중...
✅ [LOG] Sheet 분석 결과가 저장되었습니다.
✅ [LOG] 모든 결과가 성공적으로 저장되었습니다: ../output/stage1/eda_summary.xlsx
