# データ分割

- サイトからデータをダウンロードしたディレクトリをDARA_DIR 
データ分割し，trian,valデータをコピーするディレクトリをOUTPUT_DIR　とする

- 分割比率は訓練：評価 = 8:2としている


In [2]:
import os
import random
import shutil
from glob import glob

# 元のデータディレクトリと保存先ディレクトリ
DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/train/train"
OUTPUT_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/split_data2"

# 分割比率
train_ratio = 0.8
val_ratio = 0.2

# クラスのディレクトリを取得（not-hold, hold）
class_dirs = [d for d in os.listdir(DATA_DIR) if os.path.isdir(os.path.join(DATA_DIR, d))]

# 保存先ディレクトリの作成
for split in ['train', 'val']:
    for class_dir in class_dirs:
        os.makedirs(os.path.join(OUTPUT_DIR, split, class_dir), exist_ok=True)

# 各クラスごとに画像をランダムに分割し、対応するフォルダにコピー
for class_dir in class_dirs:
    image_paths = glob(os.path.join(DATA_DIR, class_dir, '*.jpg'))
    random.shuffle(image_paths)

    # データを分割
    train_idx = int(len(image_paths) * train_ratio)
    train_images = image_paths[:train_idx]
    val_images = image_paths[train_idx:]

    # 各セットに画像をコピー
    for image in train_images:
        shutil.copy(image, os.path.join(OUTPUT_DIR, 'train', class_dir))
    for image in val_images:
        shutil.copy(image, os.path.join(OUTPUT_DIR, 'val', class_dir))

print("画像の分割とコピーが完了しました。")


画像の分割とコピーが完了しました。


# データオーギュメンテーション
- RandomBrightnessContrast(ランダムに明るさ・コントラストを変更)
- RandomGamma(ランダムにガンマ変換をかける)
- ShiftScaleRotate(ランダムにアフィン変換をかける．平行移動，拡大縮小)

- これらのデータオーギュメンテーションの発生率を設定して，各画像に上記3つのデータオーギュメンテーションを施す．これを各画像に対して３回行うことにより，訓練データを４倍にする．
- 今回のデータオーギュメンテーションでは，反転，回転などの加工は行わなかった．理由→舞踊は形式通りの動きしかしないことを考慮すると，ダンスの形が変わるような回転，反転の加工は不要だと判断した．

In [None]:
import os
import glob
import numpy as np
from PIL import Image
import albumentations as albu
from tqdm import tqdm

# 元のデータディレクトリと保存先ディレクトリ
DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/split_data"
AUGMENTED_DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/augmented_data2"
CLASS_NAMES = ["not-hold", "hold"]

# オーギュメンテーションの設定
augmentation = albu.Compose([
    albu.RandomBrightnessContrast(brightness_limit=(-0.3, 0.3), contrast_limit=(-0.3, 0.3), p=0.3),
    albu.RandomGamma(gamma_limit=(70, 130), p=0.3),
    albu.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.10, rotate_limit=0, p=0.5)  # 回転角度を0に設定
])

# 各クラスごとに増強データを生成して保存
num_augmentations = 3  # 1つの画像につき生成するバリエーションの数
for class_name in CLASS_NAMES:
    class_dir = os.path.join(DATA_DIR, "train", class_name)
    save_dir = os.path.join(AUGMENTED_DATA_DIR, class_name)
    os.makedirs(save_dir, exist_ok=True)

    image_paths = glob.glob(f"{class_dir}/*.jpg")
    for image_path in tqdm(image_paths, desc=f"Processing {class_name} images"):
        image = np.array(Image.open(image_path).convert('RGB'))
        base_name = os.path.basename(image_path).split('.')[0]

        # 指定した回数分オーギュメンテーションを適用し、保存
        for i in range(num_augmentations):
            augmented = augmentation(image=image)['image']
            augmented_image = Image.fromarray(augmented)
            augmented_image.save(os.path.join(save_dir, f"{base_name}_aug_{i}.jpg"))


# ResNet学習（ファインチューニング）
-
- 特徴量抽出部分の重みはImage-Netで学習済みのものを使用．その後の全結合層は重みを初期化して学習する．
- すべての重みを学習し直すため，処理がすごく重い．．．（全結合層だけ学習したほうが良いかも）．一応，今回のコンペでは上位の精度(F1-score)が0.999付近となっており，非常に高い精度が求められていた．また，訓練データとテストデータで同じ部屋，扇子，人物のデータが使用されているため，過学習気味の方が高い精度が出るのではと思った．そこで，全重みを学習して，本データに特化させたかった．

- 損失関数：CrossEntropyLoss
- optimizer：SGD

- 以降のカリキュラム学習・アンサンブルのために，各サンプルに対して['file_name', 'True_Label', 'Predicted_Label', 'Confidence']をcsv出力させる
- ここでConfidence値とは最終出力層(Softmax)におけるもっとも数値の高いノードの値である．

In [2]:
import os
import glob
import csv
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet50
from sklearn.metrics import precision_score, recall_score, f1_score
import torchvision.transforms as transforms
import random

# シード値の設定
seed_value = 42
torch.manual_seed(seed_value)
torch.cuda.manual_seed(seed_value)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed_value)
random.seed(seed_value)

# データの格納先ディレクトリとクラス名
ORIGINAL_DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/split_data"
AUGMENTED_DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/augmented_data2"
CLASS_NAMES = ["not-hold", "hold"]
OUTPUT_CSV = "predictions_with_confidences_ResNet_aug.csv"
MODEL_PATH = "best_model_ResNet_aug.pth"

# デバイスの設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')


# カスタムデータセットクラス（元データと増強データを組み合わせる）
class MyDataset(Dataset):
    def __init__(self, original_data_dir, augmented_data_dir, split, class_names=CLASS_NAMES, transform=None):
        self.images = []
        self.labels = []
        self.class_names = class_names
        self.transform = transform if transform else transforms.Compose([transforms.ToTensor()])

        # 元のデータを追加
        for label, class_name in enumerate(class_names):
            original_class_dir = os.path.join(original_data_dir, split, class_name)
            original_image_paths = glob.glob(f"{original_class_dir}/*.jpg")
            self.images.extend(original_image_paths)
            self.labels.extend([label] * len(original_image_paths))

            # 増強データを追加
            augmented_class_dir = os.path.join(augmented_data_dir, class_name)
            augmented_image_paths = glob.glob(f"{augmented_class_dir}/*.jpg")
            self.images.extend(augmented_image_paths)
            self.labels.extend([label] * len(augmented_image_paths))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert('RGB')
        label = self.labels[idx]
        file_name = os.path.basename(self.images[idx])
        if self.transform:
            image = self.transform(image)
        return image, label, file_name


# データローダーの準備
def get_dataloaders(batch_size=8):
    train_dataset = MyDataset(ORIGINAL_DATA_DIR, AUGMENTED_DATA_DIR, split='train')
    val_dataset = MyDataset(ORIGINAL_DATA_DIR, AUGMENTED_DATA_DIR, split='val')  # 増強データは含めない
    test_dataset = MyDataset(ORIGINAL_DATA_DIR, AUGMENTED_DATA_DIR, split='test')  # 増強データは含めない

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

    return train_loader, val_loader, test_loader

    # 訓練関数
from torch.cuda.amp import autocast, GradScaler
# モデルの訓練、評価、保存を行う関数
def train_model():
    train_loader, val_loader, test_loader = get_dataloaders()

    # モデルの設定
    model = resnet50(pretrained=True)
    model.fc = nn.Linear(2048, len(CLASS_NAMES))
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # 最小検証損失とそれに対応するモデルパラメータを保存
    best_val_loss = float('inf')
    best_model_state = None



    scaler = GradScaler()  # スケーラの設定

    def train(epoch):
        model.train()
        running_loss = 0.0
        for batch_idx, (data, target, _) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            with autocast():  # 混合精度モードの適用
                output = model(data)
                loss = criterion(output, target)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            running_loss += loss.item()
            if batch_idx % 100 == 0:
                print(f'Epoch {epoch} | Batch {batch_idx} | Loss {running_loss / (batch_idx + 1):.6f}')

    # 評価関数
    def evaluate(loader):
        model.eval()
        all_preds = []
        all_labels = []
        all_file_names = []
        total_loss = 0.0
        confidences = []
        with torch.no_grad():
            for data, target, file_name in loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)
                total_loss += loss.item()
                probs = torch.softmax(output, dim=1)
                conf, preds = torch.max(probs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(target.cpu().numpy())
                all_file_names.extend(file_name)
                confidences.extend(conf.cpu().numpy())
        avg_loss = total_loss / len(loader)
        return all_labels, all_preds, all_file_names, confidences, avg_loss

    # 訓練プロセスの実行
    num_epochs = 8
    for epoch in range(num_epochs):
        train(epoch)
        _, _, _, _, val_loss = evaluate(val_loader)
        print(f'Epoch {epoch} Validation Loss: {val_loss:.6f}')
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()

    # 最適なモデルの保存
    torch.save(best_model_state, MODEL_PATH)
    print(f"最適なモデルが保存されました (Validation Loss: {best_val_loss:.6f})")

    # テストデータでの評価とCSV出力
    model.load_state_dict(torch.load(MODEL_PATH))
    all_labels, all_predictions, all_file_names, confidences, _ = evaluate(test_loader)

    # 各クラスごとの精度
    num_classes = len(CLASS_NAMES)
    for i in range(num_classes):
        class_labels = np.array(all_labels) == i
        class_predictions = np.array(all_predictions) == i
        precision = precision_score(class_labels, class_predictions)
        recall = recall_score(class_labels, class_predictions)
        f1 = f1_score(class_labels, class_predictions)
        print(f'Class {i} - Precision: {precision:.6f}, Recall: {recall:.6f}, F1 Score: {f1:.6f}')

    # CSVファイルにファイル名、真のラベル、予測ラベル、確信度を保存
    with open(OUTPUT_CSV, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['file_name', 'True_Label', 'Predicted_Label', 'Confidence'])
        for file_name, true_label, predicted_label, confidence in zip(all_file_names, all_labels, all_predictions,
                                                                      confidences):
            writer.writerow([file_name, true_label, predicted_label, confidence])
    print(f"予測結果が {OUTPUT_CSV} に保存されました。")


if __name__ == '__main__':
    # メイン実行
    train_model()


ModuleNotFoundError: No module named 'sklearn'

# カリキュラム学習
#### カリキュラム学習とは
学習データの難易度を徐々に増していくことにより，初期の簡単なデータで学習モデルの基盤を完成させ，後に難しいデータを学習させることにより，汎用性を向上させる，また難易度の高いデータに頑健なモデルを作成する学習方法．

- 今回のカリキュラム学習では学習データの難易度を上記で学習したReSNetの出力するConfidence値とする
- 訓練データを一度ReSNetモデルに入力し，Confidence値を算出する．その後，Confidence値の閾値によって訓練データを容易・困難の２種データセットにに分ける．
- また，ResNetのモデルで高いConfidence値だが不正解だったサンプルも困難データセットに入れる

- 簡単データローダーで４エポック，困難データローダーで８エポック学習する．（ここの学習エポック数は適正か不明．検討の余地あり）

- カリキュラム学習に使用したモデル，初めと同じReSNetモデル．ただし，特徴量抽出層はImage-Netで学習した重み．全結合層は初期化する．（初期学習モデルの重みは使用しない）

In [None]:
import os
import glob
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchvision.models import resnet50
import torchvision.transforms as transforms
import random
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score  # 精度計算のために追加

# シード値の設定
seed_value = 42
torch.manual_seed(seed_value)
torch.cuda.manual_seed(seed_value)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed_value)
random.seed(seed_value)

# データの格納先ディレクトリとクラス名
ORIGINAL_DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/split_data"
AUGMENTED_DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/augmented_data2"
CLASS_NAMES = ["not-hold", "hold"]
MODEL_PATH = "best_model_ResNet_aug.pth"
BEST_CURRICULUM_MODEL_PATH = "best_curriculum_model_latest.pth"

# デバイスの設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# カスタムデータセットクラス
class MyDataset(Dataset):
    def __init__(self, original_data_dir, augmented_data_dir=None, split='train', class_names=CLASS_NAMES,
                 transform=None):
        self.images = []
        self.labels = []
        self.class_names = class_names
        self.transform = transform if transform else transforms.Compose([
            transforms.ToTensor()
        ])

        # データセットの読み込み
        for label, class_name in enumerate(class_names):
            class_dir = os.path.join(original_data_dir, split, class_name)
            image_paths = glob.glob(f"{class_dir}/*.jpg")
            self.images.extend(image_paths)
            self.labels.extend([label] * len(image_paths))

            # 増強データが指定されている場合、増強データも追加
            if augmented_data_dir and split == 'train':
                augmented_class_dir = os.path.join(augmented_data_dir, class_name)
                augmented_image_paths = glob.glob(f"{augmented_class_dir}/*.jpg")
                self.images.extend(augmented_image_paths)
                self.labels.extend([label] * len(augmented_image_paths))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert('RGB')
        label = self.labels[idx]
        file_name = os.path.basename(self.images[idx])
        if self.transform:
            image = self.transform(image)
        return image, label, file_name

# データローダーの準備
def get_dataloader(split='train', batch_size=8):
    if split == 'train':
        dataset = MyDataset(ORIGINAL_DATA_DIR, AUGMENTED_DATA_DIR, split=split)
    else:
        dataset = MyDataset(ORIGINAL_DATA_DIR, split=split)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=0)

# カスタムデータローダーを作成する関数
class SubsetDataset(Dataset):
    def __init__(self, dataset, indices):
        self.dataset = dataset
        self.indices = indices

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        data_idx = self.indices[idx]
        image, label, _ = self.dataset[data_idx]
        return image, label

def get_custom_dataloader(indices, dataset, batch_size=8):
    subset = SubsetDataset(dataset, indices)
    return DataLoader(subset, batch_size=batch_size, shuffle=True)

# モデルの評価
def evaluate_model(model, loader):
    model.eval()
    all_labels = []
    all_preds = []
    confidences = []
    all_indices = []  # インデックスを収集
    with torch.no_grad():
        for batch_idx, (data, target, file_name) in enumerate(loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            probs = torch.softmax(output, dim=1)
            conf, preds = torch.max(probs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(target.cpu().numpy())
            confidences.extend(conf.cpu().numpy())
            # バッチ内のインデックスを取得
            batch_start = batch_idx * loader.batch_size
            batch_indices = [batch_start + i for i in range(len(data))]
            all_indices.extend(batch_indices)
    return all_labels, all_preds, confidences, all_indices

# 検証損失を計算
def calculate_val_loss(model, val_loader, criterion):
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for data, target, _ in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            total_loss += loss.item()
    return total_loss / len(val_loader)

# モデルの精度を計算
def evaluate_model_accuracy(model, loader):
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for data, target, _ in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            preds = output.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(target.cpu().numpy())
    accuracy = accuracy_score(all_labels, all_preds)
    return accuracy

# 難易度を取得し、簡単サンプルと難しいサンプルの数を均等にし、さらにクラスごとに均等にする
def get_difficulty_data(all_labels, all_predictions, confidences, all_indices, dataset):
    easy_samples = []
    hard_samples = []

    for idx, label, pred, conf in zip(all_indices, all_labels, all_predictions, confidences):
        if label == pred and conf >= 0.99999:
            easy_samples.append(idx)
        else:
            hard_samples.append(idx)

    # 簡単サンプルと難しいサンプルの数を揃える前にシャッフル
    print(f'Initial easy samples: {len(easy_samples)}, Initial hard samples: {len(hard_samples)}')
    random.shuffle(easy_samples)

    # 簡単サンプルの一部を難しいサンプルに移動し、数を同じにする
    while len(easy_samples) > len(hard_samples):
        hard_samples.append(easy_samples.pop())
    print(f'Balanced easy samples: {len(easy_samples)}, Balanced hard samples: {len(hard_samples)}')

    # クラスごとのサンプル数を均等にする関数
    def balance_classes(indices):
        class_0 = [idx for idx in indices if dataset.labels[idx] == 0]
        class_1 = [idx for idx in indices if dataset.labels[idx] == 1]
        min_class_samples = min(len(class_0), len(class_1))
        return class_0[:min_class_samples] + class_1[:min_class_samples]

    # クラスごとのサンプル数を均等にする
    easy_samples = balance_classes(easy_samples)
    hard_samples = balance_classes(hard_samples)

    print(f'Adjusted easy samples: {len(easy_samples)}, Adjusted hard samples: {len(hard_samples)}')
    return easy_samples, hard_samples

# 学習関数
def train(loader, epoch, model, criterion, optimizer, description=""):
    model.train()
    for batch_idx, (data, target) in enumerate(loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print(f'{description} Epoch {epoch} | Batch {batch_idx} | Loss {loss.item():.6f}')

# カリキュラム学習の関数
def curriculum_learning(easy_indices, hard_indices, train_dataset):
    # 検証用データローダー
    val_loader = get_dataloader(split='val')

    # 簡単サンプルと難しいサンプルからDataLoaderを作成
    easy_loader = get_custom_dataloader(easy_indices, train_dataset)
    hard_loader = get_custom_dataloader(hard_indices, train_dataset)

    # モデルのセットアップ（ImageNetで事前学習された重みを使用）
    model = resnet50(pretrained=True)
    model.fc = nn.Linear(2048, len(CLASS_NAMES))
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    best_val_loss = float('inf')
    val_losses = []

    # 簡単サンプルからの学習
    print("Starting curriculum learning with easy samples")
    for epoch in range(4):
        train(easy_loader, epoch, model, criterion, optimizer, description="Easy")
        val_loss = calculate_val_loss(model, val_loader, criterion)
        val_accuracy = evaluate_model_accuracy(model, val_loader)
        val_losses.append(val_loss)
        print(f'Validation Loss after Easy Epoch {epoch}: {val_loss:.6f}, Accuracy: {val_accuracy:.4f}')
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), BEST_CURRICULUM_MODEL_PATH)

    # 難しいサンプルでの学習
    print("Continuing curriculum learning with hard samples")
    for epoch in range(8):
        train(hard_loader, epoch, model, criterion, optimizer, description="Hard")
        val_loss = calculate_val_loss(model, val_loader, criterion)
        val_accuracy = evaluate_model_accuracy(model, val_loader)
        val_losses.append(val_loss)
        print(f'Validation Loss after Hard Epoch {epoch}: {val_loss:.6f}, Accuracy: {val_accuracy:.4f}')
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), BEST_CURRICULUM_MODEL_PATH)

    # 損失の推移をプロット
    plt.plot(val_losses, label="Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Validation Loss during Curriculum Learning")
    plt.show()

if __name__ == '__main__':
    # モデルの読み込み（あなたのデータセットで学習済みの重みを使用）
    model = resnet50(pretrained=True)
    model.fc = nn.Linear(2048, len(CLASS_NAMES))
    model.load_state_dict(torch.load(MODEL_PATH))  # あなたのモデルの重みをロード
    model = model.to(device)

    # データセットの準備
    train_dataset = MyDataset(ORIGINAL_DATA_DIR, AUGMENTED_DATA_DIR, split='train')
    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=False, num_workers=0)

    # 初期モデルで難易度判定のための予測とインデックスを取得
    all_labels, all_predictions, confidences, all_indices = evaluate_model(model, train_loader)

    # 難易度に基づくデータ分割
    easy_indices, hard_indices = get_difficulty_data(all_labels, all_predictions, confidences, all_indices, train_dataset)

    # カリキュラム学習の実行（新しいモデルを使用）
    curriculum_learning(easy_indices, hard_indices, train_dataset)

# ReSNetモデル or カリキュラムReSNetモデルをテストデータに適用

In [None]:
import os
import glob
import csv
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet50
import torchvision.transforms as transforms

# 学習済みモデルのパスとクラス名(MODEL_PATHを変更させて，使用モデル（ReSNet,or,curriculum ResNet）を選択)
MODEL_PATH = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/best_curriculum_model_latest.pth"
CLASS_NAMES = ["not-hold", "hold"]
TEST_DATA_DIR = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/test/test"
OUTPUT_CSV = "test_predictions_with_confidence_curriculum_latest.csv"

# デバイスの設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# カスタムデータセットクラス
class TestDataset(Dataset):
    def __init__(self, test_data_dir, transform=None):
        self.images = glob.glob(f"{test_data_dir}/*.jpg")
        self.transform = transform if transform else transforms.Compose([transforms.ToTensor()])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image_path = self.images[idx]
        image = Image.open(image_path).convert('RGB')
        file_name = os.path.basename(image_path)
        if self.transform:
            image = self.transform(image)
        return image, file_name

# モデルの準備
model = resnet50(pretrained=False)
model.fc = nn.Linear(2048, len(CLASS_NAMES))
model.load_state_dict(torch.load(MODEL_PATH))
model = model.to(device)
model.eval()

# データローダーの準備（リサイズは省略）
test_dataset = TestDataset(TEST_DATA_DIR)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=0)

# 予測とCSV出力
with open(OUTPUT_CSV, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['file_name', 'Predicted_Label', 'Confidence'])

    with torch.no_grad():
        for images, file_names in test_loader:
            images = images.to(device)
            outputs = model(images)
            probabilities = F.softmax(outputs, dim=1)  # 各クラスの確信度を計算
            confidences, predicted_labels = torch.max(probabilities, 1)  # 最大の確信度とラベル

            for file_name, predicted_label, confidence in zip(file_names, predicted_labels.cpu().numpy(), confidences.cpu().numpy()):
                writer.writerow([file_name, predicted_label, confidence])

print(f"予測結果が {OUTPUT_CSV} に保存されました。")


# アンサンブル学習

- ReSNetモデルをベースモデルとして，Confidenceの閾値を基にルールベースで，ReSNetのConfidence値が低いサンプルにはカリキュラムReSNetの結果を適用するようにした．
（これをアンサンブル学習といっていいか不明．結果をアンサンブルしただけ）
- 本当はスタッキング学習などをしたかったが，ReSNetモデルで既にF1-scoreが0.998程度であったため，不正解データが極端に少なかった．よってスタッキング学習のメタ学習が上手くいかないと考えたため，ルールベースのアンサンブル学習を採用した

In [None]:
# ファイルパス
resnet_csv_path = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/test/test_predictions_with_confidence.csv"
gcn_csv_path = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/test/test_predictions_with_confidence_curriculum_latest.csv"
failed_csv_path = "C:/Users/sugie/PycharmProjects/Study/SIGNATE/test/failed_images.csv" # GCNで予測できなかったファイルのリスト
output_csv_path = "amsumble_result_ResNet_curriculum_latest_0.98_0.95.csv"
gcn_selected_output_path = "curriculum_selected_samples_latest_0.98_0.95.csv"

# 閾値の設定
RESNET_CONFIDENCE_THRESHOLD = 0.98
GCN_CONFIDENCE_THRESHOLD = 0.95

# ResNetとGCNの結果を読み込む
resnet_df = pd.read_csv(resnet_csv_path)
gcn_df = pd.read_csv(gcn_csv_path)
failed_df = pd.read_csv(failed_csv_path)  # GCNで予測に失敗した画像のリスト

# GCNで予測できなかったファイル名のリスト
failed_files = set(failed_df['file_name'].tolist())

# 結果を統合
final_predictions = []

for _, resnet_row in resnet_df.iterrows():
    file_name = resnet_row['file_name']
    resnet_label = resnet_row['Predicted_Label']
    resnet_confidence = resnet_row['Confidence']

    # GCNの予測結果を取得
    if file_name in failed_files:
        # GCNで予測に失敗した場合はResNetのラベルを使用
        final_label = resnet_label
        chosen_model = "ResNet"
    else:
        # GCNの結果が存在する場合
        gcn_row = gcn_df[gcn_df['file_name'] == file_name]
        if gcn_row.empty:
            print(f"Warning: {file_name} is missing in GCN predictions but not in failed list.")
            final_label = resnet_label
            chosen_model = "ResNet"
        else:
            gcn_label = gcn_row['Predicted_Label'].values[0]
            gcn_confidence = gcn_row['Confidence'].values[0]

            # フロー図に基づいた最終予測ラベルの決定
            if resnet_confidence >= RESNET_CONFIDENCE_THRESHOLD:
                final_label = resnet_label
                chosen_model = "ResNet"
            else:
                if gcn_confidence >= GCN_CONFIDENCE_THRESHOLD:
                    final_label = gcn_label
                    chosen_model = "GCN"
                else:
                    final_label = resnet_label
                    chosen_model = "ResNet"

    # 最終結果をリストに追加
    final_predictions.append({
        "file_name": file_name,
        "ResNet_Label": resnet_label,
        "ResNet_Confidence": resnet_confidence,
        "GCN_Label": gcn_label if not gcn_row.empty else None,
        "GCN_Confidence": gcn_confidence if not gcn_row.empty else None,
        "Final_Predicted_Label": final_label,
        "Chosen_Model": chosen_model
    })

# データフレームに変換
final_df = pd.DataFrame(final_predictions)

# GCNが選択されたサンプルのみを抽出
gcn_selected_df = final_df[final_df['Chosen_Model'] == "GCN"]

# 最終結果をCSVに保存
final_df.to_csv(output_csv_path, index=False)
gcn_selected_df.to_csv(gcn_selected_output_path, index=False)

print(f"最終予測結果が {output_csv_path} に保存されました。")
print(f"GCNが選択されたサンプルのみの結果が {gcn_selected_output_path} に保存されました。")
