In [1]:
import os 
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models

from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from einops.layers.torch import Rearrange
from tqdm.notebook import tqdm
from termcolor import cprint
from PIL import Image
from torchvision.transforms import transforms

In [2]:
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

Using device: mps


In [5]:
# 学習、検証、テスト用のデータセットの定義

class ThingsEEGDataset(torch.utils.data.Dataset):
    def __init__(self, split: str):
        super().__init__()
        assert split in ["train", "val", "test"], f"Invalid split: {split}"
        
        self.X = torch.from_numpy(np.load(f"data/{split}/eeg.npy")).to(torch.float32)
        self.subject_idxs = torch.from_numpy(np.load(f"data/{split}/subject_idxs.npy"))

        if split in ["train", "val"]:
            self.y = torch.from_numpy(np.load(f"data/{split}/labels.npy"))
        else:
            self.y = None # testセットにはラベルがない

        print(f"[{split.upper()} SET] EEG: {self.X.shape}, Subject Indices: {self.subject_idxs.shape}", end="")
        if self.y is not None:
            print(f", Labels: {self.y.shape}")
        else:
            print()


    def __len__(self):
        return len(self.X)

    def __getitem__(self, i):
        if self.y is not None:
            return self.X[i], self.y[i], self.subject_idxs[i]
        else:
            return self.X[i], self.subject_idxs[i]

    @property
    def num_classes(self) -> int:
        return 5 # animal, food, clothing, tool, vehicle

    @property
    def num_channels(self) -> int:
        return self.X.shape[1]

    @property
    def seq_len(self) -> int:
        return self.X.shape[2]

In [15]:
# 脳波データとそれに対応する画像の両方を読み込むデータセットの定義

class ImageEEGDataset(torch.utils.data.Dataset):
    def __init__(self, split: str, image_dir: str = "training_images", subset_size: int = None):
        super().__init__()
        assert split in ["train", "val"], f"Invalid split: {split}"

        self.X_eeg_full = torch.from_numpy(np.load(f"data/{split}/eeg.npy")).to(torch.float32)

        project_root = os.getcwd()
        print(f"Project root detected as: {project_root}")

        self.image_dir_abs = os.path.join(project_root, image_dir)
        if not os.path.isdir(self.image_dir_abs):
            raise FileNotFoundError(f"Image directory not found: {self.image_dir_abs}")

        with open(f"data/{split}/image_paths.txt", "r") as f:
            self.image_paths_full = [line.strip() for line in f.readlines()]

        if subset_size is not None:
            print(f"Using a subset of {subset_size} samples from the full dataset.")
            self.X_eeg = self.X_eeg_full[:subset_size]
            self.image_paths = self.image_paths_full[:subset_size]
        else:
            print("Using the full dataset.")
            self.X_eeg = self.X_eeg_full
            self.image_paths = self.image_paths_full
        
        self.image_dir = image_dir

        # 画像の前処理
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
    
    def __len__(self):
        return len(self.X_eeg)

    def __getitem__(self, i):
        eeg_data = self.X_eeg[i]
        
        image_path = os.path.join(self.image_dir, self.image_paths[i])
        image = Image.open(image_path).convert("RGB")
        image_data = self.transform(image)
        
        return eeg_data, image_data

In [None]:
class ConvBlock(nn.Module):
    def __init__(self, in_dim, out_dim, kernel_size: int = 3, p_drop: float = 0.1):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim

        self.conv0 = nn.Conv1d(in_dim, out_dim, kernel_size, padding="same")
        self.conv1 = nn.Conv1d(out_dim, out_dim, kernel_size, padding="same")
        self.batchnorm0 = nn.BatchNorm1d(num_features=out_dim)
        self.batchnorm1 = nn.BatchNorm1d(num_features=out_dim)
        self.dropout = nn.Dropout(p_drop)

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        if self.in_dim == self.out_dim:
            X_skip = X
            X = self.conv0(X)
            X = X + X_skip # スキップ接続
        else:
            X = self.conv0(X)

        X = F.gelu(self.batchnorm0(X))
        
        X_skip = X
        X = self.conv1(X)
        X = X + X_skip # スキップ接続
        X = F.gelu(self.batchnorm1(X))

        return self.dropout(X)

class ConvRNNEncoder(nn.Module):
    def __init__(self, in_channels: int, hid_dim: int = 128, rnn_layers: int = 2, embedding_dim: int = 256):
        super().__init__()

        # CNN Part: 局所的な特徴を抽出
        self.cnn = nn.Sequential(
            ConvBlock(in_channels, hid_dim),
            nn.MaxPool1d(2),
            ConvBlock(hid_dim, hid_dim),
            nn.MaxPool1d(2),
        )

        # RNN Part: 時系列の長期的な依存関係を捉える
        self.rnn = nn.LSTM(
            input_size=hid_dim,
            hidden_size=embedding_dim,
            num_layers=rnn_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(embedding_dim * 2 * rnn_layers, embedding_dim)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (batch_size, in_channels, seq_len)
        x = self.cnn(x) # (batch_size, hid_dim, seq_len / 4)
        x = x.permute(0, 2, 1) # (battch_size, seq_len / 4, hid_dim)

        _, (h_n, _) = self.rnn(x) # h_n: (num_layers * 2, batch_size, embeding_dim)
        h_n = h_n.permute(1, 0, 2).contiguous() # (batch_size, num_layers*2, embedding_dim)
        h_n = h_n.view(h_n.size(0), -1) # batch_size, num_layers*2*embedding_dim)

        embedding = self.fc(h_n)
        return embedding

In [8]:
# 画像と脳波をエンコードし、埋め込みベクトルを出力するモデル
class MultiModalModel(nn.Module):
    def __init__(self, eeg_encoder: nn.Module, embedding_dim: int = 256):
        super().__init__()
        self.eeg_encoder = eeg_encoder

        # 画像エンコーダーとして事前学習済みのResNetを使用
        self.image_encoder = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        # ResNetの最終層を付け替えて、埋め込みベクトルを出力するようにする
        num_ftrs = self.image_encoder.fc.in_features
        self.image_encoder.fc = nn.Linear(num_ftrs, embedding_dim)
    
    def forward(self, eeg, image):
        eeg_embedding = self.eeg_encoder(eeg)
        image_embedding = self.image_encoder(image)
        return eeg_embedding, image_embedding

# 対照学習損失 (InfoNCE Loss)
def contrastive_loss(eeg_embeds, img_embeds, temperature=0.07):
    # L2正規化
    eeg_embeds = F.normalize(eeg_embeds, p=2, dim=-1)
    img_embeds = F.normalize(img_embeds, p=2, dim=-1)
    
    # コサイン類似度行列を計算
    logits = torch.matmul(eeg_embeds, img_embeds.T) / temperature
    
    # 正解ラベルは対角成分
    labels = torch.arange(len(logits)).to(logits.device)
    
    # EEG->Image と Image->EEG の両方向で損失を計算
    loss_eeg = F.cross_entropy(logits, labels)
    loss_img = F.cross_entropy(logits.T, labels)
    
    return (loss_eeg + loss_img) / 2

In [None]:
# --- 事前学習の準備 ---
pretrain_epochs = 20 
pretrain_lr = 1e-4
embedding_dim = 256
subset_data_size = 30000

pretrain_dataset = ImageEEGDataset("train", subset_size=subset_data_size)
pretrain_loader = DataLoader(pretrain_dataset, batch_size=128, shuffle=True)

eeg_encoder = ConvRNNEncoder(in_channels=17, embedding_dim=embedding_dim).to(device)
pretrain_model = MultiModalModel(eeg_encoder, embedding_dim=embedding_dim).to(device)

for param in pretrain_model.parameters():
    param.requires_grad = False # prameterを凍結して計算負荷を軽減
print("Pretraining model parameters set to not require gradients.")

for param in pretrain_model.eeg_encoder.parameters():
    param.requires_grad = True # EEG encoder parameters は学習対象
print("EEG encoder parameters are explicitly set to be trainable.")

# 最適化手法
trainable_params = filter(lambda p: p.requires_grad, pretrain_model.parameters())
pretrain_optimizer = torch.optim.Adam(trainable_params, lr=pretrain_lr)

Project root detected as: /Users/odajohan/DL基礎講座/Deep-Learning-Basic-2025-
Using a subset of 30000 samples from the full dataset.
Pretraining model parameters set to not require gradients.
EEG encoder parameters are explicitly set to be trainable.


In [None]:
import time

# --- 事前学習ループ ---
training_time = 0
for epoch in range(pretrain_epochs):
    pretrain_model.train()
    total_loss = 0
    start_time = time.time()
    for eeg, image in tqdm(pretrain_loader, desc=f"Pre-training Epoch {epoch+1}"):
        eeg, image = eeg.to(device), image.to(device)
        
        eeg_embeds, img_embeds = pretrain_model(eeg, image)
        
        loss = contrastive_loss(eeg_embeds, img_embeds)
        
        pretrain_optimizer.zero_grad()
        loss.backward()
        pretrain_optimizer.step()
        
        total_loss += loss.item()
        
    avg_loss = total_loss / len(pretrain_loader)
    print(f"Epoch {epoch+1}/{pretrain_epochs}, Contrastive Loss: {avg_loss:.4f}")
    print(f"Epoch {epoch+1} training time: {time.time() - start_time:.2f} seconds")
    training_time += time.time() - start_time

# --- 事前学習済みの脳波エンコーダの重みを保存 ---
torch.save(pretrain_model.eeg_encoder.state_dict(), "pretrained_eeg_encoder.pt")
print("\nPre-trained EEG encoder weights saved to pretrained_eeg_encoder.pt")
print(f"Total training time for pre-training: {training_time:.2f} seconds")


In [21]:
class FinalClassfier(nn.Module):
    """事前学習済みのエンコーダーに分類ヘッドを追加するモデル"""
    def __init__(self, pretrained_encoder: nn.Module , num_classes: int = 5):
        super().__init__()
        self.encoder = pretrained_encoder

        embedding_dim = self.encoder.fc.out_features
        self.classifier_head = nn.Linear(embedding_dim, num_classes)
    
    def forward(self, x):
        features = self.encoder(x)
        return self.classifier_head(features)

In [22]:
# ベースとなる分類器の初期化
eeg_encoder_for_finetune = ConvRNNEncoder(in_channels=17, embedding_dim=256)

try:
    eeg_encoder_for_finetune.load_state_dict(torch.load("pretrained_eeg_encoder.pt", map_location=device))
    print("Pre-trained EEG encoder weights loaded successfully.")
except FileNotFoundError:
    print("Pre-trained EEG encoder weights not found. Please run the pre-training step first.")

finetune_model = FinalClassfier(eeg_encoder_for_finetune, num_classes=5).to(device)

params_to_update =  [
    {"params": finetune_model.encoder.parameters(), 'lr': 1e-5},
    {"params": finetune_model.classifier_head.parameters(), 'lr': 1e-3}
]
finetune_optimizer = torch.optim.Adam(params_to_update)
print("Fine-tuning model parameters set with different learning rates for encoder and classifier head.")

Pre-trained EEG encoder weights loaded successfully.
Fine-tuning model parameters set with different learning rates for encoder and classifier head.


  eeg_encoder_for_finetune.load_state_dict(torch.load("pretrained_eeg_encoder.pt", map_location=device))


In [23]:
finetune_batch_size = 512
finetune_epochs = 50

train_set = ThingsEEGDataset("train")
train_loader = DataLoader(train_set, batch_size=finetune_batch_size, shuffle=True)

val_set = ThingsEEGDataset("val")
val_loader = DataLoader(val_set, batch_size=finetune_batch_size, shuffle=False)

[TRAIN SET] EEG: torch.Size([118800, 17, 100]), Subject Indices: torch.Size([118800]), Labels: torch.Size([118800])
[VAL SET] EEG: torch.Size([59400, 17, 100]), Subject Indices: torch.Size([59400]), Labels: torch.Size([59400])


In [25]:
# 評価指標（正解率）
def accuracy(y_pred, y):
    return (y_pred.argmax(dim=-1) == y).float().mean()

# TensorBoardの準備
writer = SummaryWriter("runs/eeg_fine_tune_experiment_1")

In [None]:
max_val_acc = 0.0

for epoch in range(finetune_epochs):
    print(f"Epoch {epoch+1}/{finetune_epochs}")
    
    # ------ 訓練 ------
    finetune_model.train()
    train_loss_list, train_acc_list = [], []
    for X, y, subject_idxs in tqdm(train_loader, desc="Train"):
        X, y = X.to(device), y.to(device)
        
        y_pred = finetune_model(X)
        loss = F.cross_entropy(y_pred, y)
        
        finetune_optimizer.zero_grad()
        loss.backward()
        finetune_optimizer.step()
        
        train_loss_list.append(loss.item())
        train_acc_list.append(accuracy(y_pred, y).item())

    avg_train_loss = np.mean(train_loss_list)
    avg_train_acc = np.mean(train_acc_list)

    # ------ 検証 ------
    finetune_model.eval()
    val_loss_list, val_acc_list = [], []
    with torch.no_grad():
        for X, y, subject_idxs in tqdm(val_loader, desc="Validation"):
            X, y = X.to(device), y.to(device)
            y_pred = finetune_model(X)
            loss = F.cross_entropy(y_pred, y) # 損失関数は要確認
            val_loss_list.append(loss.item())
            val_acc_list.append(accuracy(y_pred, y).item())
            
    avg_val_loss = np.mean(val_loss_list)
    avg_val_acc = np.mean(val_acc_list)

    # --- ログ表示 & TensorBoard記録 ---
    print(f"  Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_acc:.4f}")
    print(f"  Val   Loss: {avg_val_loss:.4f}, Val   Acc: {avg_val_acc:.4f}")

    writer.add_scalar("Loss/train", avg_train_loss, epoch)
    writer.add_scalar("Accuracy/train", avg_train_acc, epoch)
    writer.add_scalar("Loss/val", avg_val_loss, epoch)
    writer.add_scalar("Accuracy/val", avg_val_acc, epoch)

    # --- モデルのベストパラメータを保存 ---
    if avg_val_acc > max_val_acc:
        cprint(f"  New best validation accuracy! Saving model to finetune_model_best.pt", "cyan")
        torch.save(finetune_model.state_dict(), "finetune_model_best.pt")
        max_val_acc = avg_val_acc

writer.close()
print("\nTraining finished.")

In [27]:

batch_size = finetune_batch_size

test_set = ThingsEEGDataset("test")
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)


encoder_base = ConvRNNEncoder(in_channels=test_set.num_channels, embedding_dim=256)
model = FinalClassfier(encoder_base, num_classes=test_set.num_classes).to(device)

model.load_state_dict(torch.load("finetune_model_best.pt", map_location=device))
print("Best model weights loaded.")

preds = []
model.eval()
with torch.no_grad(): 
    for X, subject_idxs in tqdm(test_loader, desc="Evaluation"):
        X = X.to(device)
        y_pred = model(X)
        preds.append(y_pred.detach().cpu()) # 予測結果をCPUに移してから保存

# 全ての予測結果を一つのテンサーに結合し、NumPy配列に変換
preds = torch.cat(preds, dim=0).numpy()

# 提出用ファイルとして保存
np.save(f"submission_finetuned.npy", preds)
print(f"\nSubmission file 'submission.npy' saved with shape: {preds.shape}")

[TEST SET] EEG: torch.Size([59400, 17, 100]), Subject Indices: torch.Size([59400])
Best model weights loaded.


  model.load_state_dict(torch.load("finetune_model_best.pt", map_location=device))


Evaluation:   0%|          | 0/117 [00:00<?, ?it/s]


Submission file 'submission.npy' saved with shape: (59400, 5)
