In [7]:
!pip install scipy

Collecting scipy
  Downloading scipy-1.16.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (61 kB)
Downloading scipy-1.16.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (35.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.3/35.3 MB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: scipy
Successfully installed scipy-1.16.0


In [8]:
# Cell 1: 필요한 라이브러리 임포트
import sys
import os
sys.path.insert(0, '/home/grey1/stock-kafka3/airflow/plugins')
sys.path.insert(0, '/home/grey1/stock-kafka3/airflow/common')

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import time
import duckdb

# Cell 2: DuckDBManager 클래스 직접 정의 (임포트 에러 해결)
class DuckDBManager:
    """DuckDB 데이터베이스 관리 클래스"""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.con = duckdb.connect(database=db_path)
        self._initialize_tables()
    
    def _initialize_tables(self):
        """테이블 초기화"""
        # nasdaq_symbols 테이블
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS nasdaq_symbols (
                symbol VARCHAR PRIMARY KEY,
                name VARCHAR,
                market_cap BIGINT,
                sector VARCHAR,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        # stock_data 테이블
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS stock_data (
                symbol VARCHAR,
                date DATE,
                open DOUBLE,
                high DOUBLE,
                low DOUBLE,
                close DOUBLE,
                volume BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (symbol, date)
            )
        """)
    
    def get_active_symbols(self):
        """활성 심볼 목록 조회"""
        try:
            result = self.con.execute("""
                SELECT symbol FROM nasdaq_symbols
            """).fetchall()
            
            # 튜플 리스트를 문자열 리스트로 변환
            symbols = [row[0] for row in result]
            return symbols
        except Exception as e:
            print(f"❌ 심볼 조회 오류: {e}")
            return []
    
    def save_stock_data(self, data: dict):
        """주가 데이터 저장"""
        try:
            self.con.execute("""
                INSERT OR REPLACE INTO stock_data 
                (symbol, date, open, high, low, close, volume)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                data['symbol'],
                data['date'],
                data['open'],
                data['high'],
                data['low'],
                data['close'],
                data['volume']
            ))
            return True
        except Exception as e:
            print(f"❌ 데이터 저장 오류: {e}")
            return False
    
    def close(self):
        """연결 종료"""
        if hasattr(self, 'con'):
            self.con.close()

# Cell 3: YFinanceCollector 클래스 정의
class YFinanceCollector:
    """yfinance 기반 주식 데이터 수집 클래스 (테스트용)"""
    
    def __init__(self, db_path: str = "/home/grey1/stock-kafka3/data/duckdb/stock_data.db"):
        self.db = DuckDBManager(db_path)
    
    def collect_stock_data(self, symbol: str, period: str = "1mo") -> bool:
        """개별 종목 주가 데이터 수집 (테스트용 - 1개월)"""
        import random
        
        try:
            # API 호출 제한 방지를 위한 지연
            delay = random.uniform(0.5, 1.5)
            time.sleep(delay)
            
            print(f"🔍 {symbol} 데이터 수집 중...")
            
            # yfinance로 데이터 수집
            ticker = yf.Ticker(symbol)
            hist = ticker.history(
                period=period, 
                auto_adjust=True,
                prepost=False,
                actions=False,
                repair=True
            )
            
            if hist.empty:
                print(f"⚠️ {symbol}: 데이터가 비어있음")
                return False
            
            print(f"✅ {symbol}: {len(hist)}개 레코드 수집됨")
            
            # 데이터 저장 (선택사항)
            # hist = hist.reset_index()
            # for _, row in hist.iterrows():
            #     self.db.save_stock_data({
            #         'symbol': symbol,
            #         'date': row['Date'].date(),
            #         'open': row['Open'],
            #         'high': row['High'],
            #         'low': row['Low'],
            #         'close': row['Close'],
            #         'volume': row['Volume']
            #     })
            
            return True
            
        except Exception as e:
            print(f"❌ {symbol}: 수집 실패 - {e}")
            return False
    
    def close(self):
        """리소스 정리"""
        if hasattr(self, 'db'):
            self.db.close()

# Cell 4: 단일 종목 테스트
collector = YFinanceCollector()

# 애플 주식 테스트
test_symbol = "AAPL"
print(f"📊 {test_symbol} 주식 데이터 수집 테스트")

success = collector.collect_stock_data(test_symbol, period="5d")  # 5일치만
print(f"결과: {'성공' if success else '실패'}")

# Cell 5: yfinance 직접 테스트 (디버깅용)
ticker = yf.Ticker("AAPL")
hist = ticker.history(period="5d")

print(f"📊 수집된 데이터 shape: {hist.shape}")
print(f"📊 컬럼: {hist.columns.tolist()}")
print("\n📊 최근 5일 데이터:")
print(hist.tail())

# Cell 6: 심볼 형식 문제 디버깅
db = DuckDBManager("/home/grey1/stock-kafka3/data/duckdb/stock_data.db")
symbols = db.get_active_symbols()

print(f"🔍 DB에서 조회된 심볼 타입: {type(symbols)}")
print(f"🔍 심볼 개수: {len(symbols) if symbols else 0}")

if symbols and len(symbols) > 0:
    print(f"\n🔍 첫 5개 심볼:")
    for i, symbol in enumerate(symbols[:5]):
        print(f"  [{i}] 타입: {type(symbol)}, 값: '{symbol}'")

db.close()

📊 AAPL 주식 데이터 수집 테스트
🔍 AAPL 데이터 수집 중...
✅ AAPL: 5개 레코드 수집됨
결과: 성공
📊 수집된 데이터 shape: (5, 7)
📊 컬럼: ['Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']

📊 최근 5일 데이터:
                                 Open        High         Low       Close  \
Date                                                                        
2025-07-14 00:00:00-04:00  209.929993  210.910004  207.539993  208.619995   
2025-07-15 00:00:00-04:00  209.220001  211.889999  208.919998  209.110001   
2025-07-16 00:00:00-04:00  210.300003  212.399994  208.639999  210.160004   
2025-07-17 00:00:00-04:00  210.570007  211.800003  209.589996  210.020004   
2025-07-18 00:00:00-04:00  210.869995  211.789993  209.699997  211.179993   

                             Volume  Dividends  Stock Splits  
Date                                                          
2025-07-14 00:00:00-04:00  38840100        0.0           0.0  
2025-07-15 00:00:00-04:00  42296300        0.0           0.0  
2025-07-16 00:00:00-04:00 

In [None]:
# Cell 7: 워커 수 최적화 테스트
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import random

class WorkerOptimizationTester:
    """워커 수 최적화 테스트 클래스"""
    
    def __init__(self):
        self.test_symbols = [
            "AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", 
            "META", "NVDA", "AMD", "INTC", "ORCL",
            "IBM", "CRM", "ADBE", "NFLX", "PYPL",
            "UBER", "LYFT", "ZOOM", "WORK", "DOCU"
        ]
        self.results = {}
    
    def test_single_symbol(self, symbol, delay_range=(0.1, 0.5)):
        """단일 심볼 테스트 (API 호출 시뮬레이션)"""
        start_time = time.time()
        
        try:
            # API 호출 제한 방지 지연
            delay = random.uniform(*delay_range)
            time.sleep(delay)
            
            # yfinance 호출
            ticker = yf.Ticker(symbol)
            hist = ticker.history(period="5d", timeout=10)
            
            if hist.empty:
                return {
                    'symbol': symbol,
                    'success': False,
                    'duration': time.time() - start_time,
                    'error': 'Empty data'
                }
            
            return {
                'symbol': symbol,
                'success': True,
                'duration': time.time() - start_time,
                'records': len(hist),
                'error': None
            }
            
        except Exception as e:
            return {
                'symbol': symbol,
                'success': False,
                'duration': time.time() - start_time,
                'error': str(e)
            }
    
    def test_worker_count(self, worker_count, symbol_batch_size=10):
        """특정 워커 수로 테스트 실행"""
        print(f"\n🧪 워커 수 {worker_count}개 테스트 시작...")
        
        test_symbols = self.test_symbols[:symbol_batch_size]
        start_time = time.time()
        
        results = []
        failed_count = 0
        api_limit_errors = 0
        
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            # 작업 제출
            future_to_symbol = {
                executor.submit(self.test_single_symbol, symbol): symbol 
                for symbol in test_symbols
            }
            
            # 결과 수집
            for future in as_completed(future_to_symbol):
                result = future.result()
                results.append(result)
                
                if not result['success']:
                    failed_count += 1
                    error = result['error'].lower()
                    if '429' in error or 'rate limit' in error or 'too many' in error:
                        api_limit_errors += 1
        
        total_time = time.time() - start_time
        success_count = len(results) - failed_count
        
        # 통계 계산
        successful_durations = [r['duration'] for r in results if r['success']]
        avg_duration = sum(successful_durations) / len(successful_durations) if successful_durations else 0
        
        test_result = {
            'worker_count': worker_count,
            'total_symbols': len(test_symbols),
            'success_count': success_count,
            'failed_count': failed_count,
            'api_limit_errors': api_limit_errors,
            'total_time': total_time,
            'avg_symbol_time': avg_duration,
            'symbols_per_second': success_count / total_time if total_time > 0 else 0,
            'success_rate': success_count / len(test_symbols) * 100
        }
        
        self.results[worker_count] = test_result
        
        print(f"✅ 워커 {worker_count}개 완료:")
        print(f"   성공: {success_count}/{len(test_symbols)} ({test_result['success_rate']:.1f}%)")
        print(f"   API 제한 오류: {api_limit_errors}개")
        print(f"   총 시간: {total_time:.1f}초")
        print(f"   처리율: {test_result['symbols_per_second']:.2f} symbols/sec")
        
        return test_result
    
    def run_optimization_test(self):
        """전체 최적화 테스트 실행"""
        print("🚀 워커 수 최적화 테스트 시작")
        print("=" * 50)
        
        # 테스트할 워커 수 범위
        worker_counts = [1, 3, 5, 7, 10, 15]
        
        for worker_count in worker_counts:
            try:
                self.test_worker_count(worker_count, symbol_batch_size=15)
                
                # 테스트 간 쿨다운 (API 제한 방지)
                if worker_count != worker_counts[-1]:
                    print(f"⏰ 쿨다운 대기 중... (10초)")
                    time.sleep(10)
                    
            except Exception as e:
                print(f"❌ 워커 {worker_count}개 테스트 실패: {e}")
        
        return self.analyze_results()
    
    def analyze_results(self):
        """결과 분석 및 최적 워커 수 추천"""
        if not self.results:
            return None
        
        print(f"\n📊 워커 수 최적화 테스트 결과 분석")
        print("=" * 50)
        
        # 결과 테이블 출력
        print(f"{'워커수':<6} {'성공률':<8} {'API제한':<8} {'처리율':<12} {'평균시간':<10}")
        print("-" * 50)
        
        best_worker_count = None
        best_score = 0
        
        for worker_count, result in sorted(self.results.items()):
            # 종합 점수 계산 (성공률 * 처리율 - API 제한 페널티)
            score = (result['success_rate'] / 100) * result['symbols_per_second']
            api_penalty = result['api_limit_errors'] * 0.1
            final_score = max(0, score - api_penalty)
            
            if final_score > best_score:
                best_score = final_score
                best_worker_count = worker_count
            
            print(f"{worker_count:<6} {result['success_rate']:<7.1f}% {result['api_limit_errors']:<8} "
                  f"{result['symbols_per_second']:<11.2f} {result['avg_symbol_time']:<9.2f}s")
        
        print(f"\n🎯 추천 설정:")
        print(f"   최적 워커 수: {best_worker_count}개")
        print(f"   예상 성공률: {self.results[best_worker_count]['success_rate']:.1f}%")
        print(f"   예상 처리율: {self.results[best_worker_count]['symbols_per_second']:.2f} symbols/sec")
        
        # 설정 제안
        batch_size = 100 // best_worker_count if best_worker_count else 50
        estimated_time = 3000 / (self.results[best_worker_count]['symbols_per_second'] * 60) if best_worker_count else 60
        
        print(f"\n🔧 프로덕션 설정 제안:")
        print(f"   MAX_WORKERS = {best_worker_count}")
        print(f"   BATCH_SIZE = {batch_size}")
        print(f"   예상 처리 시간 (3000개 심볼): {estimated_time:.1f}분")
        
        return best_worker_count

# 테스트 실행
tester = WorkerOptimizationTester()
optimal_workers = tester.run_optimization_test()

In [None]:
# Cell 8: 빠른 워커 테스트 (5분 버전)
def quick_worker_test():
    """빠른 워커 수 테스트 (소규모)"""
    print("⚡ 빠른 워커 테스트 시작 (5개 심볼, 3가지 워커 수)")
    
    quick_tester = WorkerOptimizationTester()
    
    # 소규모 테스트
    test_workers = [3, 5, 8]
    test_symbols = ["AAPL", "MSFT", "GOOGL", "TSLA", "NVDA"]
    
    results = {}
    
    for workers in test_workers:
        print(f"\n🔸 워커 {workers}개 테스트...")
        start_time = time.time()
        
        success_count = 0
        api_errors = 0
        
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(quick_tester.test_single_symbol, symbol) 
                for symbol in test_symbols
            ]
            
            for future in as_completed(futures):
                result = future.result()
                if result['success']:
                    success_count += 1
                elif 'rate limit' in result.get('error', '').lower():
                    api_errors += 1
        
        total_time = time.time() - start_time
        success_rate = success_count / len(test_symbols) * 100
        
        results[workers] = {
            'success_rate': success_rate,
            'api_errors': api_errors,
            'total_time': total_time,
            'throughput': success_count / total_time
        }
        
        print(f"   ✅ 성공률: {success_rate:.1f}% | API오류: {api_errors}개 | 시간: {total_time:.1f}초")
        
        # 간격 두기
        time.sleep(3)
    
    # 결과 요약
    print(f"\n📋 빠른 테스트 결과:")
    best_workers = max(results.keys(), key=lambda w: results[w]['success_rate'] - results[w]['api_errors'] * 10)
    
    for workers, result in results.items():
        marker = "🎯" if workers == best_workers else "  "
        print(f"{marker} {workers}개 워커: {result['success_rate']:.1f}% 성공률, "
              f"{result['throughput']:.2f} req/sec")
    
    print(f"\n💡 추천: {best_workers}개 워커 사용")
    return best_workers

# 빠른 테스트 실행
recommended_workers = quick_worker_test()