In [None]:
print('hello world')

In [1]:
import os
import re  # 正規表現を使用して文字列操作を行うためのライブラリ
import random  # 乱数を生成するためのライブラリ
import time  # 時間に関する様々な関数を提供するライブラリ
from statistics import mode  # 基本的な統計計算を行うためのライブラリ。ここでは最頻値を計算するために使用
from PIL import Image  # 画像処理を行うためのライブラリ（Pillow）
import numpy as np  # 数値計算を効率的に行うためのライブラリ
import pandas  # データ操作と分析を行うためのライブラリ
import torch  # 機械学習ライブラリであるPyTorchの中核をなすパッケージ
import torch.nn as nn  # PyTorchの中でニューラルネットワークの構築に必要なモジュールやレイヤー、関数を提供するパッケージ
import torchvision  # PyTorchのコンピュータビジョン用のライブラリ
from torchvision import transforms  # 画像データを変換するための関数を提供するサブパッケージ
from tqdm import tqdm
import unicodedata
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer
import nltk
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/tomokitakata/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/tomokitakata/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

### 正規表現の基本的な構文と例

#### 1. 基本的なメタ文字
- **`.`（ドット）**: 任意の1文字とマッチします。
  - 例: `a.b` は `a` と `b` の間に任意の1文字が入るものとマッチします。例えば、`aab`, `acb`, `a9b` など。

- **`^`**: 文字列の先頭にマッチします。
  - 例: `^abc` は文字列の先頭が `abc` で始まるものとマッチします。

- **`$`**: 文字列の末尾にマッチします。
  - 例: `abc$` は文字列の末尾が `abc` で終わるものとマッチします。

- **`*`**: 直前の文字が0回以上繰り返されるものにマッチします。
  - 例: `a*` は空文字、`a`, `aa`, `aaa` などにマッチします。

- **`+`**: 直前の文字が1回以上繰り返されるものにマッチします。
  - 例: `a+` は `a`, `aa`, `aaa` などにマッチします。

- **`?`**: 直前の文字が0回または1回繰り返されるものにマッチします。
  - 例: `a?` は空文字または `a` にマッチします。

#### 2. 文字クラス
- **`[ ]`**: 角括弧内の任意の1文字にマッチします。
  - 例: `[abc]` は `a`, `b`, `c` のいずれか1文字にマッチします。

- **`[^ ]`**: 角括弧内に含まれない任意の1文字にマッチします。
  - 例: `[^abc]` は `a`, `b`, `c` 以外の任意の1文字にマッチします。

- **`[a-z]`**: 範囲内の任意の1文字にマッチします。
  - 例: `[a-z]` は小文字のアルファベット全てにマッチします。

#### 3. エスケープシーケンス
- **`\d`**: 任意の数字にマッチします。`[0-9]` と同じです。
- **`\D`**: 任意の数字以外の文字にマッチします。
- **`\w`**: 任意の単語構成文字（アルファベット、数字、アンダースコア）にマッチします。`[a-zA-Z0-9_]` と同じです。
- **`\W`**: 任意の単語構成文字以外の文字にマッチします。
- **`\s`**: 任意の空白文字にマッチします。スペース、タブ、改行など。
- **`\S`**: 任意の空白文字以外の文字にマッチします。

In [2]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def process_text(text):

    # lowercase
    text = text.lower()

    # 数詞を数字に変換
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)

    # 小数点のピリオドを削除
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text) # re.sub()は元のテキストに対して置換(substitute)を行う。
    '''
    このコードは、数字の前後にないピリオドを全て削除するものです。具体的には、次のようなピリオドを削除します：

	•	“This is a test.” → “This is a test”
	•	“The price is 3.50.” → “The price is 3.50”（変更なし）
	•	“Version 2.0 is released.” → “Version 2.0 is released”（変更なし）

	•	(?<!\d): ピリオドの前に数字がないことを確認。
	•	\.: ピリオド自体。
	•	(?!\d): ピリオドの後に数字がないことを確認。
    '''

    # 冠詞の削除
    text = re.sub(r'\b(a|an|the)\b', '', text)
    '''
    •	元のテキスト: “This is an example of a sentence with the articles.”
	•	冠詞を削除した後のテキスト: “This is example of sentence with articles.”
    re.sub(r'\b(a|an|the)\b', '', text) は次のように動作します：

	1.	単語の境界（\b）を探します。
	2.	単語の境界内に “a”、“an”、または “the” のいずれかがあるか確認します。
	3.	それらの冠詞を空文字列（''）で置き換えます。

この方法により、冠詞が文中から削除され、他の単語は変更されずに残ります。
    '''

    # 短縮形のカンマの追加
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)

    # 句読点をスペースに変換
    text = re.sub(r"[^\w\s':]", ' ', text)
    '''
    •	正規表現: [^\w\s':]
	•	\w: 単語文字（英数字およびアンダースコア）
	•	\s: 空白文字
	•	':: アポストロフィ（’）とコロン（:）
	•	[^...]: これらの文字以外のすべての文字にマッチ
	•	変換内容: 上記以外のすべての文字をスペースに置換します。
    '''

    # 句読点をスペースに変換
    text = re.sub(r'\s+,', ',', text)
    '''
    •	正規表現: \s+,
	•	\s+: 1つ以上の空白文字
	•	,: カンマ
	•	変換内容: カンマの前の空白を削除します。例えば、 ", " が "," になります。
    '''

    # 連続するスペースを1つに変換
    text = re.sub(r'\s+', ' ', text).strip()
    '''
    •	正規表現: \s+
	•	\s+: 1つ以上の空白文字
	•	変換内容: 連続する空白文字を1つのスペースに置換します。例えば、 "This  is   a test" が "This is a test" になります。
	•	strip(): 文字列の先頭および末尾の空白を削除します。

    '''

    # 改善点①　効果なし。
    
    # Unicode正規化
    # text = unicodedata.normalize('NFKD', text)

    # # ストップワードの除去
    # stop_words = set(stopwords.words('english'))
    # text = ' '.join([word for word in text.split() if word not in stop_words])

    # # ステミング
    # stemmer = PorterStemmer()
    # text = ' '.join([stemmer.stem(word) for word in text.split()])

    # # レンマタイゼーション
    # lemmatizer = WordNetLemmatizer()
    # text = ' '.join([lemmatizer.lemmatize(word) for word in text.split()])

    return text

# errorは解説文に反応してるだけやから特に意識せんでいい。

  '''
  '''
  '''
  '''


In [3]:
def process_image():
    pass

In [4]:
# 1. データローダーの作成
class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform  # 画像の前処理
        self.image_dir = image_dir  # 画像ファイルのディレクトリ
        self.df = pandas.read_json(df_path)  # 画像ファイルのパス，question, answerを持つDataFrame
        self.answer = answer

        # question / answerの辞書を作成
        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        # 質問文に含まれる単語を辞書に追加
        for question in self.df["question"]:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx) # dfのquestionカラムに含まれている問題文の各単語が、どの位置にあるのか、先頭を0として辞書形式で情報を格納。
        self.idx2question = {v: k for k, v in self.question2idx.items()}  # 逆変換用の辞書(question)

        if self.answer:
            # 回答に含まれる単語を辞書に追加
            for answers in self.df["answers"]:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}  # 逆変換用の辞書(answer)

    def update_dict(self, dataset):
        """
        検証用データ，テストデータの辞書を訓練データの辞書に更新する．

        Parameters
        ----------
        dataset : Dataset
            訓練データのDataset
        """
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        """
        対応するidxのデータ（画像，質問，回答）を取得．

        Parameters
        ----------
        idx : int
            取得するデータのインデックス

        Returns
        -------
        image : torch.Tensor  (C, H, W)
            画像データ
        question : torch.Tensor  (vocab_size)
            質問文をone-hot表現に変換したもの
        answers : torch.Tensor  (n_answer)
            10人の回答者の回答のid
        mode_answer_idx : torch.Tensor  (1)
            10人の回答者の回答の中で最頻値の回答のid
        """
        #try:
        image = Image.open(f"{self.image_dir}/{self.df['image'][idx]}")
        #except FileNotFoundError:
            #print("File not found")
            #return None
        
        image = self.transform(image)
        question = np.zeros(len(self.idx2question) + 1)  # 未知語用の要素を追加
        question_words = self.df["question"][idx].split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1  # one-hot表現に変換
            except KeyError:
                question[-1] = 1  # 未知語

        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"][idx]]
            mode_answer_idx = mode(answers)  # 最頻値を取得（正解ラベル）

            return image, torch.Tensor(question), torch.Tensor(answers), int(mode_answer_idx)

        else:
            return image, torch.Tensor(question)

    def __len__(self):
        return len(self.df)

In [5]:
# 2. 評価指標の実装
# 簡単にするならBCEを利用する
def VQA_criterion(batch_pred: torch.Tensor, batch_answers: torch.Tensor):
    total_acc = 0.

    for pred, answers in zip(batch_pred, batch_answers):
        acc = 0.
        for i in range(len(answers)):
            num_match = 0
            for j in range(len(answers)):
                if i == j:
                    continue
                if pred == answers[j]:
                    num_match += 1
            acc += min(num_match / 3, 1)
        total_acc += acc / 10

    return total_acc / len(batch_pred)

# 3. モデルのの実装
# ResNetを利用できるようにしておく
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out

In [6]:
class BottleneckBlock(nn.Module):
    expansion = 4

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    def __init__(self, block, layers):
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, layers[0], 64)
        self.layer2 = self._make_layer(block, layers[1], 128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], 256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], 512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, 512)

    def _make_layer(self, block, blocks, out_channels, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x


# model1
def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])

# model2
def ResNet50():
    return ResNet(BottleneckBlock, [3, 4, 6, 3])

## ベースライン

In [None]:
# class VQAModel(nn.Module):
#     def __init__(self, vocab_size: int, n_answer: int): # この部分で必要な引数を指定。
#         super().__init__()
#         self.img_encoder = ResNet50() # ここで基底モデルを設定。CNNベースの別モデルを使用することも考えられる。
#         self.text_encoder = nn.Linear(vocab_size, 512) # 全層結合(線形層)。入力ベクトルに対して線形変換を適用。ここを何かしらのモデルを使用してみることも検討。ここにnn.sequentialでNN作ってもいい。

#         self.fc = nn.Sequential(
#             nn.Linear(1024, 512), # ここで画像特徴量とテキスト特徴量を、学習がうまくいくように混ぜてる？, # 全結合層は重ねれば重ねるだけいい
#             nn.ReLU(inplace=True),
#             nn.Linear(512, n_answer)
#         ) # ここにドロップアウト層を入れてみるのもあり

#     def forward(self, image, question):
#         image_feature = self.img_encoder(image)  # 画像の特徴量
#         question_feature = self.text_encoder(question)  # テキストの特徴量

#         x = torch.cat([image_feature, question_feature], dim=1) # 画像特徴量とテキスト特徴量の結合。 アウトプットは512+512=1024の次元数。
#         x = self.fc(x) # fcでcatによる1024次元の出力を512次元に圧縮、ReLUを入れて非線形性を導入、512次元から回答の数に対応する出力ベクトルを作成。

#         return x

## 240711

In [7]:
class VQAModel(nn.Module):
    def __init__(self, vocab_size: int, n_answer: int): # この部分で必要な引数を指定。
        super().__init__()
        self.img_encoder = ResNet50() # 改善点②
        self.text_encoder = nn.Sequential(
            nn.Linear(vocab_size, 756),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(756,512))# 改善点③

        self.fc = nn.Sequential(
            nn.Linear(1024, 756),
            nn.ReLU(inplace = True), 
            nn.Dropout(p=0.5),
            nn.Linear(756, 512), 
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(512, n_answer)
        ) # 改善点④

    def forward(self, image, question):
        image_feature = self.img_encoder(image)  # 画像の特徴量
        question_feature = self.text_encoder(question)  # テキストの特徴量

        x = torch.cat([image_feature, question_feature], dim=1) # 画像特徴量とテキスト特徴量の結合。 アウトプットは512+512=1024の次元数。
        x = self.fc(x) # fcでcatによる1024次元の出力を512次元に圧縮、ReLUを入れて非線形性を導入、512次元から回答の数に対応する出力ベクトルを作成。

        return x

In [15]:
# 4. 学習の実装
def train(model, dataloader, optimizer, criterion, device):
    model.train()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in dataloader:
        # 一つでもNoneが含まれている場合にスキップ
        if image is None or question is None or answers is None or mode_answer is None:
            continue

        # ここの\は単に改行してるだけ
        image, question, answers, mode_answer = \
            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)
        pred = model(image, question) # 順伝播(調整)
        loss = criterion(pred, mode_answer.squeeze()) # 損失の計算

        optimizer.zero_grad() # 勾配の初期化
        loss.backward() # 最適化に向けてどの方向に進むかを決める
        optimizer.step() # 実際に最適化する方向に足を踏み出す

        total_loss += loss.item() # lossはtensorの値であるから、pythonの値とするために.item()で取得。
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).float().mean().item()  # simple accuracy
        '''
        total_loss, total_acc, simple_accに全部足し合わせることで、
        dataloaderのエポック全体の損失とかaccとかを出そうとしている。
        '''
    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start


def eval(model, dataloader, optimizer, criterion, device):
    model.eval()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in tqdm(dataloader):
        image, question, answers, mode_answer = \
            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)

        pred = model(image, question)
        loss = criterion(pred, mode_answer.squeeze())

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start


def main():
    # deviceの設定
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # dataloader / model
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ]) # 画像データの前処理。単純に224*224にリサイズしてテンソルに変えてるだけ、他の処理もわんちゃん効くかも。

    train_dataset = VQADataset(df_path="../data/train.json", image_dir="../data/train", transform=transform)
    test_dataset = VQADataset(df_path="../data/valid.json", image_dir="../data/valid", transform=transform, answer=False)
    test_dataset.update_dict(train_dataset)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True) # 128づつモデルに読み込ませるためのDataLoader, shuffle=Trueによってモデルの過学習を抑える。
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)

    # これによって画像データとテキストデータが特徴量としてまとめられる。
    model = VQAModel(vocab_size=len(train_dataset.question2idx)+1, n_answer=len(train_dataset.answer2idx)).to(device) # +1はよくわからんけどノリで

    # 学習済みモデルをロード
    model_path = "model.pth"
    if os.path.exists(model_path):
        model.load_state_dict(torch.load(model_path), strict=False)
    print("Pre-trained model loaded.")

    # optimizer / criterion
    # ここで学習
    num_epoch = 5
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

    # train model
    for epoch in tqdm(range(num_epoch)):
        train_loss, train_acc, train_simple_acc, train_time = train(model, train_loader, optimizer, criterion, device)
        print(f"【{epoch + 1}/{num_epoch}】\n"
              f"train time: {train_time:.2f} [s]\n"
              f"train loss: {train_loss:.4f}\n"
              f"train acc: {train_acc:.4f}\n"
              f"train simple acc: {train_simple_acc:.4f}")

    # 提出用ファイルの作成
    model.eval() # モデルを評価モードに変更(推論用)
    submission = []
    for image, question in test_loader:
        image, question = image.to(device), question.to(device)
        pred = model(image, question)
        pred = pred.argmax(1).cpu().item()
        submission.append(pred)

    submission = [train_dataset.idx2answer[id] for id in submission]
    submission = np.array(submission)
    torch.save(model.state_dict(), "model.pth")# pytorchモデルファイルの保存
    np.save("submission.npy", submission)

# この条件文は、スクリプトが直接実行されているかどうかを確認するためのものです。スクリプトが直接実行されている場合は__name__の値が"__main__"になるため、この条件は真になります。これにより、スクリプトが直接実行されたときにのみ特定のコードが実行されるようにすることができます。
# if __name__ == "__main__":
#     main()

In [16]:
main()

Pre-trained model loaded.


  0%|          | 0/5 [33:23<?, ?it/s]


KeyboardInterrupt: 

- nn.Linear()ってデータのshapeを変えるだけっていう認識でいい？
    - 大枠はそんな感じ、それ自体でも重みを持って、学習に影響を与える。
    - Linear()とかReLu()とか増やしたらわんちゃん精度上がる
    - コンボリューショナル層→CNN層
- VQAmodel()の画像とテキストのそれぞれの処理構造について教えてほしい。
    - おわり
- ResNet以外に使用できるものってある？
    - あるかも。transformer
- pthファイルって、重みが保存されているものっていう認識でいい？
    - pthを初期状態として、全レイヤを学習させるのもあり →fine tuning
    - pthをもとに、最後のレイヤーだけ学習する→転移学習
- データローダーの存在意義
    - バッチ処理
- 順伝播→損失計算→勾配初期化→逆伝播→パラメータ更新(step?)って具体的になにをしてる？
    - おけ
- これによって画像データとテキストデータが特徴量としてまとめられる。→特徴量を単に横方向に結合しているだけ？
    - おけ
- VQAModel()でdef __init__()とdef forward()しかないのはなんで？
    - ドキュメントよめ！
- 全体として、工夫をする上でいじるべきコードを明確にしたい。
    - コメントアウト確認！
- ローカルめちゃ時間かかるねんけどどうしよ、JupyterLabの使い方わからない、どういう構造っぽいかみてほしい。
    - google colabつかいなさい