In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
import pandas as pd


In [2]:
# 1. 데이터 로드
sb = pd.read_csv('csv/scoreboard_2023_2024_2025.csv')
bt = pd.read_csv('csv/batter_2023_2024_2025.csv')
pt = pd.read_csv('csv/pitcher_2023_2024_2025.csv')

print("="*70)
print("[STEP 1] 데이터 로드")
print("="*70)
print(f"Input: CSV 파일 3개 (scoreboard, batter, pitcher)")
print(f"Expected Output: 3개의 DataFrame 생성")
print(f"Real Output:")
print(f"  - scoreboard shape: {sb.shape}, columns: {list(sb.columns[:5])}")
print(f"  - batter shape: {bt.shape}, columns: {list(bt.columns[:5])}")
print(f"  - pitcher shape: {pt.shape}, columns: {list(pt.columns[:5])}")
print()

# 날짜순 정렬 및 승리 여부 이진화
sb = sb.sort_values(['year', 'month', 'day', 'starttime'])
sb['win_binary'] = (sb['result'] == 1).astype(int)

print("="*70)
print("[STEP 2] 날짜순 정렬 및 승리 여부 이진화")
print("="*70)
print(f"Input: scoreboard DataFrame (정렬 전)")
print(f"Expected Output: 날짜순 정렬 + win_binary 컬럼 추가")
print(f"Real Output:")
print(f"  - scoreboard shape: {sb.shape}")
print(f"  - win_binary 분포: {sb['win_binary'].value_counts().to_dict()}")
print(f"  - 샘플 데이터:\n{sb[['year', 'month', 'day', 'result', 'win_binary']].head()}")
print()

# 2. 타자 데이터를 경기(idx)별 팀 합계로 집계
team_batting = bt.groupby('idx').agg({
    'hit': 'sum',
    'bat_num': 'sum'
}).reset_index()

print("="*70)
print("[STEP 3] 타자 데이터를 경기별 팀 합계로 집계")
print("="*70)
print(f"Input: batter DataFrame shape {bt.shape}")
print(f"Expected Output: idx별로 hit와 bat_num 합계")
print(f"Real Output:")
print(f"  - team_batting shape: {team_batting.shape}")
print(f"  - hit 통계: min={team_batting['hit'].min()}, max={team_batting['hit'].max()}, mean={team_batting['hit'].mean():.2f}")
print(f"  - 샘플 데이터:\n{team_batting.head()}")
print()

# 기본 경기 정보와 결합
base_df = pd.merge(sb[['idx', 'team', 'year', 'month', 'day', 'home', 'away', 'r', 'win_binary', 'dbheader']], 
                   team_batting, on='idx', how='left')

print("="*70)
print("[STEP 4] 기본 경기 정보와 팀 타격 데이터 결합")
print("="*70)
print(f"Input: scoreboard columns + team_batting")
print(f"Expected Output: idx 기준 merge된 DataFrame")
print(f"Real Output:")
print(f"  - base_df shape: {base_df.shape}")
print(f"  - 컬럼: {list(base_df.columns)}")
print(f"  - 샘플 데이터:\n{base_df[['idx', 'team', 'hit', 'bat_num']].head()}")
print()

# 3. 상대 팀 득점(실점) 정보 매칭 (더블헤더 dbheader 포함)
# game_id에 dbheader를 추가하여 더블헤더 경기를 구분합니다.
base_df['game_id'] = (base_df['year'].astype(str) + 
                     base_df['month'].astype(str).str.zfill(2) + 
                     base_df['day'].astype(str).str.zfill(2) + "_" + 
                     base_df['home'] + "_" + base_df['away'] + "_" + 
                     base_df['dbheader'].astype(str))

print("="*70)
print("[STEP 5] game_id 생성 (더블헤더 포함)")
print("="*70)
print(f"Input: year, month, day, home, away, dbheader")
print(f"Expected Output: YYYYMMDD_HOME_AWAY_DBHEADER 형식의 game_id")
print(f"Real Output:")
print(f"  - game_id 샘플: {base_df['game_id'].iloc[:3].tolist()}")
print(f"  - 고유한 경기 수: {base_df['game_id'].nunique()}")
print()

opp_scores = base_df[['game_id', 'team', 'r']].rename(columns={'team': 'opp_team', 'r': 'runs_allowed'})
base_df = pd.merge(base_df, opp_scores, on='game_id')
base_df = base_df[base_df['team'] != base_df['opp_team']].copy()

print("="*70)
print("[STEP 6] 상대팀 실점 정보 매칭")
print("="*70)
print(f"Input: base_df의 game_id, team, r 정보")
print(f"Expected Output: 각 팀마다 홈/원정 두 행으로 분리, runs_allowed 추가")
print(f"Real Output:")
print(f"  - base_df shape (필터 후): {base_df.shape}")
print(f"  - runs_allowed 통계: min={base_df['runs_allowed'].min()}, max={base_df['runs_allowed'].max()}, mean={base_df['runs_allowed'].mean():.2f}")
print(f"  - 샘플 데이터:\n{base_df[['game_id', 'team', 'opp_team', 'r', 'runs_allowed']].head()}")
print()

# 4. [특성 1, 2, 3] 최근 30경기 이동 평균 (transform 사용으로 컬럼 유지)
# ※ 주의: 7월 데이터만 사용할 경우 30경기 미만이면 NaN이 발생하므로 min_periods를 설정합니다.
base_df = base_df.sort_values(['team', 'year', 'month', 'day', 'game_id'])

# 평균 득점 및 실점
base_df['f1_avg_runs_scored_30'] = base_df.groupby('team')['r'].transform(lambda x: x.shift(1).rolling(window=30, min_periods=1).mean())
base_df['f2_avg_runs_allowed_30'] = base_df.groupby('team')['runs_allowed'].transform(lambda x: x.shift(1).rolling(window=30, min_periods=1).mean())

print("="*70)
print("[STEP 7-1] 최근 30경기 이동 평균 (득점/실점)")
print("="*70)
print(f"Input: team별로 정렬된 r, runs_allowed")
print(f"Expected Output: 최근 30경기의 평균 득점/실점 (NaN 허용)")
print(f"Real Output:")
print(f"  - f1_avg_runs_scored_30: NaN={base_df['f1_avg_runs_scored_30'].isna().sum()}, min={base_df['f1_avg_runs_scored_30'].min():.2f}, max={base_df['f1_avg_runs_scored_30'].max():.2f}")
print(f"  - f2_avg_runs_allowed_30: NaN={base_df['f2_avg_runs_allowed_30'].isna().sum()}, min={base_df['f2_avg_runs_allowed_30'].min():.2f}, max={base_df['f2_avg_runs_allowed_30'].max():.2f}")
print(f"  - 샘플:\n{base_df[['team', 'f1_avg_runs_scored_30', 'f2_avg_runs_allowed_30']].head(10)}")
print()

# 팀 타율 (누적 합계를 구한 뒤 나누는 방식이 더 정확합니다)
rolling_hits = base_df.groupby('team')['hit'].transform(lambda x: x.shift(1).rolling(window=30, min_periods=1).sum())
rolling_ab = base_df.groupby('team')['bat_num'].transform(lambda x: x.shift(1).rolling(window=30, min_periods=1).sum())
base_df['f3_team_batting_avg_30'] = rolling_hits / rolling_ab

print("="*70)
print("[STEP 7-2] 최근 30경기 팀 타율")
print("="*70)
print(f"Input: team별 hit, bat_num의 30경기 누적합")
print(f"Expected Output: rolling_hits / rolling_ab = 팀 타율")
print(f"Real Output:")
print(f"  - f3_team_batting_avg_30: NaN={base_df['f3_team_batting_avg_30'].isna().sum()}, min={base_df['f3_team_batting_avg_30'].min():.4f}, max={base_df['f3_team_batting_avg_30'].max():.4f}")
print(f"  - 샘플:\n{base_df[['team', 'hit', 'bat_num', 'f3_team_batting_avg_30']].head(10)}")
print()

# 5. [특성 4] 선발 투수 시즌 평균 실점
starters = pt[pt['mound'] == 1][['idx', 'name', 'losescore']].copy()
starters = pd.merge(starters, sb[['idx', 'year', 'month', 'day']], on='idx')
starters = starters.sort_values(['name', 'year', 'month', 'day'])
starters['f4_pitcher_runs_avg'] = starters.groupby('name')['losescore'].transform(lambda x: x.shift(1).expanding().mean())

print("="*70)
print("[STEP 8] 선발 투수 시즌 평균 실점")
print("="*70)
print(f"Input: pitcher 데이터 (mound==1만 필터링)")
print(f"Expected Output: 각 투수별 누적 평균 실점 (expanding mean)")
print(f"Real Output:")
print(f"  - starters shape: {starters.shape}")
print(f"  - f4_pitcher_runs_avg: NaN={starters['f4_pitcher_runs_avg'].isna().sum()}, min={starters['f4_pitcher_runs_avg'].min():.2f}, max={starters['f4_pitcher_runs_avg'].max():.2f}")
print(f"  - 샘플:\n{starters[['name', 'losescore', 'f4_pitcher_runs_avg']].head(10)}")
print()

base_df = pd.merge(base_df, starters[['idx', 'f4_pitcher_runs_avg']], on='idx', how='left')

print("="*70)
print("[STEP 8-2] 투수 정보를 base_df에 추가")
print("="*70)
print(f"Input: base_df + starters의 f4_pitcher_runs_avg")
print(f"Expected Output: idx 기준 merge")
print(f"Real Output:")
print(f"  - base_df shape: {base_df.shape}")
print(f"  - f4_pitcher_runs_avg: NaN={base_df['f4_pitcher_runs_avg'].isna().sum()}")
print()

# 6. [특성 5] 팀 전체 승률
base_df['f5_total_win_pct'] = base_df.groupby('team')['win_binary'].transform(lambda x: x.shift(1).expanding().mean())

print("="*70)
print("[STEP 9] 팀 전체 승률")
print("="*70)
print(f"Input: team별 win_binary")
print(f"Expected Output: 누적 승률 (expanding mean)")
print(f"Real Output:")
print(f"  - f5_total_win_pct: NaN={base_df['f5_total_win_pct'].isna().sum()}, min={base_df['f5_total_win_pct'].min():.4f}, max={base_df['f5_total_win_pct'].max():.4f}")
print(f"  - 샘플:\n{base_df[['team', 'win_binary', 'f5_total_win_pct']].head(10)}")
print()

# 7. [특성 6] 홈/원정 승률
def calc_ha_win_pct(df):
    df = df.copy()
    df['is_home'] = (df['team'] == df['home'])
    # 홈일 때와 원정일 때를 각각 그룹화하여 승률 계산
    df['f6_ha_win_pct'] = df.groupby(['team', 'is_home'])['win_binary'].transform(lambda x: x.shift(1).expanding().mean())
    return df

base_df = calc_ha_win_pct(base_df)

print("="*70)
print("[STEP 10] 홈/원정 승률")
print("="*70)
print(f"Input: team과 is_home(홈/원정 구분)")
print(f"Expected Output: team-is_home별 누적 승률")
print(f"Real Output:")
print(f"  - f6_ha_win_pct: NaN={base_df['f6_ha_win_pct'].isna().sum()}, min={base_df['f6_ha_win_pct'].min():.4f}, max={base_df['f6_ha_win_pct'].max():.4f}")
print(f"  - is_home 분포: {base_df['is_home'].value_counts().to_dict()}")
print(f"  - 샘플:\n{base_df[['team', 'is_home', 'win_binary', 'f6_ha_win_pct']].head(10)}")
print()

# 8. 최종 MLP 데이터셋 구성
feature_cols = ['f1_avg_runs_scored_30', 'f2_avg_runs_allowed_30', 'f3_team_batting_avg_30', 
                'f4_pitcher_runs_avg', 'f5_total_win_pct', 'f6_ha_win_pct']

# 홈팀 데이터와 원정팀 데이터 분리 후 결합
home_df = base_df[base_df['team'] == base_df['home']][['game_id', 'win_binary'] + feature_cols]
home_df.columns = ['game_id', 'home_win'] + ['h_' + c for c in feature_cols]

away_df = base_df[base_df['team'] == base_df['away']][['game_id'] + feature_cols]
away_df.columns = ['game_id'] + ['a_' + c for c in feature_cols]

print("="*70)
print("[STEP 11-1] 홈팀/원정팀 데이터 분리")
print("="*70)
print(f"Input: base_df의 모든 경기 기록")
print(f"Expected Output: 홈팀 행과 원정팀 행으로 분리")
print(f"Real Output:")
print(f"  - home_df shape: {home_df.shape}")
print(f"  - away_df shape: {away_df.shape}")
print(f"  - home_df 샘플:\n{home_df.head()}")
print(f"  - away_df 샘플:\n{away_df.head()}")
print()

final_dataset = pd.merge(home_df, away_df, on='game_id').dropna()

print("="*70)
print("[STEP 11-2] 홈/원정 데이터 결합 및 NaN 제거")
print("="*70)
print(f"Input: home_df, away_df (game_id 기준)")
print(f"Expected Output: game_id별로 홈/원정 특성을 모두 가진 행")
print(f"Real Output:")
print(f"  - 결합 전 행: {len(home_df)}")
print(f"  - 결합 후 행 (dropna 전): {len(pd.merge(home_df, away_df, on='game_id'))}")
print(f"  - final_dataset shape (NaN 제거 후): {final_dataset.shape}")
print(f"  - 컬럼: {list(final_dataset.columns)}")
print()

# Target: 홈팀 승리 시 0, 원정팀 승리 시 1
final_dataset['target'] = (final_dataset['home_win'] == 0).astype(int)
final_dataset = final_dataset.drop(columns=['home_win'])

print("="*70)
print("[STEP 12] 타겟 변수 생성")
print("="*70)
print(f"Input: home_win (홈팀 승리=1, 원정팀 승리=0)")
print(f"Expected Output: target (홈팀 승리=0, 원정팀 승리=1)")
print(f"Real Output:")
print(f"  - target 분포: {final_dataset['target'].value_counts().to_dict()}")
print(f"  - target 비율: 홈팀승(0)={final_dataset[final_dataset['target']==0].shape[0]/len(final_dataset):.2%}, 원정팀승(1)={final_dataset[final_dataset['target']==1].shape[0]/len(final_dataset):.2%}")
print(f"  - 샘플:\n{final_dataset[['game_id', 'target']].head()}")
print()

# 저장
final_dataset.to_csv('kbo_mlp_training_data.csv', index=False)

print("="*70)
print("[STEP 13] 최종 데이터셋 저장")
print("="*70)
print(f"Input: final_dataset")
print(f"Expected Output: kbo_mlp_training_data.csv 파일 생성")
print(f"Real Output:")
print(f"  - 파일명: kbo_mlp_training_data.csv")
print(f"  - 최종 shape: {final_dataset.shape}")
print(f"  - 컬럼 수: {len(final_dataset.columns)}")
print(f"  - 컬럼 목록: {list(final_dataset.columns)}")
print(f"\n학습용 데이터셋 생성 완료: {len(final_dataset)} 경기 데이터 포함")
print("="*70)

[STEP 1] 데이터 로드
Input: CSV 파일 3개 (scoreboard, batter, pitcher)
Expected Output: 3개의 DataFrame 생성
Real Output:
  - scoreboard shape: (4688, 37), columns: ['idx', 'team', 'result', 'i_1', 'i_2']
  - batter shape: (60780, 26), columns: ['idx', 'name', 'team', 'position', 'i_1']
  - pitcher shape: (23021, 15), columns: ['idx', 'name', 'team', 'mound', 'inning']

[STEP 2] 날짜순 정렬 및 승리 여부 이진화
Input: scoreboard DataFrame (정렬 전)
Expected Output: 날짜순 정렬 + win_binary 컬럼 추가
Real Output:
  - scoreboard shape: (4688, 38)
  - win_binary 분포: {0: 2401, 1: 2287}
  - 샘플 데이터:
   year  month  day  result  win_binary
4  2023      3   13      -1           0
5  2023      3   13       1           1
0  2023      3   13      -1           0
1  2023      3   13       1           1
8  2023      3   13       1           1

[STEP 3] 타자 데이터를 경기별 팀 합계로 집계
Input: batter DataFrame shape (60780, 26)
Expected Output: idx별로 hit와 bat_num 합계
Real Output:
  - team_batting shape: (4688, 3)
  - hit 통계: min=0, max=28, mean=9.12
 

  bt = pd.read_csv('csv/batter_2023_2024_2025.csv')


### skit-learn

In [3]:
# 1. 전처리된 데이터 로드
df = pd.read_csv('kbo_mlp_training_data.csv')

In [4]:

# 2. 특징(X)과 정답(y) 분리
# game_id는 식별용이므로 제외, target(0:홈승, 1:원정승)을 예측
X = df.drop(columns=['game_id', 'target'])
y = df['target']

In [5]:

# 3. 데이터 분할 (훈련 75%, 테스트 25%) [cite: 20]
# shuffle=False는 시간 순서대로 테스트하기 위함입니다 (자료 기준) [cite: 76]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, shuffle=False)

# 4. 데이터 스케일링 (StandardScaler 사용) [cite: 75, 77]
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [6]:

# 5. 세 가지 최적화 알고리즘(Solver) 비교 [cite: 64, 71]
solvers = ['sgd', 'adam', 'lbfgs']
results = {}

print("--- 모델 학습 결과 ---")
for s in solvers:
    # 자료의 설정값 반영: 은닉층 (3,), 활성화 함수 relu, 최대 반복 1000 [cite: 47, 83]
    clf = MLPClassifier(
    hidden_layer_sizes=(3,),  # PDF slide 7: "a hidden layer containing 3 nodes"
    activation='relu',         # 기본 활성화 함수
    solver=s,           # PDF slide 9: "L-BFGS outperformed with 62.1%"
    max_iter=5000,            # 수렴을 위해 충분히 큰 값 설정
    random_state=42
    )
    
    # 모델 학습
    clf.fit(X_train_scaled, y_train)
    
    # 예측 및 정확도 계산 [cite: 85, 88]
    y_pred = clf.predict(X_test_scaled)
    acc = accuracy_score(y_test, y_pred)
    results[s] = acc
    print(f"Algorithm: {s.upper()} | Accuracy: {acc:.1%}")

--- 모델 학습 결과 ---
Algorithm: SGD | Accuracy: 53.9%
Algorithm: ADAM | Accuracy: 53.6%
Algorithm: LBFGS | Accuracy: 53.6%


In [7]:

# 가장 높은 정확도를 보인 알고리즘 확인
best_solver = max(results, key=results.get)
print(f"\n추천 알고리즘: {best_solver.upper()} (정확도 {results[best_solver]:.1%})")


추천 알고리즘: SGD (정확도 53.9%)


In [8]:
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler

# 1. 데이터 스케일링 (신경망에서 가장 중요!)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 2. 테스트할 파라미터 조합 설정
param_grid = {
    'hidden_layer_sizes': [(3,), (6,), (12,), (6, 3), (12, 6)], # 노드 수를 늘려보거나 층을 쌓아봄
    'solver': ['sgd', 'adam', 'lbfgs'],
    'activation': ['relu', 'tanh'],
    'max_iter': [5000] # 충분히 학습하도록 반복 횟수 증가
}

# 3. 그리드 서치 실행 (모든 조합을 다 해보고 최고를 찾음)
mlp = MLPClassifier(random_state=1)
grid_search = GridSearchCV(mlp, param_grid, cv=3, scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train_scaled, y_train)

# 4. 결과 출력
print(f"최고 정확도: {grid_search.best_score_:.1%}")
print(f"최적의 파라미터: {grid_search.best_params_}")

# 5. 최적의 모델로 테스트 데이터 평가
best_model = grid_search.best_estimator_
test_acc = best_model.score(X_test_scaled, y_test)
print(f"최종 테스트 결과: {test_acc:.1%}")

최고 정확도: 53.5%
최적의 파라미터: {'activation': 'tanh', 'hidden_layer_sizes': (12,), 'max_iter': 5000, 'solver': 'sgd'}
최종 테스트 결과: 55.1%


STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT

Increase the number of iterations to improve the convergence (max_iter=5000).
You might also want to scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)


In [9]:
# 1. 데이터 로드 및 전처리
df = pd.read_csv('kbo_mlp_training_data.csv')
X = df.drop(columns=['game_id', 'target']).values
y = df['target'].values.reshape(-1, 1)

# 데이터 분할 (셔플 없이 시간 순서대로)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# 스케일링
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 텐서 변환
X_train_t = torch.FloatTensor(X_train_scaled)
y_train_t = torch.FloatTensor(y_train)
X_test_t = torch.FloatTensor(X_test_scaled)
y_test_t = torch.FloatTensor(y_test)

train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=32, shuffle=True)


# 2. MLP 모델 클래스 정의
class KBOPredictor(nn.Module):
    def __init__(self):
        super(KBOPredictor, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(12, 12),  # 입력층 12 -> 은닉층 12
            nn.Tanh(),
            nn.Linear(12, 6),   # 은닉층 12 -> 은닉층 6
            nn.Tanh(),
            nn.Linear(6, 1),    # 은닉층 6 -> 출력층 1
            nn.Sigmoid()        # 0~1 사이의 확률로 출력
        )

    def forward(self, x):
        return self.model(x)

# 모델, 손실함수, 최적화기 설정
model = KBOPredictor()
criterion = nn.BCELoss()
# weight_decay는 가중치가 너무 커지지 않게 규제함 (L2 규제)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-2)

# 3. 학습 루프
epochs = 200
for epoch in range(epochs):
    model.train()
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
    
    # 20회마다 평가 결과 출력
    if (epoch + 1) % 20 == 0:
        model.eval()
        with torch.no_grad():
            train_outputs = model(X_train_t)
            train_acc = ((train_outputs > 0.5).float() == y_train_t).float().mean()
            
            test_outputs = model(X_test_t)
            test_acc = ((test_outputs > 0.5).float() == y_test_t).float().mean()
            
            print(f"Epoch [{epoch+1}/{epochs}] Loss: {loss.item():.4f} | Train Acc: {train_acc:.1%} | Test Acc: {test_acc:.1%}")

# 4. 최종 결과 확인
model.eval()
with torch.no_grad():
    final_pred = (model(X_test_t) > 0.5).float()
    print(f"\n[최종 테스트 정확도]: {accuracy_score(y_test, final_pred):.2%}")

Epoch [20/200] Loss: 0.7167 | Train Acc: 56.3% | Test Acc: 52.5%
Epoch [40/200] Loss: 0.7412 | Train Acc: 56.5% | Test Acc: 53.2%
Epoch [60/200] Loss: 0.6578 | Train Acc: 56.9% | Test Acc: 55.0%
Epoch [80/200] Loss: 0.6870 | Train Acc: 56.7% | Test Acc: 51.3%
Epoch [100/200] Loss: 0.6392 | Train Acc: 57.3% | Test Acc: 54.1%
Epoch [120/200] Loss: 0.7079 | Train Acc: 57.4% | Test Acc: 53.4%
Epoch [140/200] Loss: 0.7461 | Train Acc: 56.7% | Test Acc: 53.9%
Epoch [160/200] Loss: 0.7335 | Train Acc: 56.7% | Test Acc: 53.2%
Epoch [180/200] Loss: 0.6488 | Train Acc: 57.0% | Test Acc: 54.6%
Epoch [200/200] Loss: 0.6741 | Train Acc: 57.0% | Test Acc: 54.8%

[최종 테스트 정확도]: 54.80%


In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

# 1. 데이터 로드 및 전처리
df = pd.read_csv('kbo_mlp_training_data.csv')
X = df.drop(columns=['game_id', 'target']).values
y = df['target'].values.reshape(-1, 1)

# 데이터 분할 (셔플 없이 시간 순서대로)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# 스케일링
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 텐서 변환
X_train_t = torch.FloatTensor(X_train_scaled)
y_train_t = torch.FloatTensor(y_train)
X_test_t = torch.FloatTensor(X_test_scaled)
y_test_t = torch.FloatTensor(y_test)

train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=16, shuffle=True)

# 2. 모델 설계 (Dropout 추가로 과적합 방지)
class AdvancedKBOPredictor(nn.Module):
    def __init__(self):
        super(AdvancedKBOPredictor, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(12, 8),
            nn.Tanh(),
            nn.Dropout(0.2),
            nn.Linear(8, 4),
            nn.Tanh(),
            nn.Linear(4, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.layers(x)

# 모델, 손실함수, 최적화기 설정
model = AdvancedKBOPredictor()
criterion = nn.BCELoss()
# weight_decay는 가중치가 너무 커지지 않게 규제함 (L2 규제)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-2)

# 3. 학습 루프
epochs = 200
for epoch in range(epochs):
    model.train()
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
    
    # 20회마다 평가 결과 출력
    if (epoch + 1) % 20 == 0:
        model.eval()
        with torch.no_grad():
            train_outputs = model(X_train_t)
            train_acc = ((train_outputs > 0.5).float() == y_train_t).float().mean()
            
            test_outputs = model(X_test_t)
            test_acc = ((test_outputs > 0.5).float() == y_test_t).float().mean()
            
            print(f"Epoch [{epoch+1}/{epochs}] Loss: {loss.item():.4f} | Train Acc: {train_acc:.1%} | Test Acc: {test_acc:.1%}")

# 4. 최종 결과 확인
model.eval()
with torch.no_grad():
    final_pred = (model(X_test_t) > 0.5).float()
    print(f"\n[최종 테스트 정확도]: {accuracy_score(y_test, final_pred):.2%}")

Epoch [20/200] Loss: 0.6797 | Train Acc: 55.6% | Test Acc: 51.1%
Epoch [40/200] Loss: 0.7594 | Train Acc: 56.8% | Test Acc: 55.0%
Epoch [60/200] Loss: 0.6208 | Train Acc: 56.6% | Test Acc: 54.6%
Epoch [80/200] Loss: 0.7014 | Train Acc: 57.0% | Test Acc: 55.3%
Epoch [100/200] Loss: 0.6769 | Train Acc: 56.9% | Test Acc: 53.9%
Epoch [120/200] Loss: 0.6649 | Train Acc: 56.3% | Test Acc: 51.3%
Epoch [140/200] Loss: 0.6691 | Train Acc: 56.6% | Test Acc: 52.9%
Epoch [160/200] Loss: 0.6984 | Train Acc: 56.4% | Test Acc: 52.9%
Epoch [180/200] Loss: 0.6715 | Train Acc: 56.4% | Test Acc: 51.8%
Epoch [200/200] Loss: 0.6700 | Train Acc: 56.7% | Test Acc: 53.6%

[최종 테스트 정확도]: 53.63%
