# 🚀 트레이딩 수익률 기반 가중치 모델

## 📊 실제 트레이딩 로직:
## 1. 롱 포지션: 상승 시 수익, 하락 시 손실
## 2. 숏 포지션: 하락 시 수익, 상승 시 손실
## 3. 수익률에 따른 가중치 적용
## 4. 실제 트레이딩 성과 반영

In [None]:
# 📦 필요한 라이브러리 설치
%pip install pandas==2.0.3 numpy==1.24.3 scikit-learn==1.3.0 xgboost==1.7.6 lightgbm==4.0.0 tensorflow==2.13.0 pandas-ta==0.3.14b0 supabase

In [None]:
!pip install --upgrade pip!pip install xgboost lightgbm pandas-ta supabase plotly numba# 🔧 NumPy 2.0 호환성 패치 (즉시 해결!)import numpy as npif not hasattr(np, 'NaN'):    np.NaN = np.nan    print("✅ NumPy 패치 적용 완료!")# 이제 pandas_ta 정상 동작import pandas_ta as taprint("✅ pandas_ta 임포트 성공!")print(f"버전: {ta.version}")# 📚 라이브러리 임포트import pandas as pdfrom sklearn.ensemble import RandomForestClassifierfrom sklearn.model_selection import train_test_split, cross_val_scorefrom sklearn.metrics import accuracy_score, classification_report, confusion_matriximport lightgbm as lgbfrom supabase import create_clientimport warningswarnings.filterwarnings('ignore')print('✅ 라이브러리 로드 완료')

In [None]:
# 🔗 Supabase 연결
SUPABASE_URL = "https://your-project.supabase.co"
SUPABASE_KEY = "your-anon-key"

supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
print('✅ Supabase 연결 완료')

In [None]:
# 📊 데이터 수집 (배치 처리)
def collect_data_in_batches(symbol='ADAUSDT', timeframe='1m', target_records=500000):
    """배치로 데이터 수집"""
    all_data = []
    offset = 0
    batch_size = 1000
    
    print(f'🔄 {symbol} {timeframe} 데이터 수집 시작...')
    
    while len(all_data) < target_records:
        try:
            response = supabase.table('crypto_ohlcv').select('*').eq('symbol', symbol).eq('timeframe', timeframe).order('timestamp', desc=True).range(offset, offset + batch_size - 1).execute()
            
            if not response.data:
                break
                
            all_data.extend(response.data)
            offset += batch_size
            
            if len(all_data) % 10000 == 0:
                print(f'📊 수집된 데이터: {len(all_data):,}개')
                
        except Exception as e:
            print(f'❌ 배치 수집 오류: {e}')
            break
    
    print(f'✅ 데이터 수집 완료: {len(all_data):,}개')
    return all_data

# 데이터 수집 실행
raw_data = collect_data_in_batches()

# DataFrame 생성
df = pd.DataFrame(raw_data)
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
df = df.sort_values('datetime')

print(f'📅 기간: {df["datetime"].min()} ~ {df["datetime"].max()}')
print(f'📊 총 데이터: {len(df):,}개')

In [None]:
# 📈 기술적 지표 계산
def calculate_technical_indicators(df):
    """기술적 지표 계산"""
    # 이동평균
    df['sma_5'] = ta.sma(df['close'], length=5)
    df['sma_10'] = ta.sma(df['close'], length=10)
    df['sma_20'] = ta.sma(df['close'], length=20)
    df['ema_9'] = ta.ema(df['close'], length=9)
    df['ema_21'] = ta.ema(df['close'], length=21)
    
    # MACD
    macd = ta.macd(df['close'])
    df['macd'] = macd['MACD_12_26_9']
    df['macd_signal'] = macd['MACDs_12_26_9']
    df['macd_histogram'] = macd['MACDh_12_26_9']
    
    # RSI
    df['rsi'] = ta.rsi(df['close'], length=14)
    
    # 볼린저 밴드
    bb = ta.bbands(df['close'])
    df['bb_upper'] = bb['BBU_20_2.0']
    df['bb_lower'] = bb['BBL_20_2.0']
    df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['close']
    df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
    
    # 가격 변화율
    df['price_change'] = df['close'].pct_change()
    df['price_change_5'] = df['close'].pct_change(5)
    df['price_change_10'] = df['close'].pct_change(10)
    
    # 거래량 지표
    df['volume_sma'] = ta.sma(df['volume'], length=20)
    df['volume_ratio'] = df['volume'] / df['volume_sma']
    
    # 시간 특성
    df['hour'] = df['datetime'].dt.hour
    df['day_of_week'] = df['datetime'].dt.dayofweek
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    
    return df

# 지표 계산
df = calculate_technical_indicators(df)
print('✅ 기술적 지표 계산 완료')

In [None]:
# 🎯 트레이딩 수익률 기반 목표 변수 생성
def create_trading_target(df, lookforward=5, threshold=0.002):
    """트레이딩 수익률 기반 목표 변수 생성"""
    # 미래 가격 변화율 계산
    df['future_return'] = df['close'].shift(-lookforward) / df['close'] - 1
    
    # 트레이딩 시그널 생성
    # 1: 롱 포지션 (상승 예상)
    # 0: 홀드 (변동 없음)
    # -1: 숏 포지션 (하락 예상)
    df['signal'] = np.where(df['future_return'] > threshold, 1,
                           np.where(df['future_return'] < -threshold, -1, 0))
    
    # 실제 트레이딩 수익률 계산
    # 롱 포지션: 상승 시 수익, 하락 시 손실
    # 숏 포지션: 하락 시 수익, 상승 시 손실
    df['trading_return'] = np.where(df['signal'] == 1, df['future_return'],  # 롱: 상승 시 수익
                                    np.where(df['signal'] == -1, -df['future_return'], 0))  # 숏: 하락 시 수익
    
    # 수익률 기반 가중치 계산
    # 수익이 클수록 높은 가중치, 손실이 클수록 낮은 가중치
    df['sample_weight'] = np.abs(df['trading_return']) * 100 + 1  # 최소 가중치 1 보장
    
    return df

# 목표 변수 생성
df = create_trading_target(df)
print('✅ 트레이딩 수익률 기반 목표 변수 생성 완료')
print(f'📊 시그널 분포:\n{df["signal"].value_counts()}')
print(f'📈 평균 트레이딩 수익률: {df["trading_return"].mean()*100:.4f}%')
print(f'📊 수익률 표준편차: {df["trading_return"].std()*100:.4f}%')
print(f'⚖️ 평균 샘플 가중치: {df["sample_weight"].mean():.2f}')

In [None]:
# 🔧 데이터 정렬 및 전처리
feature_columns = [
    'sma_5', 'sma_10', 'sma_20', 'ema_9', 'ema_21',
    'macd', 'macd_signal', 'macd_histogram',
    'rsi',
    'bb_upper', 'bb_lower', 'bb_width', 'bb_position',
    'price_change', 'price_change_5', 'price_change_10',
    'volume_sma', 'volume_ratio',
    'hour', 'day_of_week', 'is_weekend'
]

# 모든 특성과 타겟이 있는 행만 선택
all_columns = feature_columns + ['signal', 'sample_weight']
df_clean = df[all_columns].dropna()

# 특성과 타겟 분리
X = df_clean[feature_columns]
y = df_clean['signal']
sample_weights = df_clean['sample_weight']

print(f'🔧 데이터 정렬 완료')
print(f'📊 특성 데이터: {X.shape}')
print(f'📊 타겟 데이터: {y.shape}')
print(f'⚖️ 샘플 가중치: {sample_weights.shape}')
print(f'✅ 샘플 수 일치: {X.shape[0] == y.shape[0] == sample_weights.shape[0]}')

In [None]:
# 🚀 수익률 기반 가중치 LightGBM 모델
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score, classification_report
import pickle
import json

# 데이터 분할 (가중치도 함께 분할)
X_train, X_test, y_train, y_test, w_train, w_test = train_test_split(
    X, y, sample_weights, test_size=0.2, random_state=42, stratify=y
)

# LightGBM 모델 설정 (가중치 적용)
model = lgb.LGBMClassifier(
    n_estimators=1000,
    learning_rate=0.05,
    max_depth=8,
    num_leaves=31,
    random_state=42,
    n_jobs=-1,
    verbose=-1
)

print('🚀 수익률 기반 가중치 LightGBM 모델 훈련 시작...')
print(f'📊 훈련 데이터: {X_train.shape}')
print(f'📊 테스트 데이터: {X_test.shape}')
print(f'⚖️ 훈련 가중치 범위: {w_train.min():.2f} ~ {w_train.max():.2f}')
print(f'⚖️ 테스트 가중치 범위: {w_test.min():.2f} ~ {w_test.max():.2f}')

# 모델 훈련 (가중치 적용)
model.fit(X_train, y_train, sample_weight=w_train)

# 예측
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print(f'✅ 모델 훈련 완료')
print(f'📊 정확도: {accuracy:.4f}')

In [None]:
# 📊 모델 성능 평가
from sklearn.model_selection import cross_val_score

# 교차 검증 (가중치 적용)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(model, X, y, cv=cv, scoring='accuracy', sample_weight=sample_weights)

print('📊 교차 검증 결과:')
print(f'평균 정확도: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})')
print(f'개별 정확도: {cv_scores}')

# 분류 리포트
print('\n📋 분류 리포트:')
print(classification_report(y_test, y_pred))

# 혼동 행렬
print('\n📊 혼동 행렬:')
cm = confusion_matrix(y_test, y_pred)
print(cm)

# 특성 중요도
feature_importance = pd.DataFrame({
    'feature': feature_columns,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)

print('\n🏆 상위 10개 중요 특성:')
print(feature_importance.head(10))

In [None]:
# 💰 실제 트레이딩 성과 분석
def analyze_trading_performance(df_test, y_pred):
    """실제 트레이딩 성과 분석"""
    # 테스트 데이터에 예측 결과 추가
    df_test_copy = df_test.copy()
    df_test_copy['predicted_signal'] = y_pred
    
    # 실제 트레이딩 수익률 계산
    df_test_copy['actual_trading_return'] = np.where(
        df_test_copy['predicted_signal'] == 1, df_test_copy['future_return'],  # 롱
        np.where(df_test_copy['predicted_signal'] == -1, -df_test_copy['future_return'], 0)  # 숏
    )
    
    # 성과 분석
    total_trades = len(df_test_copy[df_test_copy['predicted_signal'] != 0])
    profitable_trades = len(df_test_copy[df_test_copy['actual_trading_return'] > 0])
    win_rate = profitable_trades / total_trades if total_trades > 0 else 0
    
    avg_return = df_test_copy['actual_trading_return'].mean()
    total_return = df_test_copy['actual_trading_return'].sum()
    
    print('💰 실제 트레이딩 성과 분석:')
    print(f'📊 총 거래 수: {total_trades:,}회')
    print(f'📈 수익 거래: {profitable_trades:,}회')
    print(f'🎯 승률: {win_rate*100:.2f}%')
    print(f'📊 평균 수익률: {avg_return*100:.4f}%')
    print(f'💰 총 수익률: {total_return*100:.4f}%')
    
    # 시그널별 성과
    print('\n📊 시그널별 성과:')
    for signal in [-1, 0, 1]:
        signal_data = df_test_copy[df_test_copy['predicted_signal'] == signal]
        if len(signal_data) > 0:
            signal_return = signal_data['actual_trading_return'].mean()
            signal_count = len(signal_data)
            print(f'  {signal:+d} ({"숏" if signal==-1 else "홀드" if signal==0 else "롱"}): {signal_return*100:.4f}% ({signal_count:,}회)')

# 성과 분석 실행
analyze_trading_performance(df_clean.iloc[X_test.index], y_pred)