# 3モーダルモデル

In [None]:
#警告表示を避ける（実行時に固まることがあるため）
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
# デバイスの確認
import torch
print("CUDA available:", torch.cuda.is_available())
print("Device:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU only")

## データの読み込み

In [None]:
# ライブラリのimport
import pandas as pd
from pathlib import Path

# データが格納されている場所を指定
COMPETITION_DATA_DIR = Path("/kaggle/input/joai-competition-2025")

# CSVファイルを読み込み、データフレームとして格納
train_df = pd.read_csv(COMPETITION_DATA_DIR / 'train.csv')
test_df = pd.read_csv(COMPETITION_DATA_DIR / 'test.csv')
sample_submission_df = pd.read_csv(COMPETITION_DATA_DIR / 'sample_submission.csv')

## モデルの学習

In [None]:
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd
import timm
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from sklearn.metrics import accuracy_score, classification_report, f1_score, log_loss
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt


from transformers import AutoTokenizer, AutoModel

In [None]:
# ハイパーパラメータの設定

# 分類したいクラスは全部で4種類（Mixture, NoGas, Perfume, Smoke）
NUM_CLASSES = 4

# データの中で予測したい列の名前（ターゲット列）
TARGET_COL = "Gas"

# カテゴリ名（文字列）を数値のラベルに変換する辞書
NAME2LABEL = {"Mixture": 0, "NoGas": 1, "Perfume": 2, "Smoke": 3}

# 数値ラベルからカテゴリ名を取り出すための辞書
LABEL2NAME = dict(zip(NAME2LABEL.values(), NAME2LABEL.keys()))

@dataclass
class EnvConfig:
    data_dir: Path = Path("/kaggle/input/joai-competition-2025")
    image_dir: Path = data_dir / "images"
    train_path: Path = data_dir / "train.csv"
    test_path: Path = data_dir / "test.csv"
    model_save_dir: Path = Path("/kaggle/working/output")

    def __init__(self):
        self.model_save_dir.mkdir(parents=True, exist_ok=True)


@dataclass
class ExpConfig:
    seed: int = 42
    # 要修正
    num_folds: int = 5 # 最後は6だが5のほうが良さそう
    # 要修正
    batch_size: int = 32
    # 要修正
    num_epochs: int = 18 # 15, 20などと変えてみた
    # 要修正
    learning_rate: float = 3e-4 #これが一番良さそう
    num_workers: int = 4
    img_model_name: str = "resnet50.a1h_in1k"
    img_pretrained: bool = True

    # 後で変更可能
    text_model_name: str = "microsoft/deberta-base" # FacebookAI/roberta-baseと使い分け

@dataclass
class Config:
    env = EnvConfig()
    exp = ExpConfig()

In [None]:
# Configクラスのインスタンスを作成（env, exp両方の設定が読み込まれる）
cfg = Config()

# ExpConfigのseedを参照してみる
cfg.exp.seed  # 42という値が取得できる

### モデルの定義

In [None]:
class GasModel(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg

        # 画像モデル（最終層なし）
        self.img_model = timm.create_model(
            cfg.exp.img_model_name,
            pretrained=cfg.exp.img_pretrained,
            num_classes=0
        )
        img_feat_dim = self.img_model.num_features  # e.g., 2048

        # センサ用MLP（2次元 → 64次元）
        self.sensor_mlp = nn.Sequential(
            nn.Linear(2, 64),
            nn.ReLU(),
            nn.Linear(64, 64)
        )

        # テキストモデル（DeBERTa）
        from transformers import AutoModel
        self.text_model = AutoModel.from_pretrained(cfg.exp.text_model_name)
        text_feat_dim = self.text_model.config.hidden_size  # e.g., 768

        # 結合後の全結合層（2048 + 64 + 768 → 256 → 4）
        self.classifier = nn.Sequential(
            nn.Linear(img_feat_dim + 64 + text_feat_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, NUM_CLASSES)
        )

    def forward(self, x_img, x_sensor, input_ids, attention_mask):
        # 画像特徴量
        img_feat = self.img_model(x_img)  # (B, 2048)

        # センサ特徴量
        sensor_feat = self.sensor_mlp(x_sensor)  # (B, 64)

        # テキスト特徴（CLSトークン）
        text_outputs = self.text_model(input_ids=input_ids, attention_mask=attention_mask)
        text_feat = text_outputs.last_hidden_state[:, 0, :]  # (B, 768)

        # 結合 → 分類
        x = torch.cat([img_feat, sensor_feat, text_feat], dim=1)
        out = self.classifier(x)
        return out


# Configの設定を用いてGasModelのインスタンスを作成
model = GasModel(cfg)

### Dataset

In [None]:
# Datasetクラスの定義とデータオーグメンテーション
from torchvision import transforms

class TrainGasDataset(Dataset):
    def __init__(self, cfg: Config, df: pd.DataFrame, tokenizer):
        self.cfg = cfg
        self.df = df
        self.tokenizer = tokenizer

        # 画像用のtransform（データオーグメンテーションを追加）
        self.transform = transforms.Compose([
            # ランダムに画像を切り取る（大きさを224x224に変更）
            transforms.RandomResizedCrop(size=224, scale=(0.95, 1.0)),  # 少し緩やかにしてみた
            # ランダムに画像を反転
            transforms.RandomHorizontalFlip(p=0.5),
            # ランダムに画像を回転（角度を小さく）
            # transforms.RandomRotation(degrees=8), # ここも緩やかに。提出時は入れていないが、入れるのもアリ。
            # 色調をランダムに変更（明るさ、コントラスト、色、シャープネスなど）
            transforms.ColorJitter(brightness=0.15, contrast=0.15, saturation=0.15, hue=0.05), # 範囲を縮小
            # 画像をテンソルに変換
            transforms.ToTensor(),
            # 汎用性能を増加
            transforms.RandomErasing(p=0.1, scale=(0.01, 0.05)), #追加した方がよさそう
            # 画像を正規化（平均値と標準偏差を設定）
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])


        # センサーデータ用の変換（手動標準化）
        self.sensor_mean = [0.0, 0.0]  # 例: MQ8とMQ5の平均
        self.sensor_std = [1.0, 1.0]   # 例: 標準偏差（必要に応じて調整）

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        row = self.df.iloc[index]

        # 画像読み込みと変換
        img_path = self.cfg.env.image_dir / row["image_path_uuid"]
        image = Image.open(img_path).convert("RGB")
        image = self.transform(image)

        # センサデータ（MQ8, MQ5）の取得と手動標準化
        sensor = torch.tensor([row["MQ8"], row["MQ5"]], dtype=torch.float32)
        sensor = (sensor - torch.tensor(self.sensor_mean)) / torch.tensor(self.sensor_std)  # 標準化

        # キャプション → トークナイズ（DeBERTaなどの入力形式に変換）
        caption = row["Caption"]
        tokenized = self.tokenizer(
            caption,
            padding="max_length",
            truncation=True,
            max_length=64,
            return_tensors="pt"
        )
        input_ids = tokenized["input_ids"].squeeze(0)  # (seq_len,)
        attention_mask = tokenized["attention_mask"].squeeze(0)

        # ラベルの取得
        label = NAME2LABEL[row[TARGET_COL]]

        return {
            "image": image,
            "sensor": sensor,
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "label": label
        }


from transformers import AutoTokenizer

# DeBERTaのトークナイザーを作成
tokenizer = AutoTokenizer.from_pretrained(cfg.exp.text_model_name)

# tokenizer を渡して Dataset を作成
train_dataset = TrainGasDataset(cfg, train_df, tokenizer)

In [None]:
def prepare_folds(train_df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
    """
    StratifiedKFoldを使って、train_dfにfoldカラム（交差検証の番号）を追加する
    """
    # データ数だけ用意した配列をfold番号で埋める
    fold_array = np.zeros(len(train_df), dtype=np.int32)
    
    # StratifiedKFold: ラベルのバランスを保ちながら複数の分割を作る
    skf = StratifiedKFold(
        n_splits=cfg.exp.num_folds,
        shuffle=True,
        random_state=cfg.exp.seed
    )
    
    # valid_idxに該当するデータにfold番号を割り当てる
    for fold, (_, valid_idx) in enumerate(skf.split(train_df, train_df[TARGET_COL])):
        fold_array[valid_idx] = fold
    
    # 元のDataFrameにfoldカラムを追加して返す
    train_df['fold'] = fold_array
    return train_df

# train_dfにfold列を追加（交差検証で使う）
train_df = prepare_folds(train_df, cfg)
train_df.head()

In [None]:
# 中身の確認
sample = train_dataset[0]

# 画像とラベルを取り出す
image = sample["image"]
label = sample["label"]

# 確認
print(image.shape)  # 例: torch.Size([3, 224, 224])
print(label)        # 例: 2

### 学習・検証

In [None]:
# 各foldごとにcfg.exp.num_epochs回全ての学習用データを学習

# 早期終了の活用
class EarlyStopping:
    def __init__(self, patience=3, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_score is None:
            self.best_score = val_loss
        elif val_loss < self.best_score:
            self.best_score = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
                if self.verbose:
                    print("Early stopping triggered")


In [None]:
from torch.utils.data.dataloader import default_collate
from transformers import AutoTokenizer
from torch.optim.lr_scheduler import CosineAnnealingLR  # 追加: スケジューラのインポート

def train_one_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for batch in tqdm(train_loader, desc="Training"):
        images = batch["image"].to(device)
        sensors = batch["sensor"].to(device)
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)

        optimizer.zero_grad()
        outputs = model(images, sensors, input_ids, attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(train_loader)  # 損失をバッチ数で割る

def validate(model, valid_loader, criterion, device):
    model.eval()
    total_loss = 0
    total_correct = 0
    total_samples = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(valid_loader, desc="Validation"):
            images = batch["image"].to(device)
            sensors = batch["sensor"].to(device)
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            outputs = model(images, sensors, input_ids, attention_mask)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total_correct += predicted.eq(labels).sum().item()
            total_samples += labels.size(0)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(valid_loader)
    accuracy = total_correct / total_samples
    f1 = f1_score(all_labels, all_preds, average='weighted')
    return avg_loss, accuracy, f1

def train_fold(fold: int, train_df: pd.DataFrame, cfg: Config, device):
    print(f"\n{'*' * 20}\nFold {fold}")

    tokenizer = AutoTokenizer.from_pretrained(cfg.exp.text_model_name)

    train_fold_df = train_df[train_df['fold'] != fold].reset_index(drop=True)
    valid_fold_df = train_df[train_df['fold'] == fold].reset_index(drop=True)

    train_dataset = TrainGasDataset(cfg, train_fold_df, tokenizer)
    valid_dataset = TrainGasDataset(cfg, valid_fold_df, tokenizer)

    train_loader = DataLoader(
        train_dataset, batch_size=cfg.exp.batch_size, shuffle=True,
        num_workers=cfg.exp.num_workers, pin_memory=True, collate_fn=default_collate
    )
    valid_loader = DataLoader(
        valid_dataset, batch_size=cfg.exp.batch_size, shuffle=False,
        num_workers=cfg.exp.num_workers, pin_memory=True, collate_fn=default_collate
    )

    model = GasModel(cfg).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=cfg.exp.learning_rate)

    # 追加: CosineAnnealingLR スケジューラの定義
    scheduler = CosineAnnealingLR(optimizer, T_max=cfg.exp.num_epochs)

    # early stoppingのインスタンスを作成
    early_stopping = EarlyStopping(patience=5, verbose=True)
    
    for epoch in range(cfg.exp.num_epochs):
        print(f"Epoch {epoch}")
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        valid_loss, valid_accuracy, valid_f1 = validate(model, valid_loader, criterion, device)
    
        print(f"Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f} | "
              f"Valid Accuracy: {valid_accuracy:.4f} | Valid F1: {valid_f1:.4f}")
    
        # 追加: スケジューラのステップ（学習率を更新）
        scheduler.step()
    
        # Early stoppingのチェック
        early_stopping(valid_loss)
        if early_stopping.early_stop:
            print("Early stopping triggered")
            break
    
    # 最終モデルを保存
    model_save_path = cfg.env.model_save_dir / f"fold{fold}_epoch{cfg.exp.num_epochs - 1}.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model for fold {fold} saved to {model_save_path}")
    
    return model  # 保存したモデルを返す

In [None]:
# CPUとGPUのどちらを使うかを自動判定（CUDAが使える環境ならGPUを使う）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# fold数を設定
folds = cfg.exp.num_folds

# 各foldで学習を行い、モデルを保存
for fold in range(folds):
    train_fold(fold, train_df, cfg, device)

## 推論・提出ファイル作成

In [None]:
class TestGasDataset(Dataset):
    def __init__(self, cfg: Config, df: pd.DataFrame, tokenizer):
        self.cfg = cfg
        self.df = df
        self.tokenizer = tokenizer
        
        # 画像変換（リサイズと正規化）
        self.transform = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        row = self.df.iloc[index]

        # 画像の読み込みと前処理
        img_path = self.cfg.env.image_dir / row["image_path_uuid"]
        image = Image.open(img_path).convert("RGB")
        image = self.transform(image)

        # センサ値（MQ8, MQ5）をTensor化
        sensor = torch.tensor([row["MQ8"], row["MQ5"]], dtype=torch.float32)

        # テキスト（Caption）をトークナイズ
        caption = row["Caption"]
        text_input = self.tokenizer(
            caption,
            truncation=True,
            padding="max_length",
            max_length=64,
            return_tensors="pt"
        )
        input_ids = text_input["input_ids"].squeeze(0)      # (seq_len)
        attention_mask = text_input["attention_mask"].squeeze(0)

        return {
            "image": image,
            "sensor": sensor,
            "input_ids": input_ids,
            "attention_mask": attention_mask
        }



def run_inference(cfg: Config, device):
    print("\nStarting Inference")
    test_df = pd.read_csv(cfg.env.test_path)

    # トークナイザーの準備
    tokenizer = AutoTokenizer.from_pretrained(cfg.exp.text_model_name)

    # TestDatasetの準備（テキストも渡す）
    test_dataset = TestGasDataset(cfg, test_df, tokenizer)
    test_loader = DataLoader(
        test_dataset,
        batch_size=cfg.exp.batch_size,
        shuffle=False,
        num_workers=cfg.exp.num_workers,
        pin_memory=True
    )

    all_preds = []  # 各foldの予測を格納するリスト

    # 各foldで学習したモデルを使用して推論
    for fold in range(cfg.num_folds):
        model = GasModel(cfg).to(device)
        model_checkpoint = cfg.env.model_save_dir / f"fold{fold}_epoch{cfg.exp.num_epochs - 1}.pth"
        model.load_state_dict(torch.load(model_checkpoint, map_location=device, weights_only=True))
        model.eval()

        fold_preds = []  # 各foldの予測を格納
        with torch.no_grad():
            for batch in tqdm(test_loader, desc=f"Inference fold {fold}"):
                images = batch["image"].to(device)
                sensors = batch["sensor"].to(device)
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)

                # モデルにデータを渡す
                outputs = model(images, sensors, input_ids, attention_mask)

                # ソフトマックスを適用して確率を計算
                batch_probs = torch.softmax(outputs, dim=1)
                fold_preds.extend(batch_probs.cpu().numpy())

        all_preds.append(np.array(fold_preds))  # 各foldの予測を保存

    # アンサンブル（平均化）
    final_preds = np.mean(all_preds, axis=0)  # 各foldの予測の平均を取る（アンサンブル）

    return final_preds

In [None]:
# 推論関数を呼び出して、各テストデータに対するクラスごとの確率を取得
probs = run_inference(cfg, device)
probs  # 全サンプル × 4クラス分の確率が入った配列

In [None]:
# 最も確率が高いクラスの番号を抽出（axis=1で行方向に処理）
predicted_labels = probs.argmax(axis=1)
predicted_labels  # 各サンプルの予測ラベル番号

In [None]:
# そのままでは提出できないので、対応する文字列に変換
LABEL2NAME

In [None]:
# sample_submission を copy して提出用データフレームを作成
submission_df = sample_submission_df.copy()

# 「Gas」列に予測したクラス名を入れる
submission_df['Gas'] = predicted_labels

# ラベル番号から文字列のクラス名に変換
submission_df['Gas'] = submission_df['Gas'].map(LABEL2NAME)

# 先頭の数行を確認
submission_df.head()

In [None]:
# csvファイルとして出力
#submission_df.to_csv('submission.csv', index=False)
submission_df.to_csv('/kaggle/working/submission.csv', index=False)