<a href="https://colab.research.google.com/github/reuen-mayer/dl_lecture_competition_pub/blob/VQA-competition/final_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### ドライブのマウント

In [None]:
import re
import random
import time
from statistics import mode
from PIL import Image
import numpy as np
import pandas
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
import os
from transformers import BertTokenizer, BertModel
import torchvision.models as models
from torch.cuda.amp import GradScaler, autocast


def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def process_text(text):
    # lowercase
    text = text.lower()

    # 数詞を数字に変換
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)

    # 小数点のピリオドを削除
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)

    # 冠詞の削除
    text = re.sub(r'\b(a|an|the)\b', '', text)

    # 短縮形のカンマの追加
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)

    # 句読点をスペースに変換
    text = re.sub(r"[^\w\s':]", ' ', text)

    # 句読点をスペースに変換
    text = re.sub(r'\s+,', ',', text)

    # 連続するスペースを1つに変換
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# 1. データローダーの作成
class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform  # 画像の前処理
        self.image_dir = image_dir  # 画像ファイルのディレクトリ
        self.df = pandas.read_json(df_path)  # 画像ファイルのパス，question, answerを持つDataFrame
        self.answer = answer

        answer_id = pd.read_csv("/workspace/dl_lecture_competition_pub/data/class_mapping.csv")#class-mappingを使用

        self.answer2idx = dict(zip(answer_id["answer"], answer_id["class_id"]))
        self.idx2answer = {v: k for k, v in self.answer2idx.items()}
        #self.question2idx = {}#BERTを使わないときは使用
        #self.idx2question = {}
        """
        for question in self.df["question"]:#BERTを使わないときはこちらを使用
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}  # 逆変換用の辞書(question)
        """
        if self.answer:
            for answers in self.df["answers"]:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}
    def update_dict(self, dataset):
        #self.question2idx = dataset.question2idx  #BERTを使わないときはこっちを使う
        #self.idx2question = dataset.idx2question
        self.answer2idx = dataset.answer2idx
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        image = Image.open(f"{self.image_dir}/{self.df['image'][idx]}")
        image = self.transform(image) # 画像の前処理の実行
        #question = np.zeros(len(self.idx2question) + 1)  # 未知語用の要素を追加　BERTを使わないときはこっちを使用
        #question_words = self.df["question"][idx].split(" ")
        question = self.df["question"][idx]
        """
        for word in question_words:#BERTを使わないためこちらを使用
            try:
                question[self.question2idx[word]] = 1  # one-hot表現に変換
            except KeyError:
                question[-1] = 1  # 未知語
        """


        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"][idx]]
            mode_answer_idx = mode(answers)
            return image, question, torch.Tensor(answers), int(mode_answer_idx)#BERT用
            #return image, torch.Tensor(question), torch.Tensor(answers), int(mode_answer_idx)#非BERT用

        else:
            #return image, torch.Tensor(question)#非BERT用
            return image, question#BERT用

    def __len__(self):
        return len(self.df)

# 2. 評価指標の実装
# 簡単にするならBCEを利用する
def VQA_criterion(batch_pred: torch.Tensor, batch_answers: torch.Tensor):
#def VQA_criterion(batch_pred, batch_answers):
    total_acc = 0.

    for pred, answers in zip(batch_pred, batch_answers):
        acc = 0.
        for i in range(len(answers)):
            num_match = 0
            for j in range(len(answers)):
                if i == j:
                    continue
                if pred == answers[j]:
                    num_match += 1
            acc += min(num_match / 3, 1)
        total_acc += acc / 10

    return total_acc / len(batch_pred)


# 3. モデルのの実装
# ResNetを利用できるようにしておく
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class BottleneckBlock(nn.Module):
    expansion = 4

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    def __init__(self, block, layers):
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, layers[0], 64)
        self.layer2 = self._make_layer(block, layers[1], 128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], 256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], 512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, 512)


    def _make_layer(self, block, blocks, out_channels, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x


def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])


def ResNet50():
    return ResNet(BottleneckBlock, [3, 4, 6, 3])

def ResNet101():
    return ResNet(BottleneckBlock, [3, 4, 23, 3])

class VQAModel(nn.Module):
    #ef __init__(self, vocab_size: int, n_answer: int):#非BERT用
    def __init__(self, n_answer: int):#BERT用
        super().__init__()
        self.resnet = ResNet101()
        #self.text_encoder = nn.Linear(vocab_size, 512)#非BERT用
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')#BERTのtokenizer
        self.bert_model = BertModel.from_pretrained('bert-base-uncased', torch_dtype=torch.float32, attn_implementation="sdpa")#BERTのmodel
        for param in self.bert_model.parameters():
            param.requires_grad = False

        self.fc = nn.Sequential(
            nn.Linear(768+512, 512),#BERTのサイズは768
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),#Dropoutを追加
            nn.Linear(512, n_answer)
        )

    def forward(self, image, question):
        N = image.shape[0]
        image_feature = self.resnet(image)  # 画像の特徴量
        assert image_feature.shape == (N, 512)
        with torch.no_grad():
            question = self.tokenizer(question, truncation=True, padding=True, return_tensors="pt",).to(image.device)
            question_feature = self.bert_model(**question).last_hidden_state[:, 0, :]
            assert question_feature.shape == (N, 768)
        #question_feature = self.text_encoder(question)  # テキストの特徴量※非BERT用

        x = torch.cat([image_feature, question_feature], dim=1)
        x = self.fc(x)

        return x

def train(model, dataloader, optimizer, criterion, device, scaler):#scalerを追加
    model.train()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in dataloader:
        image, answers, mode_answer = \
            image.to(device, non_blocking=True), answers.to(device, non_blocking=True), mode_answer.to(device, non_blocking=True)

        pred = model(image, question)
        loss = criterion(pred, mode_answer.squeeze())

        optimizer.zero_grad()
        with autocast():#autocastの使用
            pred = model(image, question)
            loss = criterion(pred, mode_answer.squeeze())

        scaler.scale(loss).backward()#scalerの使用
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).float().mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start


def eval(model, dataloader, optimizer, criterion, device):
    model.eval()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
#    for image, question, answers, mode_answer in dataloader:
#        image, question, answer, mode_answer = \
#            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)
    with torch.no_grad():
        for image, question, answers, mode_answer in dataloader:
            image, answers, mode_answer = \
                image.to(device, non_blocking=True), answers.to(device, non_blocking=True), mode_answer.to(device, non_blocking=True)
            with autocast():#autocastの使用
                pred = model(image, question)
                loss = criterion(pred, mode_answer.squeeze())

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start

def main():
    # deviceの設定
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"


    # dataloader / model
    transform = transforms.Compose([
        transforms.Resize((224, 224)),#Resize以外のデータ拡張を以下で行う
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    train_dataset = VQADataset(df_path="/workspace/dl_lecture_competition_pub/data/train.json", image_dir="/workspace/dl_lecture_competition_pub/train", transform=transform)
    test_dataset = VQADataset(df_path="/workspace/dl_lecture_competition_pub/data/valid.json", image_dir="/workspace/dl_lecture_competition_pub/valid", transform=transform, answer=False)
    test_dataset.update_dict(train_dataset)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True, pin_memory=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False, pin_memory=True)

    #model = VQAModel(vocab_size=len(train_dataset.question2idx)+1, n_answer=len(train_dataset.answer2idx)).to(device)#非BERT用
    model = VQAModel(n_answer=len(train_dataset.answer2idx)+1).to(device, non_blocking=True)#BERT用

    # optimizer / criterion
    num_epoch = 10
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scaler = GradScaler()

    # train model
    for epoch in range(num_epoch):
        train_loss, train_acc, train_simple_acc, train_time = train(model, train_loader, optimizer, criterion, device, scaler)
        print(f"【{epoch + 1}/{num_epoch}】\n"
              f"train time: {train_time:.2f} [s]\n"
              f"train loss: {train_loss:.4f}\n"
              f"train acc: {train_acc:.4f}\n"
              f"train simple acc: {train_simple_acc:.4f}")

    # 提出用ファイルの作成
    model.eval()
    submission = []
    for image, question in test_loader:
        #image, question = image.to(device), question.to(device)
        image = image.to(device)
        #inputs = model.tokenizer(question, return_tensors='pt', truncation=True, padding=True)
        #inputs = {k: v.to(device) for k, v in inputs.items()}

        #with torch.no_grad(), autocast():
        #    outputs = model.bert_model(**inputs)
        #    question_feature = outputs.last_hidden_state[:, 0, :].to(device)
        #    image_feature = model.resnet(image).to(device)
        #    x = torch.cat([image_feature, question_feature], dim=1)
        #    pred = model.fc(x)
        pred = model(image, question)
        pred = pred.argmax(1).cpu().item()
        submission.append(pred)

    submission = [train_dataset.idx2answer[id] for id in submission]
    submission = np.array(submission)
    torch.save(model.state_dict(), "model.pth")
    np.save("submission.npy", submission)

if __name__ == "__main__":
    main()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip "/content/drive/MyDrive/Colab Notebooks/DL2024S/Final/train.zip"

[1;30;43mストリーミング出力は最後の 5000 行に切り捨てられました。[0m
  inflating: train/train_18210.jpg   
  inflating: train/train_11980.jpg   
  inflating: train/train_04643.jpg   
  inflating: train/train_09325.jpg   
  inflating: train/train_12287.jpg   
  inflating: train/train_12545.jpg   
  inflating: train/train_02716.jpg   
  inflating: train/train_02614.jpg   
  inflating: train/train_03288.jpg   
  inflating: train/train_17562.jpg   
  inflating: train/train_17932.jpg   
  inflating: train/train_07305.jpg   
  inflating: train/train_10279.jpg   
  inflating: train/train_15986.jpg   
  inflating: train/train_16588.jpg   
  inflating: train/train_17370.jpg   
  inflating: train/train_14211.jpg   
  inflating: train/train_03553.jpg   
  inflating: train/train_17934.jpg   
  inflating: train/train_11471.jpg   
  inflating: train/train_02841.jpg   
  inflating: train/train_05577.jpg   
  inflating: train/train_12246.jpg   
  inflating: train/train_00710.jpg   
  inflating: train/train_06372.jpg   
  in

In [None]:
from google.colab import files
files.upload()

KeyboardInterrupt: 

In [None]:
!unzip "/content/drive/MyDrive/Colab Notebooks/DL2024S/Final/valid.zip"

Archive:  /content/drive/MyDrive/Colab Notebooks/DL2024S/Final/valid.zip


In [None]:
import os
path = "/content/train"
files = os.listdir(path)
count = len(files)
print(count)

19873


In [None]:
import os
path = "/content/valid"
files2 = os.listdir(path)
count2 = len(files2)
print(count2)

4969


In [None]:
%cd /content/drive/MyDrive/Colab Notebooks/DL2024S/Final/

/content/drive/MyDrive/Colab Notebooks/DL2024S/Final


In [None]:
!git clone https://github.com/reuen-mayer/dl_lecture_competition_pub.git

fatal: destination path 'dl_lecture_competition_pub' already exists and is not an empty directory.


In [None]:
!git config --global user.email "scarletnolight@gmail.com"

In [None]:
!git config --global user.name "かんなづき"

In [None]:
%pwd

'/content/drive/MyDrive/Colab Notebooks/DL2024S/Final'

In [None]:
!git config --global init.defaultBranch VQA-competition

In [None]:
%cd dl_lecture_competition_pub

/content/drive/MyDrive/Colab Notebooks/DL2024S/Final/dl_lecture_competition_pub


In [None]:
!git checkout VQA-competition

M	main.py
M	requirements.txt
Already on 'VQA-competition'
Your branch is up to date with 'origin/VQA-competition'.


In [None]:
!pip install -r requirements.txt

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->-r requirements.txt (line 1))
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch->-r requirements.txt (line 1))
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch->-r requirements.txt (line 1))
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch->-r requirements.txt (line 1))
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch->-r requirements.txt (line 1))
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch->-r requirements.txt (line 1))
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-man

In [None]:
!ls -la

total 135580
drwx------ 2 root root      4096 Jun 26 22:38 configs
drwx------ 2 root root      4096 Jun 26 22:38 data
-rw------- 1 root root        36 Jun 26 22:38 description.txt
-rw------- 1 root root       726 Jun 26 22:38 Dockerfile
drwx------ 8 root root      4096 Jul  2 15:18 .git
-rw------- 1 root root     14089 Jul  1 15:31 main.py
-rw------- 1 root root 138543578 Jun 28 12:57 model.pth
drwx------ 2 root root      4096 Jun 26 22:38 notebooks
-rw------- 1 root root      4691 Jun 26 22:38 README.md
-rw------- 1 root root        36 Jun 26 22:38 requirements.txt
drwx------ 3 root root      4096 Jun 26 22:38 src
-rw------- 1 root root    238640 Jun 28 12:57 submission.npy
drwx------ 2 root root      4096 Jul  1 14:51 train
-rw------- 1 root root       550 Jun 26 22:38 train.py
drwx------ 2 root root      4096 May 22 10:40 valid


In [None]:
!python main.py

tokenizer_config.json:   0% 0.00/48.0 [00:00<?, ?B/s]tokenizer_config.json: 100% 48.0/48.0 [00:00<00:00, 213kB/s]
vocab.txt: 100% 232k/232k [00:00<00:00, 4.80MB/s]
tokenizer.json: 100% 466k/466k [00:00<00:00, 12.3MB/s]
config.json: 100% 570/570 [00:00<00:00, 2.95MB/s]
model.safetensors: 100% 440M/440M [00:03<00:00, 138MB/s]
Image feature shape: torch.Size([64, 512])
Question feature shape: torch.Size([64, 512])
^C


In [None]:
python3 main.py

SyntaxError: invalid syntax (<ipython-input-18-c8e836b0d607>, line 1)

### データの読み込み
- この部分は修正しないでください．

### データローダの準備  

### 自己教師あり学習の実装
- 初期の形式はMAEを利用することを想定していますが，他の自己教師あり学習を利用していただいて構いません．   

In [None]:
!pip install einops

Collecting einops
  Downloading einops-0.8.0-py3-none-any.whl (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.2/43.2 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.8.0


In [None]:
import math
import random

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from einops.layers.torch import Rearrange
from einops import rearrange

### 事前学習（自己教師あり学習）

In [None]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms


def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def process_text(text):
    # lowercase
    text = text.lower()

    # 数詞を数字に変換
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)

    # 小数点のピリオドを削除
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)

    # 冠詞の削除
    text = re.sub(r'\b(a|an|the)\b', '', text)

    # 短縮形のカンマの追加
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)

    # 句読点をスペースに変換
    text = re.sub(r"[^\w\s':]", ' ', text)

    # 句読点をスペースに変換
    text = re.sub(r'\s+,', ',', text)

    # 連続するスペースを1つに変換
    text = re.sub(r'\s+', ' ', text).strip()

    return text







In [None]:
seed: 42
batch_size: 128
num_epoch: 20
learning_rate: 0.001
weight_decay: 1e-5
train_data:
  df_path: "drive/MyDrive/Colab Notebooks/DL2024S/Final/train.json"
  image_dir: "drive/MyDrive/Colab Notebooks/DL2024S/Final/train"
valid_data:
  df_path: "drive/MyDrive/Colab Notebooks/DL2024S/Final/valid.json"
  image_dir: "drive/MyDrive/Colab Notebooks/DL2024S/Final/valid"

In [None]:
# 1. データローダーの作成

class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform  # 画像の前処理
        self.image_dir = image_dir  # 画像ファイルのディレクトリ
        self.df = pandas.read_json(df_path)  # 画像ファイルのパス，question, answerを持つDataFrame
        self.answer = answer

        # question / answerの辞書を作成
        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        # 質問文に含まれる単語を辞書に追加
        for question in self.df["question"]:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}  # 逆変換用の辞書(question)

        if self.answer:
            # 回答に含まれる単語を辞書に追加
            for answers in self.df["answers"]:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}  # 逆変換用の辞書(answer)

    def update_dict(self, dataset):
        """
        検証用データ，テストデータの辞書を訓練データの辞書に更新する．

        Parameters
        ----------
        dataset : Dataset
            訓練データのDataset
        """
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        """
        対応するidxのデータ（画像，質問，回答）を取得．

        Parameters
        ----------
        idx : int
            取得するデータのインデックス

        Returns
        -------
        image : torch.Tensor  (C, H, W)
            画像データ
        question : torch.Tensor  (vocab_size)
            質問文をone-hot表現に変換したもの
        answers : torch.Tensor  (n_answer)
            10人の回答者の回答のid
        mode_answer_idx : torch.Tensor  (1)
            10人の回答者の回答の中で最頻値の回答のid
        """
        image = Image.open(f"{self.image_dir}/{self.df['image'][idx]}")
        image = self.transform(image)
        question = np.zeros(len(self.idx2question) + 1)  # 未知語用の要素を追加
        question_words = self.df["question"][idx].split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1  # one-hot表現に変換
            except KeyError:
                question[-1] = 1  # 未知語

        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"][idx]]
            mode_answer_idx = mode(answers)  # 最頻値を取得（正解ラベル）

            return image, torch.Tensor(question), torch.Tensor(answers), int(mode_answer_idx)

        else:
            return image, torch.Tensor(question)

    def __len__(self):
        return len(self.df)


In [None]:
# 2. 評価指標の実装
# 簡単にするならBCEを利用する
def VQA_criterion(batch_pred: torch.Tensor, batch_answers: torch.Tensor):
    total_acc = 0.

    for pred, answers in zip(batch_pred, batch_answers):
        acc = 0.
        for i in range(len(answers)):
            num_match = 0
            for j in range(len(answers)):
                if i == j:
                    continue
                if pred == answers[j]:
                    num_match += 1
            acc += min(num_match / 3, 1)
        total_acc += acc / 10

    return total_acc / len(batch_pred)


In [None]:


# 3. モデルのの実装
# ResNetを利用できるようにしておく
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class BottleneckBlock(nn.Module):
    expansion = 4

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    def __init__(self, block, layers):
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, layers[0], 64)
        self.layer2 = self._make_layer(block, layers[1], 128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], 256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], 512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, 512)

    def _make_layer(self, block, blocks, out_channels, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x


def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])


def ResNet50():
    return ResNet(BottleneckBlock, [3, 4, 6, 3])


class VQAModel(nn.Module):
    def __init__(self, vocab_size: int, n_answer: int):
        super().__init__()
        self.resnet = ResNet18()
        self.text_encoder = nn.Linear(vocab_size, 512)

        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, n_answer)
        )

    def forward(self, image, question):
        image_feature = self.resnet(image)  # 画像の特徴量
        question_feature = self.text_encoder(question)  # テキストの特徴量

        x = torch.cat([image_feature, question_feature], dim=1)
        x = self.fc(x)

        return x




In [None]:
# 4. 学習の実装
def train(model, dataloader, optimizer, criterion, device):
    model.train()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in dataloader:
        image, question, answer, mode_answer = \
            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)

        pred = model(image, question)
        loss = criterion(pred, mode_answer.squeeze())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).float().mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start


def eval(model, dataloader, optimizer, criterion, device):
    model.eval()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in dataloader:
        image, question, answer, mode_answer = \
            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)

        pred = model(image, question)
        loss = criterion(pred, mode_answer.squeeze())

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start




In [None]:
def main():
    # deviceの設定
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # dataloader / model
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    train_dataset = VQADataset(df_path="drive/MyDrive/Colab Notebooks/DL2024S/Final/train.json", image_dir="drive/MyDrive/Colab Notebooks/DL2024S/Final/train", transform=transform)
    test_dataset = VQADataset(df_path="drive/MyDrive/Colab Notebooks/DL2024S/Final/valid.jsonx", image_dir="drive/MyDrive/Colab Notebooks/DL2024S/Final/valid", transform=transform, answer=False)
    test_dataset.update_dict(train_dataset)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)

    model = VQAModel(vocab_size=len(train_dataset.question2idx)+1, n_answer=len(train_dataset.answer2idx)).to(device)

    # optimizer / criterion
    num_epoch = 20
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

    # train model
    for epoch in range(num_epoch):
        train_loss, train_acc, train_simple_acc, train_time = train(model, train_loader, optimizer, criterion, device)
        print(f"【{epoch + 1}/{num_epoch}】\n"
              f"train time: {train_time:.2f} [s]\n"
              f"train loss: {train_loss:.4f}\n"
              f"train acc: {train_acc:.4f}\n"
              f"train simple acc: {train_simple_acc:.4f}")


# 提出用ファイルの作成
    model.eval()
    submission = []
    for image, question in test_loader:
        image, question = image.to(device), question.to(device)
        pred = model(image, question)
        pred = pred.argmax(1).cpu().item()
        submission.append(pred)

    submission = [train_dataset.idx2answer[id] for id in submission]
    submission = np.array(submission)
    torch.save(model.state_dict(), "model.pth")
    np.save("submission", submission)

#if __name__ == "__main__":
#    main()

In [None]:
class ZCAWhitening():
    def __init__(self, epsilon=1e-4, device="cpu"):  # 計算が重いのでGPUを用いる
        self.epsilon = epsilon
        self.device = device

    def fit(self, images):  # 変換行列と平均をデータから計算
        x = images[0][0].reshape(1, -1)
        self.mean = torch.zeros([1, x.size()[1]]).to(self.device)
        con_matrix = torch.zeros([x.size()[1], x.size()[1]]).to(self.device)
        for i in range(len(images)):  # 各データについての平均を取る
            x = images[i][0].reshape(1, -1).to(self.device)
            self.mean += x / len(images)
            con_matrix += torch.mm(x.t(), x) / len(images)
            if i % 10000 == 0:
                print("{0}/{1}".format(i, len(images)))
        self.E, self.V = torch.linalg.eigh(con_matrix)  # 固有値分解
        self.E = torch.max(self.E, torch.zeros_like(self.E)) # 誤差の影響で負になるのを防ぐ
        self.ZCA_matrix = torch.mm(torch.mm(self.V, torch.diag((self.E.squeeze()+self.epsilon)**(-0.5))), self.V.t())
        print("completed!")

    def __call__(self, x):
        size = x.size()
        x = x.reshape(1, -1).to(self.device)
        x -= self.mean
        x = torch.mm(x, self.ZCA_matrix.t())
        x = x.reshape(tuple(size))
        x = x.to("cpu")
        return x


# (datasetのクラスを自作したので，このあたりの処理が少し変わっています)

zca = ZCAWhitening()
zca.fit(trainval_data)

val_size = 3000
train_data, val_data = torch.utils.data.random_split(trainval_data, [len(trainval_data)-val_size, val_size])  # 訓練データと検証データに分割


# 前処理を定義
transform_train = transforms.Compose([transforms.ToTensor(),
                                    zca])# WRITE ME

transform = transforms.Compose([transforms.ToTensor(),
                                    zca])# WRITE ME

# データセットに前処理を設定
trainval_data.transform = transform_train
test_data.transform = transform

batch_size = 32

dataloader_train = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_valid = torch.utils.data.DataLoader(
    val_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_test = torch.utils.data.DataLoader(
    test_data,
    batch_size=batch_size,
    shuffle=False
)

In [None]:
!pip install hydra-core



In [None]:
import torch
import hydra
from omegaconf import DictConfig

#from src.utils import set_seed


@hydra.main(version_base=None, config_path="configs", config_name="base")
def train(args: DictConfig):
    set_seed(args.seed)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    train_dataset = VQADataset(args.train_data.df_path, args.train_data.image_dir, transform)
    valid_dataset = VQADataset(args.valid_data.df_path, args.valid_data.image_dir, transform)
    valid_dataset.update_dict(train_dataset)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=args.batch_size, shuffle=False)

    model = VQAModel(len(train_dataset.idx2question), len(train_dataset.idx2answer)).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=args.learning_rate, weight_decay=args.weight_decay)

    best_val_acc = 0

    for epoch in range(args.num_epoch):
        model.train()
        running_loss = 0.0
        for images, questions, answers, _ in train_loader:
            images, questions, answers = images.to(device), questions.to(device), answers.to(device, dtype=torch.long)

            optimizer.zero_grad()
            outputs = model(images, questions)
            loss = criterion(outputs, answers)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        model.eval()
        val_loss = 0.0
        val_acc = 0.0
        with torch.no_grad():
            for images, questions, answers, mode_answer_idx in valid_loader:
                images, questions, answers = images.to(device), questions.to(device), answers.to(device, dtype=torch.long)

                outputs = model(images, questions)
                loss = criterion(outputs, mode_answer_idx)
                val_loss += loss.item()

                pred = outputs.argmax(dim=1)
                val_acc += VQA_criterion(pred.cpu(), answers.cpu())

        val_loss /= len(valid_loader)
        val_acc /= len(valid_loader)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "best_model.pth")

        print(f"Epoch [{epoch+1}/{args.num_epoch}], Train Loss: {running_loss/len(train_loader):.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Save final model
    torch.save(model.state_dict(), "final_model.pth")

    # ------------------
    #    Dataloader
    # ------------------

    # ------------------
    #       Model
    # ------------------

    # ------------------
    #  Loss & optimizer
    # ------------------

    # ------------------
    #   Start training
    # ------------------


#if __name__ == "__main__":
#    train()

In [None]:
train()