In [1]:
import pandas as pd
import json
import os
from typing import Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from sub_func import *

In [2]:
def extract_key_metrics(sector_info):
    """섹터 정보에서 Carhart 4 factor 관련 주요 지표 추출"""
    if sector_info is None:
        return None
    
    return {
        'market_beta': sector_info.get('market_beta', 0),
        'size_factor': sector_info.get('size_factor', 0),
        'value_factor': sector_info.get('value_factor', 0),
        'momentum_factor': sector_info.get('momentum_factor', 0)
    }

def extract_sentiment_score(df):
    """감성분석 결과를 numerical score로 변환"""
    def get_score(result):
        if isinstance(result, dict):
            # 딕셔너리에서 감성 결과 추출 (예: result.get('sentiment') 등)
            sentiment = result.get('sentiment', 'neutral')  # 적절한 키로 수정
        else:
            sentiment = result
            
        score_mapping = {
            'positive': 1.0,
            'neutral': 0.0,
            'negative': -1.0
        }
        return score_mapping.get(sentiment, 0.0)
    
    df['sentiment_score'] = df['SA_result'].apply(get_score)
    return df

def filter_by_percentile_and_label(df, label, percentile):
    """특정 감성의 상위/하위 percentile에 해당하는 뉴스 필터링"""
    if df.empty:
        return pd.DataFrame()
    
    # label에 따라 필터링
    if label == 'positive':
        filtered_df = df[df['SA_result'] == 'positive']
        return filtered_df.nlargest(int(len(filtered_df) * percentile/100), 'sentiment_score')
    else:  # negative
        filtered_df = df[df['SA_result'] == 'negative']
        return filtered_df.nsmallest(int(len(filtered_df) * percentile/100), 'sentiment_score')

In [27]:
class InvestmentReportGenerator:
    """Investment report generation class with GPT integration"""
    
    MARKDOWN_INSTRUCTION = """
    응답은 반드시 markdown 문법에 따라 작성되어야 합니다.
    ** 보고서에는 반드시 주어진 정보에 대한 분석이 필요합니다 **
    """

    ANALYST_BASE_PROMPT = """
    당신은 증권회사에 고용된 {role}입니다.
    주식투자의 관점에서 주어진 정보들을 요약하고, 이에 대한 의견을 알려주세요.
    {additional_instructions}
    {markdown_instruction}
    """

    def __init__(self, ticker: str, year: str, quarter: str):
        """
        Initialize the report generator.
        
        Args:
            ticker: Stock ticker symbol
            year: Target year
            quarter: Target quarter (Q1-Q4)
        """
        self.tickers = tickers
        self.year = year
        self.quarter = quarter
        self.prompts = self._initialize_prompts()
        self.responses = {ticker: {} for ticker in tickers}
        self.report_data = {}
        self.start_date, self.end_date = self._get_date_range()

    def _initialize_prompts(self) -> Dict[str, str]:
        """Initialize system prompts with templated format"""
        return {
    "financial_system": self.ANALYST_BASE_PROMPT.format(
        role="재무전문가",
        additional_instructions="보고서 근거 기반 의견 제시",
        markdown_instruction=self.MARKDOWN_INSTRUCTION
    ),
    "financial_prompt": "",

    "intl_macro_system": self.ANALYST_BASE_PROMPT.format(
        role="국제관계전문가",
        additional_instructions="국가별 금리, GDP, 인플레이션 등 거시경제 정보 분석",
        markdown_instruction=self.MARKDOWN_INSTRUCTION
    ),
    "intl_macro_prompt": "",

    "sector_system": """증권사 경제전문가로서 투자 관점에서 정보 분석 및 의견 제시
        # 필수 포함 사항
        - 섹터별 성과와 동향 분석
        - 투자 매력도 평가 (근거 제시)
        - 차트 패턴 분석 및 기술적 시사점
        """ + self.MARKDOWN_INSTRUCTION,
    "sector_prompt": "",

    "final_system": """증권사 애널리스트팀장으로서 전략적 포트폴리오 제안서 작성
        # 필수 섹션 (각 3000토큰 이내)
        1. 거시경제 분석
        - 글로벌 동향 (GDP/물가/금리)
        - 주요 리스크와 전망

        2. 섹터 분석 
        - 성과/동향/매력도
        - 기술적 분석 시사점

        3. 종목 분석 (각 1000토큰 이내)
        - 재무분석 및 주요 지표
        - 투자의견과 목표가

        4. 포트폴리오 전략
        - 자산배분 및 비중조정
        - 위험관리 방안

        5. 투자포인트/리스크
        - 핵심 포인트 3개
        - 리스크 요인과 대응

        요구사항:
        - 구체적 데이터/근거 제시
        - 실행 가능한 전략 수립
        """ + self.MARKDOWN_INSTRUCTION,
    "final_prompt": ""
}

    def _get_date_range(self) -> tuple:
        """Get start and end dates for the given quarter"""
        quarter_months = {
            'Q1': ('01', '03'),
            'Q2': ('04', '06'),
            'Q3': ('07', '09'),
            'Q4': ('10', '12')
        }
        
        if self.quarter in quarter_months:
            start_month, end_month = quarter_months[self.quarter]
            start_date = f"{self.year}{start_month}01"
            end_date = f"{self.year}{end_month}{'30' if end_month in ['06', '09'] else '31'}"
            return start_date, end_date
        else:
            raise ValueError(f"Invalid quarter: {self.quarter}")

    def analyze_financial_data(self, ticker: str) -> str:
        """Analyze financial statements and generate report"""
        try:
            fin_statement = get_raw_fin_statement_info(ticker, self.year, self.quarter)
            fin_statement_dict = fin_statement.to_dict() if fin_statement is not None else {}
        except Exception:
            fin_statement_dict = {}

        try:
            fin_ratio = fin_statement_info(ticker, self.year, self.quarter)
            fin_ratio_dict = fin_ratio.to_dict('records')[0] if fin_ratio is not None and not fin_ratio.empty else {}
        except Exception:
            fin_ratio_dict = {}

        try:
            fin_report = reports_info(ticker, self.year, self.quarter)
            report_content = fin_report['1. 요약재무정보.csv'][0][4:-4] if not fin_report.empty else "정보 없음"
        except Exception:
            report_content = "정보 없음"
        
        prompt_data = {
            "재무제표": fin_statement_dict,
            "주요 재무 비율": fin_ratio_dict,
            "재무보고서": report_content
        }
        
        self.prompts["financial_prompt"] = "\n".join(f"{k}: {v}" for k, v in prompt_data.items())
        return to_GPT(self.prompts["financial_system"], self.prompts["financial_prompt"])

    def analyze_international_macro(self) -> str:
        """Analyze international news and macroeconomic data"""
        try:
            intl_news = intl_news_info(self.year, self.start_date, self.end_date)
            news_titles = list(intl_news['news_title']) if intl_news is not None and not intl_news.empty else []
        except Exception:
            news_titles = []

        try:
            macro_data = macro_econ_info(self.year, self.start_date, self.end_date)
        except Exception:
            macro_data = "거시경제 데이터 없음"
        
        self.prompts["intl_macro_prompt"] = "\n".join([
            f"국제 뉴스 헤드라인: {news_titles}",
            f"거시경제 관련 정보: {macro_data}"
        ])
        
        return to_GPT(self.prompts["intl_macro_system"], self.prompts["intl_macro_prompt"])

    def analyze_sector_and_pattern(self, ticker: str) -> str:
        """Analyze sector trends and chart patterns"""
        index_prices = {}
        try:
            sector_list = [s for s in os.listdir('../store_data/raw/market_data/sector') 
                          if '코스피' not in s]
        except Exception:
            sector_list = []
        
        # Collect sector data
        for sector in sector_list:
            try:
                index_price = index_price_info(sector, self.start_date, self.end_date)
                if index_price is not None and not index_price.empty:
                    index_price = index_price[['Close', 'Transaction_Val', 'Market_Cap', 'RSI_14']]
                    index_prices[sector] = index_price.T.to_dict()
            except Exception:
                continue
        
        # Collect sector analysis
        sector_infos = {}
        for sector in sector_list:
            try:
                sector_analysis = sector_analysis_info(sector, self.year, self.quarter)
                if sector_analysis is not None:
                    sector_infos[sector] = extract_key_metrics(sector_analysis)
            except Exception:
                continue

        try:
            pattern_data = pattern_info(ticker, self.end_date.replace('-', ''))
            pattern_dict = pattern_data.to_dict('records') if pattern_data is not None and not pattern_data.empty else None
        except Exception:
            pattern_dict = None
        
        self.prompts["sector_prompt"] = "\n".join([
            f"섹터별 가격 정보: {index_prices}",
            f"섹터별 carhart 4 factor 분석: {sector_infos}",
            f"차트 패턴 분석 결과: {pattern_dict}"
        ])
        
        return to_GPT(self.prompts["sector_system"], self.prompts["sector_prompt"])

    def analyze_stocks(self):
        """Execute analysis for all stocks"""
        # Only analyze macro once
        macro_response = self.analyze_international_macro()
        self.responses["international_macro"] = macro_response
        
        for ticker in self.tickers:
            print(f"\n=== {ticker} 분석 중... ===")
            try:
                self.responses[ticker].update({
                    "financial": self.analyze_financial_data(ticker),
                    "sector_pattern_analysis": self.analyze_sector_and_pattern(ticker)
                })
            except Exception as e:
                print(f"{ticker} 분석 중 오류 발생: {e}")

    def generate_final_report(self) -> str:
        """Generate comprehensive final report"""
        combined_prompt = []
        
        for ticker in self.tickers:
            try:
                # Process news
                corp_news_df = corp_rel_news_info(ticker, self.year, self.start_date, self.end_date)
                news_summary = self._process_news(corp_news_df)
            except FileNotFoundError:
                print(f"{ticker}: 뉴스 데이터 파일이 없습니다.")
                news_summary = {'Positive': [], 'Negative': []}
            except Exception as e:
                print(f"{ticker}: 뉴스 처리 중 오류 발생 - {e}")
                news_summary = {'Positive': [], 'Negative': []}
            
            try:
                # Get stock price
                stock_price = stock_price_info(ticker, self.start_date, self.end_date)
                price_dict = stock_price.to_dict() if stock_price is not None and not stock_price.empty else None
            except Exception:
                price_dict = None
            
            # Add stock analysis
            combined_prompt.extend([
                f"\n=== {ticker} 종목 분석 ===",
                f"재무제표 및 재무 비율 분석: {self._get_response_content(self.responses[ticker], 'financial')}",
                f"종목 관련 뉴스: {news_summary}",
                f"주가 정보: {price_dict}",
                f"섹터 분석: {self._get_response_content(self.responses[ticker], 'sector_pattern_analysis')}"
            ])

        # Add macro analysis
        combined_prompt.extend([
            "\n=== 거시경제 분석 ===",
            self._get_response_content(self.responses, "international_macro")
        ])

        self.prompts["final_prompt"] = "\n".join(combined_prompt)
        final_response = to_GPT(self.prompts["final_system"], self.prompts["final_prompt"])
        self.responses["final"] = final_response
        return final_response

    def _process_news(self, corp_news_df) -> Dict[str, list]:
        """Process corporate news and extract sentiment"""
        news_summary = {'Positive': [], 'Negative': []}
        
        if corp_news_df is not None and not corp_news_df.empty:
            try:
                # 증권 카테고리 필터링
                corp_news_df = corp_news_df[corp_news_df['news_category'].str.contains('증권', na=False)]
                
                if not corp_news_df.empty:
                    # SA 결과 및 감성 점수 추출
                    corp_news_df['SA_result'] = corp_news_df['news_title'].apply(lambda x: 
                        get_SA_result(x) if pd.notna(x) else None)
                    
                    # None이나 NaN이 아닌 행만 감성 점수 추출
                    valid_news = corp_news_df.dropna(subset=['SA_result'])
                    if not valid_news.empty:
                        valid_news = extract_sentiment_score(valid_news)
                        
                        for sentiment in ['positive', 'negative']:
                            try:
                                news = filter_by_percentile_and_label(valid_news, sentiment, 20)
                                if not news.empty:
                                    news_summary[sentiment.capitalize()] = list(news['news_title'])
                            except Exception:
                                continue
            except Exception as e:
                print(f"뉴스 처리 중 오류 발생: {e}")
        
        return news_summary

    def _get_response_content(self, response_dict: Dict, key: str) -> str:
        """Safely extract content from GPT response"""
        try:
            return response_dict.get(key, {}).get('choices', [{}])[0].get('message', {}).get('content', '')
        except (AttributeError, IndexError):
            return ''

    def print_report(self) -> None:
        """Print final report to stdout"""
        if not self.responses.get("final"):
            self.generate_final_report()
        
        report_content = self.responses["final"]["choices"][0]["message"]["content"]
        print("\n=== 최종 종합 보고서 ===")
        print(report_content)
        print("\n=== 보고서 끝 ===\n")

    def save_report(self) -> None:
        """Save final report to Notion DB"""
        page_title = f"{year}_{quarter}_analyst_rp"
        print(f"{page_title} 보고서를 노션 DB에 저장합니다...")
        to_DB('t_1', 
                page_title, 
                f"{self.start_date}_{self.end_date}", 
                self.responses["final"]["choices"][0]["message"]["content"]
        )

    def _save_to_notion(self, page_title: str) -> None:
        """Save report to Notion database"""
        print(f"{page_title} 보고서를 노션 DB에 저장합니다...")
        to_DB('t_1', 
                page_title, 
                f"{self.start_date}_{self.end_date}", 
                self.responses["final"]["choices"][0]["message"]["content"]
        )

In [28]:
import json

# JSON 파일 경로
json_file_path = "/Users/gamjawon/finTF/pipeline/notion_page_ids.json"

# JSON 파일 읽기 함수
def read_json(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            data = json.load(file)
        return data
    except FileNotFoundError:
        return {}
    
# JSON 파일 수정
def write_json(file_path, new_data):
    with open(file_path, "w", encoding="utf-8") as file:
        json.dump(new_data, file, ensure_ascii=False, indent=4)
        
# 'pf_selection_agent' 섹션에서 종목 코드 추출
def get_tickers_from_json(agent_type, title):
   data = read_json(json_file_path)
   if agent_type in data and title in data[agent_type]:
       page_id = data[agent_type][title]
       content = get_all_text_from_page(page_id)
       
       try:
           # final_portfolio 부분 추출
           start = content.find("'final_portfolio'")
           end = content.find("'corp_analysis_report'")
           portfolio_str = content[start:end].strip()
           
           # 종목코드만 추출
           import re
           tickers = re.findall(r"'(\d{6})'", portfolio_str)
           return list(set(tickers))  # 중복 제거
           
       except Exception as e:
           print(f"Error: {e}")
           return []
   return []

# 예시로 'pf_selection_agent'의 '2022_Q4_init_pf'에서 종목 코드 추출
if __name__ == "__main__":
    # 'pf_selection_agent'에서 '2022_Q4_init_pf' 종목 코드 가져오기
    tickers = get_tickers_from_json('pf_selection_agent', '2022_Q4_init_pf')

    # 연도 및 분기 설정
    year = "2022"
    quarter = "Q4"

    # 분석기 객체 생성
    analyzer = InvestmentReportGenerator(tickers, year, quarter)

    # 분석 실행
    analyzer.analyze_stocks()

    # 최종 보고서 생성
    final_report = analyzer.generate_final_report()

    # 보고서 출력
    analyzer.print_report()

    # 노션에 저장
    analyzer.save_report()  # 이 부분은 주석 처리하여 노션에 저장하려면 사용


UK에 대해 Inflation Rate 정보를 찾을 수 없습니다. | [Errno 2] No such file or directory: '/Users/gamjawon/finTF/pipeline/sub_func/get_info/../../../store_data/raw/FRED/UK/Inflation Rate/2022/2022_Inflation Rate.csv'

=== 033660 분석 중... ===
재무제표를 불러오는 과정에서 오류가 발생했습니다 | [Errno 2] No such file or directory: '/Users/gamjawon/finTF/pipeline/sub_func/get_info/../../../store_data/raw/opendart/store_financial_statement/033660/_033660_재무제표 ().csv'
/Users/gamjawon/finTF/pipeline/sub_func/get_info/../../../store_data/raw/market_data/price/033660/2022.10/2022.10_033660.csv 파일을 찾을 수 없습니다.
/Users/gamjawon/finTF/pipeline/sub_func/get_info/../../../store_data/raw/market_data/price/033660/2022.11/2022.11_033660.csv 파일을 찾을 수 없습니다.
/Users/gamjawon/finTF/pipeline/sub_func/get_info/../../../store_data/raw/market_data/price/033660/2022.12/2022.12_033660.csv 파일을 찾을 수 없습니다.
재무제표를 불러오는 과정에서 오류가 발생했습니다 | [Errno 2] No such file or directory: '/Users/gamjawon/finTF/pipeline/sub_func/get_info/../../../store_data/raw/opendart/s

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  corp_news_df['SA_result'] = corp_news_df['news_title'].apply(lambda x:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  corp_news_df['SA_result'] = corp_news_df['news_title'].apply(lambda x:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  corp_news_df['SA_result'] = corp_news_df['news_title'].apply(la

BadRequestError: Error code: 400 - {'error': {'message': "This model's maximum context length is 128000 tokens. However, your messages resulted in 435495 tokens. Please reduce the length of the messages.", 'type': 'invalid_request_error', 'param': 'messages', 'code': 'context_length_exceeded'}}

In [5]:
analyzer.save_report()

2022_Q4_init_pf 보고서를 노션 DB에 저장합니다...
Error: 404, {'object': 'error', 'status': 404, 'code': 'object_not_found', 'message': 'Could not find database with ID: 3827dd2d-b394-44a3-9c11-762153d30714. Make sure the relevant pages and databases are shared with your integration.', 'request_id': '814a740c-47da-405e-a3d4-11f82b0f6cde'}
오류 발생: 404 Client Error: Not Found for url: https://api.notion.com/v1/pages


In [7]:
class PortfolioManagerReportGenerator:
    def __init__(self, ticker: str, year: str, quarter: str):
        self.ticker = ticker
        self.year = year
        self.quarter = quarter
        self.prompts = self._initialize_prompts()
        self.responses = {}
        self.report_data = {}
        self.analyst_report = None
        self.current_portfolio = None
        
    def _initialize_prompts(self) -> Dict[str, str]:
        """프롬프트 초기화"""
        return {
            "portfolio_manager_system": """당신은 자산운용사의 수석 포트폴리오 매니저입니다. 
애널리스트팀의 리서치 보고서를 검토하여 전체 포트폴리오 운용 전략을 제시하세요.

분석 핵심:
- 종목/섹터 비중 분산
- 투자매력도와 리스크
- 거시경제 환경
- 위험/수익 특성

보고서 구조:

# 1. 포트폴리오 현황
- 전체 구성과 섹터 비중
- 주요 위험/수익 지표
- 현 포트폴리오 강약점

# 2. 시장 환경
- 거시경제/섹터 전망
- 핵심 리스크/기회 요인

# 3. 종목별 전략
각 종목:
- 현재/적정 비중
- 투자매력도
- 주요 모멘텀/리스크
- 비중 조절 방향

# 4. 포트폴리오 조정
- 운용 방향성
- 종목별 비중 조정(확대/축소/편입/편출)
- 섹터 전략

# 5. 리스크 관리
- 포트폴리오/종목별 관리
- 손절/이익실현 기준
- 모니터링 지표

요구사항:
1. 구체적 수치/근거 포함
2. 현황/전망 균형있게 서술
3. 전체 수익/위험 고려
4. 실행 가능한 전략 제시

응답은 markdown 형식으로 작성""",
            "portfolio_manager_prompt": ""
        }
        
    def set_analyst_report(self, report: str) -> None:
        """애널리스트 보고서 설정"""
        self.analyst_report = report
        
    def set_current_portfolio(self, portfolio: Dict) -> None:
        """현재 포트폴리오 설정"""
        self.current_portfolio = portfolio
        
    def generate_portfolio_report(self) -> str:
        """포트폴리오 매니저 보고서 생성"""
        if self.analyst_report is None or self.current_portfolio is None:
            raise ValueError("Analyst report and current portfolio must be set before generating report")
            
        self.prompts["portfolio_manager_prompt"] = f"현재 포트폴리오 구성: {self.current_portfolio}\n"
        self.prompts["portfolio_manager_prompt"] += f"애널리스트 보고서: {self.analyst_report}"
        
        response = to_GPT(self.prompts["portfolio_manager_system"], 
                         self.prompts["portfolio_manager_prompt"])
        self.responses["portfolio_manager"] = response
        return response
        
    def save_report(self) -> None:
        """노션 DB에 포트폴리오 매니저 보고서 저장"""
        if not self.responses.get("portfolio_manager"):
            self.generate_portfolio_report()
        
        page_title = f"{self.year}_{self.quarter}_{self.ticker}_portfolio_manager_report"

        print(f"{page_title} 보고서를 노션 DB에 저장합니다...")
        to_DB('t_1', 
            page_title, 
            f"{self.quarter}_{self.year}", 
            self.responses["portfolio_manager"]["choices"][0]["message"]["content"]
        )



In [8]:
def get_current_portfolio():
    data = read_json(json_file_path)
    agent_type = 'pf_selection_agent'
    title = '2022_Q4_init_pf'
    # 특정 agent_type과 title에 해당하는 종목 코드 추출
    if agent_type in data and title in data[agent_type]:
        portfolio = data[agent_type][title]
        return portfolio
    else:
        return []

In [9]:
if __name__ == "__main__":
# 2. 종합 포트폴리오 매니저 보고서 생성
    # 티커는 대표 종목으로 설정하거나 "MULTI"와 같은 식별자 사용
    portfolio_manager = PortfolioManagerReportGenerator(f"{tickers[0]}_외_{len(tickers)-1}종목", year, quarter)
    
    # 애널리스트 보고서와 현재 포트폴리오 설정
    portfolio_manager.set_analyst_report(final_report["choices"][0]["message"]["content"])
    portfolio_manager.set_current_portfolio(get_current_portfolio())
    
    # 보고서 생성
    portfolio_report = portfolio_manager.generate_portfolio_report()
    
    # 보고서 출력
    print("\n=== 포트폴리오 매니저 종합 보고서 ===")
    print(portfolio_report["choices"][0]["message"]["content"])
    print("=== 보고서 끝 ===")
    
    # 선택적으로 노션에 저장
    portfolio_manager.save_report()

IndexError: list index out of range

In [24]:
class TraderReportGenerator:
    def __init__(self, tickers: List[str], year: str, quarter: str):
        self.tickers = tickers
        self.year = year
        self.quarter = quarter
        self.prompts = self._initialize_prompts()
        self.responses = {ticker: {} for ticker in tickers}
        self.report_data = {}
        self.start_date, self.end_date = self._get_date_range()
        self.price_data = {}
        self.analyst_reports = {}
        self.pm_reports = {}
        self.price_predictions = {}
        
    def _initialize_prompts(self) -> Dict[str, str]:
        """프롬프트 초기화"""
        return {
            "trader_system": """당신은 증권사의 트레이더입니다. 여러 종목의 데이터와 보고서들을 분석하여 핵심적인 매매 의견을 제시하되, 출력은 3만~4만 토큰 이내로 제한해야 합니다.

        응답 형식:
        # 시장 전반 분석 (1-2 문단)
        - 주요 시장 동향
        - 핵심 매매 전략

        # 주요 관심 종목 분석 (상위 5-7개 종목)
        각 종목별로:
        - 현재가/예측가 핵심 동향
        - 매매 방향과 가격대
        - 주요 리스크와 손절가

        # 기타 종목 매매 요약 (1-2문단)
        - 매수/매도 종목 구분
        - 핵심 진입/청산 전략

        모든 분석은 반드시 제공된 데이터(가격, 예측, 애널리스트/PM 보고서)에 기반해야 합니다.
        응답은 markdown 형식으로 작성하되, 핵심 정보 위주로 간단명료하게 작성하세요.""",
            "trader_prompt": ""
        }

    def _get_date_range(self) -> tuple:
        """분기에 해당하는 시작일과 종료일 반환"""
        quarter_months = {
            'Q1': ('01', '03'),
            'Q2': ('04', '06'),
            'Q3': ('07', '09'),
            'Q4': ('10', '12')
        }
        
        if self.quarter in quarter_months:
            start_month, end_month = quarter_months[self.quarter]
            start_date = f"{self.year}{start_month}01"
            end_date = f"{self.year}{end_month}{'30' if end_month in ['06', '09'] else '31'}"
            return start_date, end_date
        else:
            raise ValueError(f"Invalid quarter: {self.quarter}")

    def set_price_data(self) -> None:
        """주가 데이터 설정"""
        try:
            for ticker in self.tickers:
                self.price_data[ticker] = stock_price_info(ticker, self.start_date, self.end_date)
        except Exception as e:
            print(f"가격 데이터 설정 중 오류 발생: {str(e)}")


    def set_analyst_report(self, report: str) -> None:
        """애널리스트 보고서 설정"""
        self.analyst_report = report

    def set_pm_report(self, report: str) -> None:
        """포트폴리오 매니저 보고서 설정"""
        self.pm_report = report

    def get_price_prediction(self) -> None:
        """GRU 모델을 사용한 가격 예측"""
        try:
            self.price_predictions = predict_multiple_prices(
                self.tickers,
                self.start_date,
                self.end_date
            )
        except Exception as e:
            print(f"가격 예측 모델 실행 중 오류 발생: {str(e)}")

    def generate_trader_report(self) -> str:
        """트레이더 보고서 생성"""
        price_data_str = {ticker: data.to_dict() if hasattr(data, 'to_dict') else data 
                        for ticker, data in self.price_data.items()} if self.price_data else {}
        
        self.prompts["trader_prompt"] = "\n".join([
            f"전체 주가 데이터: {price_data_str}",
            f"전체 종목 가격 예측: {self.price_predictions or {}}",
            f"애널리스트 종합 보고서: {self.analyst_report or '정보 없음'}",
            f"포트폴리오 매니저 종합 보고서: {self.pm_report or '정보 없음'}"
        ])

        response = to_GPT(self.prompts["trader_system"], self.prompts["trader_prompt"])
        self.responses["trader"] = response
        return response

    def save_report(self) -> None:
        """노션 DB에 트레이더 보고서 저장"""
        if not self.responses.get("trader"):
            self.generate_trader_report()

        page_title = f"{self.year}_{self.quarter}_전체종목_trader_report"
   
        db_info = get_all_page_ids_from_database('t_1')
        
        to_DB('t_1', 
                page_title, 
                f"{self.quarter}_{self.year}", 
                self.responses["trader"]["choices"][0]["message"]["content"]
            )
        '''
        if page_title not in list(db_info.keys()):
            print(f"{page_title} 보고서를 노션 DB에 저장합니다...")
            to_DB('t_1', 
                page_title, 
                f"{self.quarter}_{self.year}", 
                self.responses["trader"]["choices"][0]["message"]["content"]
            )
        else:
            print(f"{page_title} 보고서가 이미 존재합니다.")
        '''

In [None]:
if __name__ == "__main__":
# 2. 종합 포트폴리오 매니저 보고서 생성
    # 티커는 대표 종목으로 설정하거나 "MULTI"와 같은 식별자 사용

    trader = TraderReportGenerator(tickers, year, quarter)
    
    # 가격 데이터와 애널리스트 보고서와 현재 포트폴리오 설정
    trader.set_price_data()
    trader.set_analyst_report(final_report["choices"][0]["message"]["content"])
    trader.set_pm_report(portfolio_report["choices"][0]["message"]["content"])
    
    # 보고서 생성
    trader_report = trader.generate_trader_report()
    
    # 보고서 출력
    print("\n=== 트레이더 종합 보고서 ===")
    print(trader_report["choices"][0]["message"]["content"])
    print("=== 보고서 끝 ===")
    
    # 선택적으로 노션에 저장
    # trader.save_report()

['005930']
로드 데이터 함수 시작
[DEBUG] 새로운 데이터 로드 시도
[DEBUG] 연도: 2020, 월: 10
[DEBUG] 매핑된 분기: Q4
[DEBUG] 재무제표 데이터 로드: True

[DEBUG] 외국인 보유 비중 데이터 로드 시도
[DEBUG] 외국인 보유 비중 데이터 로드 성공
[DEBUG] 데이터 병합 완료
[DEBUG] 최종 데이터 shape: (61, 5)
[DEBUG] 학습 데이터 통계:
X_train shape: (46, 15, 3)
y_train shape: (46,)
X_train 값 범위: 0.0 ~ 1.0
y_train 값 범위: 0.0 ~ 1.0
Epoch 1/50


  super().__init__(**kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 80ms/step - loss: 0.0856 - mae: 0.3562 - val_loss: 0.1916 - val_mae: 0.6095 - learning_rate: 0.0010
Epoch 2/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - loss: 0.0495 - mae: 0.2611 - val_loss: 0.1485 - val_mae: 0.5330 - learning_rate: 0.0010
Epoch 3/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - loss: 0.0288 - mae: 0.1940 - val_loss: 0.1099 - val_mae: 0.4540 - learning_rate: 0.0010
Epoch 4/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step - loss: 0.0174 - mae: 0.1487 - val_loss: 0.0801 - val_mae: 0.3829 - learning_rate: 0.0010
Epoch 5/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - loss: 0.0092 - mae: 0.1075 - val_loss: 0.0584 - val_mae: 0.3209 - learning_rate: 0.0010
Epoch 6/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step - loss: 0.0079 - mae: 0.1054 - val_loss: 0.0426 - val_mae: 0.26

In [None]:
fin_data = fin_statement_info("005930", "2020", "Q4")
print("재무제표 데이터:")
print(fin_data)

재무제표 데이터:
   Stock Price        PER       PBR       ROE  Profit Growth Rate (%)  \
0        81000  18.310946  1.752331  0.095699                21.47751   

   CAGR (%)  
0  0.214775  


In [43]:
import pandas as pd
import numpy as np
from sub_func import *

from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import GRU, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
import os
import joblib
from pykrx import stock

def predict_multiple_prices(tickers: list, start_date: str, end_date: str, price_data_dict=None) -> dict:
    predictions = {}
    
    if not os.path.exists('models'):
        os.makedirs('models')
    
    for ticker in tickers:
        try:
            data = load_stock_data(ticker, start_date, end_date)
            if data is None:
                continue
                
            # PER이 0인 경우 처리
            if np.all(data['PER'] == 0):
                print("[WARNING] PER이 모두 0입니다. 평균값으로 대체합니다.")
                data['PER'] = 15.0  # 일반적인 PER 평균값으로 대체
            
            # 스케일링
            scaler = MinMaxScaler()
            scaled_data = scaler.fit_transform(data[['close', 'high', 'PER', 'foreign_holding']])
            data[['close', 'high', 'PER', 'foreign_holding']] = scaled_data
            
            # 시계열 데이터 준비
            window_size = 15
            X = []
            y = []
            for i in range(window_size, len(data)):
                X.append(data[['high', 'PER', 'foreign_holding']].values[i-window_size:i])
                y.append(data['close'].values[i])
            X = np.array(X)
            y = np.array(y)
            
            # 모델 학습
            model = create_and_train_model(X, y, ticker)
            
            # 예측 수행
            last_sequence = X[-1:]
            future_predictions = []
            current_sequence = last_sequence.copy()
            
            for _ in range(20):
                pred = model.predict(current_sequence, verbose=0)
                future_predictions.append(float(pred[0, 0]))
                
                current_sequence = np.roll(current_sequence, -1, axis=1)
                current_sequence[0, -1] = [pred[0, 0], data['PER'].iloc[-1], data['foreign_holding'].iloc[-1]]
            
            # 예측값 역변환
            future_predictions = np.array(future_predictions).reshape(-1, 1)
            future_predictions = np.concatenate([future_predictions, np.zeros((len(future_predictions), 3))], axis=1)
            future_predictions = scaler.inverse_transform(future_predictions)[:, 0]
            
            predictions[ticker] = {
                'current_price': float(scaler.inverse_transform([[data['close'].iloc[-1], 0, 0, 0]])[0, 0]),
                'predicted_prices': future_predictions.tolist(),
                'prediction_dates': pd.date_range(
                    start=pd.to_datetime(data['date'].iloc[-1]) + pd.Timedelta(days=1),
                    periods=20
                ).strftime('%Y-%m-%d').tolist()
            }
            
        except Exception as e:
            print(f"{ticker} 예측 중 오류 발생: {str(e)}")
            predictions[ticker] = None
    
    return predictions

def predict_price(ticker: str, start_date: str = None, end_date: str = None) -> dict:
    """
    GRU 모델을 사용하여 주가를 예측하는 함수
    
    Args:
        ticker (str): 주식 종목 코드
        start_date (str): 예측 시작일 (YYYYMMDD 형식)
        end_date (str): 예측 종료일 (YYYYMMDD 형식)
    
    Returns:
        dict: 예측 결과를 담은 딕셔너리
    """
    try:
        # 모델 및 스케일러 경로 설정
        MODEL_PATH = f'models/{ticker}_gru_model.h5'
        SCALER_PATH = f'models/{ticker}_scaler.pkl'
        
        # 데이터 로드 
        data = load_stock_data(ticker, start_date, end_date)  
        
        # 데이터 전처리
        data['date'] = pd.to_datetime(data['date'])
        data = data.sort_values('date')
        
        # 저장된 스케일러 로드 또는 새로 생성
        if os.path.exists(SCALER_PATH):
            scaler = joblib.load(SCALER_PATH)
        else:
            scaler = MinMaxScaler()
            data[['close', 'high', 'PER', 'foreign_holding']] = scaler.fit_transform(
                data[['close', 'high', 'PER', 'foreign_holding']]
            )
            joblib.dump(scaler, SCALER_PATH)
        
        # 시계열 데이터 준비
        window_size = 15
        X = []
        for i in range(window_size, len(data)):
            X.append(data[['high', 'PER', 'foreign_holding']].values[i-window_size:i])
        X = np.array(X)
        
        # 모델 로드 또는 새로 생성
        if os.path.exists(MODEL_PATH):
            model = load_model(MODEL_PATH)
        else:
            model = create_and_train_model(X, data['close'].values[window_size:], ticker)
        
        # 다음 분기 예측
        future_predictions = []
        last_sequence = X[-1:]
        
        # 다음 20일(약 한 달) 예측
        for _ in range(20):
            next_pred = model.predict(last_sequence)
            future_predictions.append(next_pred[0, 0])
            
            # 다음 예측을 위한 시퀀스 업데이트
            last_sequence = np.roll(last_sequence, -1, axis=1)
            last_sequence[0, -1] = next_pred
        
        # 예측값 역변환
        future_predictions = np.array(future_predictions).reshape(-1, 1)
        future_predictions = scaler.inverse_transform(
            np.concatenate((future_predictions, np.zeros((future_predictions.shape[0], 3))), axis=1)
        )[:, 0]
        
        # 결과 정리
        result = {
            'current_price': data['close'].iloc[-1],
            'predicted_prices': future_predictions.tolist(),
            'prediction_dates': pd.date_range(
                start=data['date'].iloc[-1] + pd.Timedelta(days=1), 
                periods=20
            ).strftime('%Y-%m-%d').tolist(),
            'confidence_level': calculate_confidence_level(model, X, data['close'].values[window_size:])
        }
        
        return result
        
    except Exception as e:
        print(f"예측 중 오류 발생: {str(e)}")
        return None

def create_and_train_model(X_train, y_train, ticker):
    """GRU 모델 생성 및 학습"""
    print(f"[DEBUG] 학습 데이터 통계:")
    print(f"X_train shape: {X_train.shape}")
    print(f"y_train shape: {y_train.shape}")
    print(f"X_train 값 범위: {np.min(X_train)} ~ {np.max(X_train)}")
    print(f"y_train 값 범위: {np.min(y_train)} ~ {np.max(y_train)}")

    # 입력 데이터 검증
    if np.any(np.isnan(X_train)) or np.any(np.isinf(X_train)):
        raise ValueError("입력 데이터에 NaN 또는 무한값이 포함되어 있습니다.")

    if np.any(np.isnan(y_train)) or np.any(np.isinf(y_train)):
        raise ValueError("타겟 데이터에 NaN 또는 무한값이 포함되어 있습니다.")

    # 모델 구조
    model = Sequential([
        GRU(32, input_shape=(X_train.shape[1], X_train.shape[2])),
        Dense(16, activation='relu'),
        Dense(1)
    ])
    
    # 컴파일
    optimizer = Adam(learning_rate=0.001)
    model.compile(
        optimizer=optimizer,
        loss='huber',
        metrics=['mae']
    )
    
    # 콜백
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=0.0001
        )
    ]
    
    # 학습
    history = model.fit(
        X_train, y_train,
        epochs=50,
        batch_size=16,
        validation_split=0.2,
        callbacks=callbacks,
        verbose=1
    )
    
    # 학습 결과 검증
    print("\n[DEBUG] 모델 평가:")
    val_predictions = model.predict(X_train[-int(len(X_train)*0.2):])
    val_true = y_train[-int(len(y_train)*0.2):]
    mse = np.mean((val_predictions - val_true) ** 2)
    print(f"검증 세트 MSE: {mse}")
    
    return model

def calculate_confidence_level(model, X, y_true):
    """예측 신뢰도 계산"""
    y_pred = model.predict(X)
    mse = np.mean((y_pred - y_true.reshape(-1, 1)) ** 2)
    confidence = np.exp(-mse)  # 0~1 사이의 값으로 변환
    return float(confidence)

def load_stock_data(ticker: str, start_date: str, end_date: str, price_data=None) -> pd.DataFrame:
    """
    주식 데이터를 로드하는 함수
    price_data: TraderReportGenerator에서 이미 로드된 가격 데이터
    """
    print("로드 데이터 함수 시작")
    try:
        if price_data is not None:
            print("[DEBUG] 기존 price_data 사용")  # 디버깅
            selected_data = pd.DataFrame()
            selected_data['close'] = price_data['Close']
            selected_data['high'] = price_data['High']
            selected_data['date'] = price_data.index
            selected_data = selected_data.reset_index(drop=True)
        else:
            print("[DEBUG] 새로운 데이터 로드 시도")  # 디버깅
            price_data = stock_price_info(ticker, start_date, end_date)
            
            if price_data is None:
                print(f"Warning: 가격 데이터를 가져올 수 없습니다: {ticker}")
                return None
            
            selected_data = pd.DataFrame()
            selected_data['close'] = price_data['Close']
            selected_data['high'] = price_data['High']
            selected_data['date'] = price_data.index
            selected_data = selected_data.reset_index(drop=True)
            
        # 연도와 분기 추출
        year = start_date[:4]
        month = start_date[4:6]
        print(f"[DEBUG] 연도: {year}, 월: {month}")  # 디버깅
        
        # 분기 매핑
        quarter_map = {
            'Q1': ['01', '02', '03'],
            'Q2': ['04', '05', '06'],
            'Q3': ['07', '08', '09'],
            'Q4': ['10', '11', '12']
        }
        
        quarter = next(q for q, months in quarter_map.items() if month in months)
        print(f"[DEBUG] 매핑된 분기: {quarter}")  # 디버깅
        
        # PER 데이터 가져오기
        fin_data = fin_statement_info(ticker, year, quarter)
        print(f"[DEBUG] 재무제표 데이터 로드: {fin_data is not None}")  # 디버깅
        
        if fin_data is not None:
            per_value = fin_data['PER'].iloc[0]
        else:
            per_value = None
            
        # PER 컬럼 추가
        selected_data['PER'] = per_value
        
        # 외국인 보유 비중 추가
        try:
            print("\n[DEBUG] 외국인 보유 비중 데이터 로드 시도")
            foreign_data = stock.get_exhaustion_rates_of_foreign_investment(start_date, end_date, ticker)
            print(f"[DEBUG] 외국인 보유 비중 데이터 로드 성공")
            
            # 데이터 병합을 위해 인덱스 처리
            selected_data['date'] = pd.to_datetime(selected_data['date'])
            foreign_data = foreign_data.reset_index()
            foreign_data.columns = ['date' if col == '날짜' else col for col in foreign_data.columns]
            foreign_data['date'] = pd.to_datetime(foreign_data['date'])
            
            # 날짜 기준으로 데이터 병합
            selected_data = pd.merge(selected_data, 
                                   foreign_data[['date', '지분율']], 
                                   on='date', 
                                   how='left')
            selected_data = selected_data.rename(columns={'지분율': 'foreign_holding'})
            
            print("[DEBUG] 데이터 병합 완료")
        except Exception as e:
            print(f"[DEBUG] 외국인 보유 비중 로드 실패: {str(e)}")
            selected_data['foreign_holding'] = 0
            
        print(f"[DEBUG] 최종 데이터 shape: {selected_data.shape}")  # 디버깅
        return selected_data
        
    except Exception as e:
        print(f"데이터 로드 중 오류 발생: {str(e)}")
        print(f"[DEBUG] 오류 발생 위치 정보: {e.__traceback__.tb_lineno}")  # 디버깅
        return None