# 08. Phase 6: 勝敗予測モデル - CNN版 (Colab)

## 概要
- baseline特徴量 + wardグリッドCNNのハイブリッドモデル
- Google Colab A100 GPU対応

## 前提
- `prediction_dataset.npz`をアップロード済み

In [None]:
# Cell 1: 環境確認・インストール
!nvidia-smi

import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# Cell 2: データアップロード
from google.colab import files
import os

# prediction_dataset.npzをアップロード
if not os.path.exists('prediction_dataset.npz'):
    print("prediction_dataset.npzをアップロードしてください")
    uploaded = files.upload()
else:
    print("prediction_dataset.npz: 既に存在")

In [None]:
# Cell 3: ライブラリインポート
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import StratifiedKFold, LeaveOneOut
from sklearn.metrics import accuracy_score, roc_auc_score, log_loss
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# 再現性
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

In [None]:
# Cell 4: データ読み込み
data = np.load('prediction_dataset.npz', allow_pickle=True)

X_baseline = data['X_baseline']  # (N, 2, 20) - 2時点, 20特徴量
X_riot_vision = data['X_riot_vision']  # (N, 2, 3)
X_ward_grid = data['X_ward_grid']  # (N, 2, 2, 32, 32) - 2時点, 2チーム, 32x32
y = data['y']  # (N,)
match_ids = data['match_ids']
times_ms = data['times_ms']

print(f"試合数: {len(y)}")
print(f"X_baseline shape: {X_baseline.shape}")
print(f"X_ward_grid shape: {X_ward_grid.shape}")
print(f"y shape: {y.shape}")
print(f"Blue勝利: {y.sum()}, Red勝利: {len(y) - y.sum()}")
print(f"評価時点: {[t // 60000 for t in times_ms]}分")

In [None]:
# Cell 5: ハイブリッドCNNモデル定義

class HybridWinPredictor(nn.Module):
    """
    baseline特徴量 + wardグリッドCNNのハイブリッドモデル
    
    入力:
        - baseline: (batch, n_baseline_features)
        - ward_grid: (batch, 2, 32, 32) - 2チーム分のグリッド
    """
    
    def __init__(self, n_baseline_features=20, dropout_rate=0.3):
        super().__init__()
        
        # CNN部分（wardグリッド用）
        self.cnn = nn.Sequential(
            # 入力: (batch, 2, 32, 32)
            nn.Conv2d(2, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),  # -> (batch, 16, 16, 16)
            
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),  # -> (batch, 32, 8, 8)
            
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),  # -> (batch, 64, 1, 1)
            
            nn.Flatten(),  # -> (batch, 64)
        )
        
        # MLP部分（baseline特徴量用）
        self.baseline_mlp = nn.Sequential(
            nn.Linear(n_baseline_features, 32),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
        )
        
        # 結合後の分類層
        # CNN出力(64) + baseline MLP出力(32) = 96
        self.classifier = nn.Sequential(
            nn.Linear(64 + 32, 64),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(64, 1),
        )
    
    def forward(self, baseline, ward_grid):
        # CNN処理
        cnn_features = self.cnn(ward_grid)
        
        # baseline MLP処理
        baseline_features = self.baseline_mlp(baseline)
        
        # 結合
        combined = torch.cat([cnn_features, baseline_features], dim=1)
        
        # 分類
        logits = self.classifier(combined)
        return logits.squeeze(-1)


# モデル確認
model_test = HybridWinPredictor(n_baseline_features=20)
print(model_test)
print(f"\nパラメータ数: {sum(p.numel() for p in model_test.parameters()):,}")

In [None]:
# Cell 6: 学習・評価関数

def prepare_data_for_time(X_baseline, X_ward_grid, y, time_index):
    """
    指定時点のデータを準備
    
    Args:
        time_index: 0=10分, 1=20分
    """
    # baseline: (N, 2, 20) -> (N, 20)
    baseline = X_baseline[:, time_index, :]
    
    # ward_grid: (N, 2, 2, 32, 32) -> (N, 2, 32, 32)
    # 2時点のうち指定時点、2チーム分のグリッド
    ward_grid = X_ward_grid[:, time_index, :, :, :]
    
    return baseline, ward_grid, y


def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    
    for baseline, ward_grid, labels in train_loader:
        baseline = baseline.to(device)
        ward_grid = ward_grid.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(baseline, ward_grid)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(train_loader)


def evaluate(model, test_loader, device):
    model.eval()
    all_preds = []
    all_probs = []
    all_labels = []
    
    with torch.no_grad():
        for baseline, ward_grid, labels in test_loader:
            baseline = baseline.to(device)
            ward_grid = ward_grid.to(device)
            
            outputs = model(baseline, ward_grid)
            probs = torch.sigmoid(outputs).cpu().numpy()
            preds = (probs > 0.5).astype(int)
            
            all_probs.extend(probs)
            all_preds.extend(preds)
            all_labels.extend(labels.numpy())
    
    return np.array(all_preds), np.array(all_probs), np.array(all_labels)

In [None]:
# Cell 7: Leave-One-Out Cross Validation

def run_loo_cv(X_baseline, X_ward_grid, y, time_index, 
               n_epochs=100, lr=0.001, patience=10, verbose=True):
    """
    Leave-One-Out CVでモデルを評価
    """
    # データ準備
    baseline, ward_grid, labels = prepare_data_for_time(X_baseline, X_ward_grid, y, time_index)
    
    # 標準化（baselineのみ）
    scaler = StandardScaler()
    baseline_scaled = scaler.fit_transform(baseline)
    
    # ward_gridは0-1正規化（すでに累積秒数なので最大値で割る）
    ward_grid_max = ward_grid.max()
    if ward_grid_max > 0:
        ward_grid_normalized = ward_grid / ward_grid_max
    else:
        ward_grid_normalized = ward_grid
    
    n_samples = len(labels)
    loo = LeaveOneOut()
    
    all_preds = []
    all_probs = []
    all_labels = []
    
    for fold_idx, (train_idx, test_idx) in enumerate(loo.split(baseline_scaled)):
        if verbose and fold_idx % 10 == 0:
            print(f"  Fold {fold_idx + 1}/{n_samples}")
        
        # データ分割
        X_base_train = torch.FloatTensor(baseline_scaled[train_idx])
        X_base_test = torch.FloatTensor(baseline_scaled[test_idx])
        X_grid_train = torch.FloatTensor(ward_grid_normalized[train_idx])
        X_grid_test = torch.FloatTensor(ward_grid_normalized[test_idx])
        y_train = torch.FloatTensor(labels[train_idx])
        y_test = torch.FloatTensor(labels[test_idx])
        
        # DataLoader
        train_dataset = TensorDataset(X_base_train, X_grid_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
        
        test_dataset = TensorDataset(X_base_test, X_grid_test, y_test)
        test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
        
        # モデル初期化
        model = HybridWinPredictor(n_baseline_features=baseline.shape[1]).to(device)
        criterion = nn.BCEWithLogitsLoss()
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
        
        # Early stopping用
        best_loss = float('inf')
        patience_counter = 0
        
        # 学習
        for epoch in range(n_epochs):
            train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
            
            if train_loss < best_loss:
                best_loss = train_loss
                patience_counter = 0
            else:
                patience_counter += 1
            
            if patience_counter >= patience:
                break
        
        # テスト
        preds, probs, _ = evaluate(model, test_loader, device)
        all_preds.extend(preds)
        all_probs.extend(probs)
        all_labels.extend(y_test.numpy())
    
    # 評価指標計算
    all_preds = np.array(all_preds)
    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)
    
    accuracy = accuracy_score(all_labels, all_preds)
    auc = roc_auc_score(all_labels, all_probs)
    logloss = log_loss(all_labels, all_probs)
    
    return {
        'accuracy': accuracy,
        'auc': auc,
        'logloss': logloss,
        'predictions': all_preds,
        'probabilities': all_probs,
        'labels': all_labels,
    }

In [None]:
# Cell 8: 10分時点の評価

print("=" * 50)
print("10分時点: Hybrid CNN (baseline + wardグリッド)")
print("=" * 50)

results_10min_cnn = run_loo_cv(
    X_baseline, X_ward_grid, y,
    time_index=0,  # 10分
    n_epochs=100,
    lr=0.001,
    patience=15,
    verbose=True,
)

print(f"\n=== 10分時点 結果 ===")
print(f"Accuracy: {results_10min_cnn['accuracy']:.3f}")
print(f"AUC: {results_10min_cnn['auc']:.3f}")
print(f"LogLoss: {results_10min_cnn['logloss']:.3f}")

In [None]:
# Cell 9: 20分時点の評価

print("=" * 50)
print("20分時点: Hybrid CNN (baseline + wardグリッド)")
print("=" * 50)

results_20min_cnn = run_loo_cv(
    X_baseline, X_ward_grid, y,
    time_index=1,  # 20分
    n_epochs=100,
    lr=0.001,
    patience=15,
    verbose=True,
)

print(f"\n=== 20分時点 結果 ===")
print(f"Accuracy: {results_20min_cnn['accuracy']:.3f}")
print(f"AUC: {results_20min_cnn['auc']:.3f}")
print(f"LogLoss: {results_20min_cnn['logloss']:.3f}")

In [None]:
# Cell 10: 結果比較（08のロジスティック回帰結果と比較）

# 08_win_prediction.ipynbの結果（手動入力）
results_lr = {
    '10min': {
        'baseline': {'accuracy': 0.598, 'auc': 0.648},
        'baseline_riot': {'accuracy': 0.685, 'auc': 0.710},
        'baseline_grid_pca': {'accuracy': 0.598, 'auc': 0.648},
    },
    '20min': {
        'baseline': {'accuracy': 0.804, 'auc': 0.901},
        'baseline_riot': {'accuracy': 0.848, 'auc': 0.907},
        'baseline_grid_pca': {'accuracy': 0.804, 'auc': 0.884},
    },
}

print("=" * 60)
print("モデル比較: ロジスティック回帰 vs Hybrid CNN")
print("=" * 60)

print("\n--- 10分時点 ---")
print(f"{'モデル':<30} {'Accuracy':<12} {'AUC':<12}")
print("-" * 54)
print(f"{'baseline (LR)':<30} {results_lr['10min']['baseline']['accuracy']:<12.3f} {results_lr['10min']['baseline']['auc']:<12.3f}")
print(f"{'baseline + Riot visionScore (LR)':<30} {results_lr['10min']['baseline_riot']['accuracy']:<12.3f} {results_lr['10min']['baseline_riot']['auc']:<12.3f}")
print(f"{'baseline + wardグリッドPCA (LR)':<30} {results_lr['10min']['baseline_grid_pca']['accuracy']:<12.3f} {results_lr['10min']['baseline_grid_pca']['auc']:<12.3f}")
print(f"{'baseline + wardグリッド (CNN)':<30} {results_10min_cnn['accuracy']:<12.3f} {results_10min_cnn['auc']:<12.3f}")

print("\n--- 20分時点 ---")
print(f"{'モデル':<30} {'Accuracy':<12} {'AUC':<12}")
print("-" * 54)
print(f"{'baseline (LR)':<30} {results_lr['20min']['baseline']['accuracy']:<12.3f} {results_lr['20min']['baseline']['auc']:<12.3f}")
print(f"{'baseline + Riot visionScore (LR)':<30} {results_lr['20min']['baseline_riot']['accuracy']:<12.3f} {results_lr['20min']['baseline_riot']['auc']:<12.3f}")
print(f"{'baseline + wardグリッドPCA (LR)':<30} {results_lr['20min']['baseline_grid_pca']['accuracy']:<12.3f} {results_lr['20min']['baseline_grid_pca']['auc']:<12.3f}")
print(f"{'baseline + wardグリッド (CNN)':<30} {results_20min_cnn['accuracy']:<12.3f} {results_20min_cnn['auc']:<12.3f}")

In [None]:
# Cell 11: 貢献度計算

print("\n=== CNNによるwardグリッドの貢献度 ===")

# 10分時点
baseline_10 = results_lr['10min']['baseline']['accuracy']
cnn_10 = results_10min_cnn['accuracy']
lift_10 = (cnn_10 - baseline_10) * 100

print(f"\n10分時点:")
print(f"  baseline: {baseline_10:.1%}")
print(f"  baseline + wardグリッド (CNN): {cnn_10:.1%}")
print(f"  精度向上: {lift_10:+.2f}%")

# 20分時点
baseline_20 = results_lr['20min']['baseline']['accuracy']
cnn_20 = results_20min_cnn['accuracy']
lift_20 = (cnn_20 - baseline_20) * 100

print(f"\n20分時点:")
print(f"  baseline: {baseline_20:.1%}")
print(f"  baseline + wardグリッド (CNN): {cnn_20:.1%}")
print(f"  精度向上: {lift_20:+.2f}%")

# Riot visionScoreとの比較
riot_10 = results_lr['10min']['baseline_riot']['accuracy']
riot_20 = results_lr['20min']['baseline_riot']['accuracy']

print(f"\n=== Riot visionScoreとの比較 ===")
print(f"10分時点: CNN {cnn_10:.1%} vs Riot {riot_10:.1%} (差: {(cnn_10 - riot_10) * 100:+.2f}%)")
print(f"20分時点: CNN {cnn_20:.1%} vs Riot {riot_20:.1%} (差: {(cnn_20 - riot_20) * 100:+.2f}%)")

In [None]:
# Cell 12: 結果の可視化

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

models = ['baseline\n(LR)', 'baseline+Riot\n(LR)', 'baseline+grid\n(LR)', 'baseline+grid\n(CNN)']
colors = ['#2ecc71', '#3498db', '#e74c3c', '#9b59b6']

# 10分時点
acc_10 = [
    results_lr['10min']['baseline']['accuracy'],
    results_lr['10min']['baseline_riot']['accuracy'],
    results_lr['10min']['baseline_grid_pca']['accuracy'],
    results_10min_cnn['accuracy'],
]
axes[0].bar(models, acc_10, color=colors)
axes[0].set_ylabel('Accuracy')
axes[0].set_title('10分時点')
axes[0].set_ylim(0.5, 1.0)
axes[0].axhline(y=0.5, color='gray', linestyle='--', label='ランダム')
for i, v in enumerate(acc_10):
    axes[0].text(i, v + 0.02, f'{v:.1%}', ha='center')

# 20分時点
acc_20 = [
    results_lr['20min']['baseline']['accuracy'],
    results_lr['20min']['baseline_riot']['accuracy'],
    results_lr['20min']['baseline_grid_pca']['accuracy'],
    results_20min_cnn['accuracy'],
]
axes[1].bar(models, acc_20, color=colors)
axes[1].set_ylabel('Accuracy')
axes[1].set_title('20分時点')
axes[1].set_ylim(0.5, 1.0)
axes[1].axhline(y=0.5, color='gray', linestyle='--', label='ランダム')
for i, v in enumerate(acc_20):
    axes[1].text(i, v + 0.02, f'{v:.1%}', ha='center')

plt.tight_layout()
plt.savefig('model_comparison_cnn.png', dpi=150, bbox_inches='tight')
plt.show()

print("\n保存: model_comparison_cnn.png")

In [None]:
# Cell 13: 結果サマリー

print("=" * 60)
print("実験完了: Hybrid CNN (baseline + wardグリッド)")
print("=" * 60)

print(f"\n【結論】")
if cnn_10 > baseline_10 or cnn_20 > baseline_20:
    print("  CNNによるwardグリッド特徴量は予測精度に寄与する")
else:
    print("  CNNを使用してもwardグリッド特徴量の寄与は限定的")

print(f"\n【数値】")
print(f"  10分時点: {lift_10:+.2f}% (baseline {baseline_10:.1%} → CNN {cnn_10:.1%})")
print(f"  20分時点: {lift_20:+.2f}% (baseline {baseline_20:.1%} → CNN {cnn_20:.1%})")

print(f"\n【Riot visionScoreとの比較】")
if cnn_10 > riot_10 or cnn_20 > riot_20:
    print("  CNNがRiot visionScoreを上回る時点あり")
else:
    print("  全時点でRiot visionScoreが優位")

In [None]:
# Cell 14: 結果ダウンロード

import json

# 結果をJSONで保存
results_summary = {
    '10min': {
        'accuracy': float(results_10min_cnn['accuracy']),
        'auc': float(results_10min_cnn['auc']),
        'logloss': float(results_10min_cnn['logloss']),
        'lift_vs_baseline': float(lift_10),
    },
    '20min': {
        'accuracy': float(results_20min_cnn['accuracy']),
        'auc': float(results_20min_cnn['auc']),
        'logloss': float(results_20min_cnn['logloss']),
        'lift_vs_baseline': float(lift_20),
    },
}

with open('cnn_results.json', 'w') as f:
    json.dump(results_summary, f, indent=2)

print("結果保存: cnn_results.json")

# ダウンロード
from google.colab import files
files.download('cnn_results.json')
files.download('model_comparison_cnn.png')