In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

import wandb
# Initialise wandb with mode='disabled' before anything else in the notebook runs.
wandb.init(mode='disabled')

# Process-wide environment switches read by downstream libraries.
# NOTE(review): WANDB_DISABLED is set *after* wandb.init above; presumably it is meant
# for the Trainer integration rather than for wandb itself — confirm.
os.environ["WANDB_DISABLED"] = "true"
os.environ["TOKENIZERS_PARALLELISM"] = "false"


from datasets import load_from_disk
import matplotlib.pyplot as plt
import pprint as pp
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, f1_score
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union
from datasets import load_dataset, enable_caching
from PIL import Image
import torch
from copy import deepcopy
import torch.nn as nn
from transformers import (TrainingArguments, Trainer,
                          AutoTokenizer, AutoFeatureExtractor, AutoImageProcessor,
                          AutoModel)  


import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel
from typing import Optional

import torch
from transformers import AutoTokenizer, AutoImageProcessor



# Assuming VITBERTVQAModel is the model class
# loaded_model, loaded_tokenizer, loaded_processor = load_model(VITBERTVQAModel, save_directory)

import torch
import os

## training

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score



In [None]:



@dataclass
class Collator:
    """Turn raw VQA samples into a batch of model-ready tensors.

    Accepts either a list of sample dicts (the DataLoader / Trainer path) or a
    single sample dict. Every returned tensor keeps a leading batch dimension,
    including for one-sample batches, so the result can always be fed to the
    model unchanged.

    Attributes are the text tokenizer, the image processor, and the
    answer-string -> class-id mapping used to build ``labels``.
    """
    tokenizer: AutoTokenizer
    img_processor: AutoFeatureExtractor
    label_map: Dict[str, int]

    def tokenize_text(self, texts: List[str]) -> Dict[str, torch.Tensor]:
        """Tokenize questions, padding to the longest one and truncating at 24 tokens."""
        encoded_text = self.tokenizer(
            text=texts,
            padding='longest',
            max_length=24,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        # Deliberately no .squeeze(): it is a no-op for batches of two or more
        # samples but strips the batch dimension from a one-sample batch.
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def process_images(self, images: List[Image.Image]) -> Dict[str, torch.Tensor]:
        """Run the image processor and return ``pixel_values`` with the batch dimension kept."""
        # Diagnostic only: report any image whose array is not rank 3.
        candidates = images if isinstance(images, list) else [images]
        for image in candidates:
            if len(np.array(image).shape) != 3:
                print('oh no')

        processed_images = self.img_processor(
            images=images,
            return_tensors="pt",
        )
        return {
            "pixel_values": processed_images['pixel_values'],
        }

    def _label_id(self, answer: str) -> int:
        """Map an answer string to its class id, falling back to the 'unknown' class.

        The fallback is looked up only when it is needed, so a label map without
        an 'unknown' entry works as long as every answer is present.

        Raises:
            KeyError: the answer is missing and there is no 'unknown' entry.
        """
        if answer in self.label_map:
            return self.label_map[answer]
        if 'unknown' in self.label_map:
            return self.label_map['unknown']
        raise KeyError(
            f"answer {answer!r} is not in label_map and no 'unknown' fallback label exists"
        )

    def __call__(self, raw_batch_dict) -> Dict[str, torch.Tensor]:
        """Collate one sample dict or a list of sample dicts into a tensor batch.

        Each sample provides 'question', 'image' and 'multiple_choice_answer'
        (the ground-truth answer string).
        """
        if isinstance(raw_batch_dict, dict):
            # Single sample: wrap it so it is handled as a batch of one.
            questions = [raw_batch_dict['question']]
            images = [raw_batch_dict['image'].convert('RGB')]
            answers = [raw_batch_dict['multiple_choice_answer']]
        else:
            questions = [i['question'] for i in raw_batch_dict]
            images = [i['image'] for i in raw_batch_dict]
            answers = [i['multiple_choice_answer'] for i in raw_batch_dict]

        tokenized_texts = self.tokenize_text(questions)
        processed_images = self.process_images(images)
        labels = [self._label_id(answer) for answer in answers]

        return {
            **tokenized_texts,
            **processed_images,
            'labels': torch.tensor(labels, dtype=torch.int64),
        }
    
class CoAttention(nn.Module):
    """Bidirectional cross-attention between a text sequence and an image sequence.

    Text tokens query the image tokens and image tokens query the text tokens
    (single-head scaled dot-product attention in a shared ``common_hidden_dim``
    space). Each side's attended context is concatenated with its own input
    features and projected to ``common_hidden_dim``.

    Note: ``self.dropout`` is constructed but is not applied in ``forward``.
    """

    def __init__(self, text_hidden_dim, image_hidden_dim, common_hidden_dim, dropout_rate=0.3):
        super().__init__()
        self.common_hidden_dim = common_hidden_dim

        # Query / key / value projections of the text features.
        self.text_query = nn.Linear(text_hidden_dim, common_hidden_dim)
        self.text_key = nn.Linear(text_hidden_dim, common_hidden_dim)
        self.text_value = nn.Linear(text_hidden_dim, common_hidden_dim)
        # Query / key / value projections of the image features.
        self.image_query = nn.Linear(image_hidden_dim, common_hidden_dim)
        self.image_key = nn.Linear(image_hidden_dim, common_hidden_dim)
        self.image_value = nn.Linear(image_hidden_dim, common_hidden_dim)

        # Output projections over [own features ; attended context].
        self.text_feed_forward = nn.Linear(common_hidden_dim + text_hidden_dim, common_hidden_dim)
        self.image_feed_forward = nn.Linear(common_hidden_dim + image_hidden_dim, common_hidden_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def _attend(self, query, key, value):
        """Scaled dot-product attention: softmax(Q K^T / sqrt(d)) V."""
        scores = torch.matmul(query, key.transpose(-2, -1)) / (self.common_hidden_dim ** 0.5)
        weights = F.softmax(scores, dim=-1)
        return torch.matmul(weights, value)

    def forward(self, text_features, image_features):
        """Return ``(updated_text_features, updated_image_features)``.

        Both outputs keep their input's leading dimensions and have
        ``common_hidden_dim`` as the last dimension.
        """
        # Text tokens attend over the image tokens.
        text_context = self._attend(
            self.text_query(text_features),
            self.image_key(image_features),
            self.image_value(image_features),
        )
        # Image tokens attend over the text tokens.
        image_context = self._attend(
            self.image_query(image_features),
            self.text_key(text_features),
            self.text_value(text_features),
        )

        fused_text = self.text_feed_forward(torch.cat((text_features, text_context), dim=-1))
        fused_image = self.image_feed_forward(torch.cat((image_features, image_context), dim=-1))
        return fused_text, fused_image
    
    
    
class VITBERTcoAttentionVQAmodel(nn.Module):
    """VQA classifier: pretrained text and image encoders fused through co-attention.

    Pipeline: encode text and image -> ``CoAttention`` -> one Transformer
    encoder layer per modality -> take position 0 of each sequence ->
    concatenate -> fusion MLP -> linear classifier over ``num_labels`` answers.

    Args:
        num_labels: number of answer classes. ``None`` (the default) resolves to
            ``len(LABEL_MAP)`` when the model is instantiated, so the class can
            be defined before ``LABEL_MAP`` exists.
        intermediate_dim: width of the fusion layer.
        common_hidden_dim: shared width used by co-attention and the
            per-modality Transformer layers (must be divisible by 8 heads).
        pretrained_text_name: Hugging Face id of the text encoder.
        pretrained_image_name: Hugging Face id of the image encoder.
    """

    def __init__(
            self,
            num_labels: Optional[int] = None,
            intermediate_dim: int = 512,
            common_hidden_dim: int = 256,
            pretrained_text_name: str = 'bert-base-uncased',
            pretrained_image_name: str = 'google/vit-base-patch16-224-in21k'):

        super(VITBERTcoAttentionVQAmodel, self).__init__()

        # Resolve the default lazily: a default of len(LABEL_MAP) in the signature
        # is evaluated when the class is defined, i.e. before LABEL_MAP is built.
        if num_labels is None:
            num_labels = len(LABEL_MAP)

        self.num_labels = num_labels
        self.pretrained_text_name = pretrained_text_name
        self.pretrained_image_name = pretrained_image_name

        # The text encoder is loaded with heavier dropout than its default config.
        self.text_encoder = AutoModel.from_pretrained(self.pretrained_text_name,
                                                      hidden_dropout_prob=0.3,
                                                      attention_probs_dropout_prob=0.3)
        self.image_encoder = AutoModel.from_pretrained(self.pretrained_image_name)

        text_hidden_dim = self.text_encoder.config.hidden_size

        # NOTE(review): the efficientnet branch reads config.num_channels as the
        # feature width; confirm that matches the encoder's last_hidden_state.
        if 'efficientnet' in self.pretrained_image_name.lower():
            image_hidden_dim = self.image_encoder.config.num_channels
        else:
            image_hidden_dim = self.image_encoder.config.hidden_size

        self.co_attention = CoAttention(text_hidden_dim, image_hidden_dim, common_hidden_dim)

        # batch_first=True: the features are (batch, seq, dim). With the default
        # batch_first=False the layer would treat the batch axis as the sequence
        # axis and attend across different samples.
        self.text_transformer_encoder_layer = nn.TransformerEncoderLayer(
            d_model=common_hidden_dim, nhead=8, batch_first=True)
        self.text_transformer_encoder = nn.TransformerEncoder(self.text_transformer_encoder_layer, num_layers=1)

        self.image_transformer_encoder_layer = nn.TransformerEncoderLayer(
            d_model=common_hidden_dim, nhead=8, batch_first=True)
        self.image_transformer_encoder = nn.TransformerEncoder(self.image_transformer_encoder_layer, num_layers=1)

        self.fusion = nn.Sequential(
            nn.Linear(common_hidden_dim * 2, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),  # regularise the fused representation
            nn.LayerNorm(intermediate_dim)
        )

        self.classifier = nn.Linear(intermediate_dim, self.num_labels)

        self.criterion = nn.CrossEntropyLoss()

    def forward(
            self,
            input_ids: torch.LongTensor,
            pixel_values: torch.FloatTensor,
            attention_mask: Optional[torch.LongTensor] = None,
            token_type_ids: Optional[torch.LongTensor] = None,
            labels: Optional[torch.LongTensor] = None):
        """Return ``{"logits": ...}`` plus ``"loss"`` (cross-entropy) when ``labels`` is given."""

        # Encode text
        encoded_text = self.text_encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )
        text_features = encoded_text.last_hidden_state

        # Encode image
        encoded_image = self.image_encoder(
            pixel_values=pixel_values,
            return_dict=True,
        )
        image_features = encoded_image.last_hidden_state

        # Co-attention mechanism
        updated_text_features, updated_image_features = self.co_attention(text_features, image_features)

        # Pass through the per-modality transformer encoders
        transformer_text_output = self.text_transformer_encoder(updated_text_features)
        transformer_image_output = self.image_transformer_encoder(updated_image_features)

        # Position 0 of each sequence is used as that modality's summary vector
        cls_text_output = transformer_text_output[:, 0, :]
        cls_image_output = transformer_image_output[:, 0, :]

        # Fuse the two summary vectors
        fused_output = self.fusion(
            torch.cat([cls_text_output, cls_image_output], dim=1)
        )

        logits = self.classifier(fused_output)

        out = {
            "logits": logits
        }
        if labels is not None:
            loss = self.criterion(logits, labels)
            out["loss"] = loss

        return out




In [None]:
def create_collator_and_model(text='bert-base-uncased', image='google/vit-base-patch16-224-in21k', device=None):
    """Build the training collator and a co-attention VQA model.

    Args:
        text: Hugging Face id of the text tokenizer / encoder.
        image: Hugging Face id of the image processor / encoder.
        device: where to place the model. ``None`` selects CUDA when available
            and CPU otherwise (same rule as ``load_model``), so the function no
            longer relies on a notebook-global ``device`` being defined.

    Returns:
        ``(collator, model)``.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    tokenizer = AutoTokenizer.from_pretrained(text)
    processor = AutoImageProcessor.from_pretrained(image, use_fast=True)

    collator = Collator(
        tokenizer=tokenizer,
        img_processor=processor,
        label_map=LABEL_MAP
    )

    model = VITBERTcoAttentionVQAmodel(pretrained_text_name=text, pretrained_image_name=image).to(device)
    return collator, model

def load_model(model_class, save_directory, device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Restore a model, tokenizer and image processor written by ``save_model``.

    Args:
        model_class: class instantiated with no arguments to receive the weights.
        save_directory: directory holding ``model.pth`` and the tokenizer /
            processor files.
        device: target device for the weights and the model.

    Returns:
        ``(model, tokenizer, processor)``.
    """
    # Read the weights first, mapped straight onto the target device.
    weights_path = os.path.join(save_directory, 'model.pth')
    restored_state = torch.load(weights_path, map_location=device)

    # Fresh instance with default constructor arguments, then overwrite its weights.
    model = model_class()
    model = model.to(device)
    model.load_state_dict(restored_state)

    # The preprocessing objects are stored in the same directory.
    tokenizer = AutoTokenizer.from_pretrained(save_directory)
    processor = AutoImageProcessor.from_pretrained(save_directory)

    print(f"Model, tokenizer, and processor loaded from {save_directory}")

    return model, tokenizer, processor

def save_model(model, tokenizer, processor, save_directory):
    """Write the model weights, tokenizer and image processor into one directory.

    Args:
        model: module whose ``state_dict()`` is saved as ``model.pth``.
        tokenizer: object exposing ``save_pretrained(directory)``.
        processor: object exposing ``save_pretrained(directory)``.
        save_directory: target directory; created, with parents, if missing.
    """
    # exist_ok=True replaces the exists()-then-makedirs() pair, which can fail
    # if the directory appears between the check and the creation.
    os.makedirs(save_directory, exist_ok=True)

    # Save the model state_dict
    torch.save(model.state_dict(), os.path.join(save_directory, 'model.pth'))

    # Save the tokenizer and processor
    tokenizer.save_pretrained(save_directory)
    processor.save_pretrained(save_directory)

    print(f"Model, tokenizer, and processor saved to {save_directory}")

# def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
#     logits, labels = eval_tuple
#     preds = logits.argmax(axis=-1)
#     return {
#         "acc": accuracy_score(labels, preds),
#         "f1": f1_score(labels, preds, average='macro'),
#         "precision": precision_score(labels, preds, average='macro', zero_division=0),
#         "recall": recall_score(labels, preds, average='macro', zero_division=0)
#     }


In [None]:
def create_collator_and_model(text='bert-base-uncased', image='google/vit-base-patch16-224-in21k', device=None):
    """Build the training collator and a ``MultimodalVQAModel``.

    This definition replaces the earlier function of the same name once its
    cell has run.

    Args:
        text: Hugging Face id of the text tokenizer / encoder.
        image: Hugging Face id of the image processor / encoder.
        device: where to place the model. ``None`` selects CUDA when available
            and CPU otherwise, so the function no longer relies on a
            notebook-global ``device`` being defined.

    Returns:
        ``(collator, model)``.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    tokenizer = AutoTokenizer.from_pretrained(text)
    processor = AutoImageProcessor.from_pretrained(image, use_fast=True)

    collator = Collator(
        tokenizer=tokenizer,
        img_processor=processor,
        label_map=LABEL_MAP
    )

    # NOTE(review): MultimodalVQAModel is not defined in any visible cell; the
    # only model class defined in this notebook is VITBERTcoAttentionVQAmodel.
    # Confirm which class is intended before running on a fresh kernel.
#     model = VITBERTcoAttentionVQAmodel(pretrained_text_name=text, pretrained_image_name=image).to(device)
    model = MultimodalVQAModel(pretrained_text_name=text, pretrained_image_name=image).to(device)
    return collator, model

In [None]:
import json
def load_json(path):
    """Parse the JSON file at ``path`` and return the decoded object.

    The file is read as UTF-8 rather than the platform default encoding, so
    non-ASCII text decodes the same way on every machine.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)
# Load the training annotations; the path is relative to the working directory.
train =load_json("data/train.json")

In [None]:
import json
def load_json(path):
    """Parse the JSON file at ``path`` and return the decoded object.

    The file is read as UTF-8 rather than the platform default encoding, so
    non-ASCII text decodes the same way on every machine.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)
# Load the training annotations (this cell repeats the previous one verbatim).
train =load_json("data/train.json")

In [None]:
import re
# Spelled-out numbers and their digit form.
_NUM_WORD_TO_DIGIT = {
    'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
    'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
    'ten': '10'
}
# Apostrophe-less contractions and their corrected form.
_CONTRACTIONS = {
    "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
    "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
}
# Whole-word matchers: \b keeps "one" from matching inside "someone" and
# "cant" from matching inside "vacant".
_NUM_WORD_RE = re.compile(r'\b(' + '|'.join(_NUM_WORD_TO_DIGIT) + r')\b')
_CONTRACTION_RE = re.compile(r'\b(' + '|'.join(_CONTRACTIONS) + r')\b')


def process_text(text):
    """Normalise a question or answer string for vocabulary / label lookup.

    Steps, in order: lowercase; replace the number words zero..ten with digits
    (whole words only); drop periods that have no digit on either side; remove
    the articles a / an / the; restore apostrophes in a fixed list of
    contractions (whole words only); replace every remaining character that is
    not a word character, whitespace, apostrophe or colon with a space;
    collapse whitespace runs and strip the ends.

    Args:
        text: raw string.

    Returns:
        The normalised string.
    """
    text = text.lower()

    # Number words -> digits, matching whole words only.
    text = _NUM_WORD_RE.sub(lambda m: _NUM_WORD_TO_DIGIT[m.group(1)], text)

    # Remove periods that are not adjacent to a digit (digit-adjacent periods
    # survive this step and are handled by the punctuation pass below).
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)

    # Remove articles.
    text = re.sub(r'\b(a|an|the)\b', '', text)

    # Restore the apostrophe in known contractions, matching whole words only.
    text = _CONTRACTION_RE.sub(lambda m: _CONTRACTIONS[m.group(1)], text)

    # Replace punctuation (everything except word chars, whitespace, ' and :) with a space.
    # A former follow-up step that glued "space + comma" back together was
    # removed: no comma can survive this substitution, so it never matched.
    text = re.sub(r"[^\w\s':]", ' ', text)

    # Collapse consecutive whitespace into a single space.
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"][idx]]
# mode_answer_idx = mode(answers)  # 最頻値を取得（正解ラベル）

In [None]:
from statistics import mode
answer_list = []
all_answers = []

# For every entry of train["answers"], normalise its answers and record the most frequent one.
# (The loop variable was renamed from `id`, which shadowed the builtin.)
for row_key, answers in train["answers"].items():
    processed_answers = [process_text(answer["answer"]) for answer in answers]
    most_common_answer = mode(processed_answers)
    all_answers.extend(processed_answers)  # keep every normalised answer in one flat list
    answer_list.append({
        "id": row_key,
        "most-answer": most_common_answer
    })

# Deduplicate the normalised answers.
unique_answers = set(all_answers)

# Answer string -> integer class id. The set is sorted first because set iteration
# order for strings can change between Python processes (hash randomisation);
# without it the ids learned by a saved model cannot be decoded in a later session.
LABEL_MAP = {answer: i for i, answer in enumerate(sorted(unique_answers))}


In [None]:


import os
from copy import deepcopy
from transformers import Trainer, TrainingArguments, TrainerCallback
import json


import numpy as np
import torch
from sklearn.metrics import accuracy_score, f1_score


def VQA_criterion(batch_pred: torch.Tensor, batch_answers: torch.Tensor):
    """Fraction of predictions that exactly match their reference answer id.

    Args:
        batch_pred: 1-D tensor of predicted class ids.
        batch_answers: 1-D tensor of reference class ids.

    Returns:
        Number of exact matches divided by ``batch_pred.size(0)``.
    """
    batch_size = batch_pred.size(0)

    # Compare as Python scalars, pair by pair; each match contributes 1.0.
    matches = sum(
        1.0
        for predicted, expected in zip(batch_pred, batch_answers)
        if predicted.item() == expected.item()
    )

    return matches / batch_size

def compute_metrics(eval_tuple):
    """Compute evaluation metrics from a ``(logits, labels)`` pair of numpy arrays.

    Returns a dict with ``acc`` (sklearn accuracy), ``f1`` (macro F1) and
    ``vqa_accuracy`` (exact-match rate from ``VQA_criterion``, as a Python float).
    The VQA accuracy is also printed.
    """
    logits, labels = eval_tuple
    preds = np.argmax(logits, axis=-1)

    # VQA_criterion works on torch tensors, so wrap the numpy arrays.
    vqa_accuracy = VQA_criterion(torch.from_numpy(preds), torch.from_numpy(labels))
    print(f"vqa_accuracy:{vqa_accuracy}")

    metrics = {}
    metrics["acc"] = accuracy_score(labels, preds)
    metrics["f1"] = f1_score(labels, preds, average='macro')
    metrics["vqa_accuracy"] = vqa_accuracy
    return metrics




class MetricsCallback(TrainerCallback):
    """Trainer callback that collects logged metrics into train / eval lists.

    ``self.metrics['train']`` and ``self.metrics['eval']`` hold the log dicts
    in the order they were emitted.
    """

    def __init__(self):
        self.metrics = {'train': [], 'eval': []}

    def on_log(self, args, state, control, **kwargs):
        """Pretty-print the newest log entry and file it under train or eval."""
        logs = state.log_history[-1] if state.log_history else {}
        pp.pprint(logs)
        if state.is_world_process_zero:
            # Classify by content rather than by the parity of the history
            # length: parity only works while train and eval logs strictly
            # alternate. Entries with any "eval_"-prefixed key count as eval.
            is_eval = any(str(key).startswith('eval_') for key in logs)
            self.metrics['eval' if is_eval else 'train'].append(logs)

    def on_epoch_begin(self, args, state, control, **kwargs):
        """Print the epoch counter at the start of each epoch."""
        print(f'epoch: {state.epoch}')



def createAndTrainModel(dataset, args, text_model='bert-base-uncased', image_model='google/vit-base-patch16-224-in21k', multimodal_model='bert_vit', lr=0.1, eval_dataset=None):
    """Build a collator and model, train with ``Trainer``, evaluate, and save.

    Args:
        dataset: training dataset. It is now actually used; previously the
            notebook-global ``train_dataset`` was used regardless of this argument.
        args: base ``TrainingArguments``; a deep copy is made and its
            ``output_dir`` is replaced by ``./checkpoint/<multimodal_model>``.
        text_model: Hugging Face id of the text encoder / tokenizer.
        image_model: Hugging Face id of the image encoder / processor.
        multimodal_model: name used for the checkpoint sub-directory.
        lr: AdamW learning rate.
            NOTE(review): the 0.1 default and the 0.01 used by the caller are
            far above the 5e-5 mentioned in the notebook — confirm intent.
        eval_dataset: evaluation dataset; ``None`` falls back to the
            notebook-global ``val_dataset`` (the previous behaviour).

    Returns:
        ``(collator, model, train_output, eval_metrics, metrics_dict)``.
    """
    collator, model = create_collator_and_model(text_model, image_model)
    multi_args = deepcopy(args)
    multi_args.output_dir = os.path.join("./", "checkpoint", multimodal_model)
    metrics_callback = MetricsCallback()

    if eval_dataset is None:
        eval_dataset = val_dataset

    # torch.optim.AdamW replaces the deprecated transformers.AdamW.
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)

    # The scheduler slot is left as None so Trainer builds it from multi_args
    # (linear decay, no warmup by default) with the true number of optimizer
    # steps. The previous hand-built schedule used len(train_dataset) * epochs,
    # which counts samples rather than steps, so the rate barely decayed.
    multi_trainer = Trainer(
        model,
        multi_args,
        train_dataset=dataset,
        eval_dataset=eval_dataset,
        data_collator=collator,
        compute_metrics=compute_metrics,
        optimizers=(optimizer, None),
        callbacks=[metrics_callback]
    )

    try:
        train_multi_metrics = multi_trainer.train()
    except Exception as e:
        # Print the failure and traceback for the notebook log, then re-raise.
        print("Error occurred during training:")
        print(type(e).__name__, ":", str(e))
        import traceback
        traceback.print_exc()
        raise

    eval_multi_metrics = multi_trainer.evaluate()
    metrics_dict = metrics_callback.metrics

    # Save the model, tokenizer, and processor
    save_directory = os.path.join(multi_args.output_dir, "final_model")
    save_model(model, collator.tokenizer, collator.img_processor, save_directory)

    return collator, model, train_multi_metrics, eval_multi_metrics, metrics_dict


In [None]:
import pandas as pd
from collections import Counter

class VQADataset(torch.utils.data.Dataset):
    """VQA samples read from a JSON annotation file plus an image directory.

    Each item is a dict with 'image' and 'question'. When ``answer`` is true
    and ``is_test`` is false it also carries 'multiple_choice_answer' (the most
    frequent normalised answer, snapped to the class-mapping vocabulary) and
    'answer_id' (its class id).

    Args:
        df_path: JSON file loaded with ``pd.read_json``.
        image_dir: directory containing the image files named in the 'image' column.
        transform: optional callable applied to each loaded image.
        answer: whether the annotations contain answers.
        is_test: when true, answers are not prepared even if ``answer`` is true.
        class_mapping_path: CSV with 'answer' and 'class_id' columns.
    """

    def __init__(self, df_path, image_dir, transform=None, answer=True, is_test=False, class_mapping_path="data/annotations/class_mapping.csv"):
        self.transform = transform
        self.image_dir = image_dir
        self.df = pd.read_json(df_path)
        self.answer = answer
        self.is_test = is_test

        self.question2idx = {}
        self.idx2question = {}

        # Answer vocabulary from the class-mapping CSV, filled in a single pass.
        cm = pd.read_csv(class_mapping_path)
        self.answer2idx = {}
        self.idx2answer = {}
        for _, row in cm.iterrows():
            self.answer2idx[row["answer"]] = row["class_id"]
            self.idx2answer[row["class_id"]] = row["answer"]

        # Memo for nearest-answer lookups, so each distinct out-of-vocabulary
        # answer triggers only one scan over the whole vocabulary.
        self._closest_cache = {}

        # Question word vocabulary, built from the normalised questions.
        for question in self.df["question"]:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}

        if self.answer and not self.is_test:
            # Add a column holding each row's most frequent normalised answer.
            self.df['multiple_choice_answer'] = self.df['answers'].apply(
                lambda x: self._get_most_common_answer([process_text(answer["answer"]) for answer in x])
            )

    def _get_most_common_answer(self, answers):
        """Return the most frequent answer, mapped into the class-mapping vocabulary."""
        most_common_answer = Counter(answers).most_common(1)[0][0]
        return self._get_closest_answer(most_common_answer)

    def _get_closest_answer(self, answer):
        """Return ``answer`` if it is a known class, else the known class nearest in edit distance."""
        if answer in self.answer2idx:
            return answer
        # setdefault keeps this working on instances built without __init__.
        cache = self.__dict__.setdefault('_closest_cache', {})
        if answer not in cache:
            import Levenshtein
            cache[answer] = min(self.answer2idx.keys(), key=lambda x: Levenshtein.distance(x, answer))
        return cache[answer]

    def update_dict(self, dataset):
        """Adopt the question vocabulary of another dataset (typically the training set)."""
        self.question2idx = dataset.question2idx
        self.idx2question = dataset.idx2question
        # answer2idx / idx2answer are left alone: they come from the class-mapping CSV.

    def __getitem__(self, idx):
        image_path = f"{self.image_dir}/{self.df['image'][idx]}"
        # Force three channels so grayscale / palette / RGBA files do not break
        # downstream processing, and close the file handle right after decoding.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform:
            image = self.transform(image)

        question = self.df["question"][idx]

        result = {
            'image': image,
            'question': question
        }

        if self.answer and not self.is_test:
            answer = self.df['multiple_choice_answer'][idx]
            result['multiple_choice_answer'] = answer
            result['answer_id'] = self.answer2idx[answer]

        return result

    def __len__(self):
        return len(self.df)

In [None]:
@dataclass
class TestCollator:
    """Label-free collator for inference batches.

    Accepts a list of sample dicts or a single sample dict and returns
    tokenized text and pixel values. The batch dimension is always kept,
    including for one-sample batches (the test loader uses ``batch_size=1``),
    so the result can be passed to the model directly.
    """
    tokenizer: AutoTokenizer
    img_processor: AutoFeatureExtractor

    def tokenize_text(self, texts: List[str]) -> Dict[str, torch.Tensor]:
        """Tokenize questions, padding to the longest one and truncating at 24 tokens."""
        encoded_text = self.tokenizer(
            text=texts,
            padding='longest',
            max_length=24,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        # Deliberately no .squeeze(): with a one-sample batch it would drop the
        # batch dimension the model needs.
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def process_images(self, images: List[Image.Image]) -> Dict[str, torch.Tensor]:
        """Run the image processor and return ``pixel_values`` with the batch dimension kept."""
        # Diagnostic only: report any image whose array is not rank 3.
        candidates = images if isinstance(images, list) else [images]
        for image in candidates:
            if len(np.array(image).shape) != 3:
                print('oh no')

        processed_images = self.img_processor(
            images=images,
            return_tensors="pt",
        )
        return {
            "pixel_values": processed_images['pixel_values'],
        }

    def __call__(self, raw_batch_dict) -> Dict[str, torch.Tensor]:
        """Collate one sample dict or a list of sample dicts (keys 'question', 'image')."""
        if isinstance(raw_batch_dict, dict):
            # Single sample: wrap it so it is handled as a batch of one.
            questions = [raw_batch_dict['question']]
            images = [raw_batch_dict['image'].convert('RGB')]
        else:
            questions = [i['question'] for i in raw_batch_dict]
            images = [i['image'] for i in raw_batch_dict]

        tokenized_texts = self.tokenize_text(questions)
        processed_images = self.process_images(images)

        return {
            **tokenized_texts,
            **processed_images,
        }

In [None]:
from torch.utils.data import random_split
import torch
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms

from tqdm import tqdm


# Collator used by the train / validation loaders below.
# NOTE(review): `tokenizer` and `image_processor` are not defined in any cell visible
# above this one; on a fresh kernel this presumably raises NameError — confirm.
collator = Collator(
        tokenizer=tokenizer,
        img_processor=image_processor,
        label_map = LABEL_MAP
    )

# Image pipeline shared by every dataset built in this cell.
# NOTE(review): RandomHorizontalFlip(p=1.0) flips every image, and the randomly sampled
# GaussianBlur sigma is also applied to the validation and test data — confirm intended.
# NOTE(review): ToTensor() runs before the Hugging Face image processor inside the
# collator; check that pixel values are not rescaled twice.
transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=1.0),
#         transforms.RandomRotation(degrees=(-180, 180)),
#         transforms.RandomCrop(32, padding=(4, 4, 4, 4), padding_mode='constant'),
#         transforms.RandomErasing(p=0.8, scale=(0.02, 0.33), ratio=(0.3, 3.3)),
#         transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5),
        transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 2.0)),
        transforms.ToTensor()
    ])

# Build the full training dataset
full_train_dataset = VQADataset(df_path="./data/train.json", image_dir="./data/train", transform=transform)

# Build the test dataset (from valid.json) and give it the training question vocabulary.
# NOTE(review): this object is replaced by the second `test_dataset = ...` assignment below,
# which does not call update_dict.
test_dataset = VQADataset(df_path="./data/valid.json", image_dir="./data/valid", transform=transform, answer=False)
test_dataset.update_dict(full_train_dataset)

# Split the training dataset (80% train, 20% validation).
# NOTE(review): random_split is called without a generator; whether the split is
# reproducible depends on global seeding elsewhere — confirm.
train_size = int(0.8 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size
train_dataset, val_dataset = random_split(full_train_dataset, [train_size, val_size])

# Create the data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True,collate_fn=collator)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=128, shuffle=False, collate_fn=collator)
test_dataset = VQADataset(df_path="./data/valid.json", image_dir="./data/valid", transform=transform, answer=False,is_test=True)

# The test collator reuses the tokenizer and image processor of the training collator.
test_collator = TestCollator(tokenizer=collator.tokenizer, img_processor=collator.img_processor)
# test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=test_collator)
# Create the test data loader (one sample per batch, original order)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=test_collator)

# Report dataset and loader sizes
print(f"Full train dataset size: {len(full_train_dataset)}")
print(f"Train dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")
print(f"Number of batches in train loader: {len(train_loader)}")
print(f"Number of batches in validation loader: {len(val_loader)}")
print(f"Number of batches in test loader: {len(test_loader)}")

In [None]:

# Enable fp16 mixed precision only when CUDA is available
use_fp16 = torch.cuda.is_available()

from transformers import TrainingArguments
import datetime

# Timestamp used to make the run name unique
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# Model name and the derived run identifier
model_name = "VITBERT_coAttention-test03"
run_identifier = f"{model_name}_{current_time}"


# Trainer configuration: evaluate, log and checkpoint once per epoch, keep at most
# 3 checkpoints, and reload the checkpoint with the best eval_f1 when training ends.
# NOTE: createAndTrainModel deep-copies these arguments and overwrites output_dir.
# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy`; confirm against the installed version.
args = TrainingArguments(
    output_dir=f"./checkpoints/{run_identifier}",  # base directory for checkpoints
    run_name=run_identifier,  # unique name of this run
    seed=42, 
    evaluation_strategy="epoch",
    logging_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=3,
    metric_for_best_model='eval_f1',
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    remove_unused_columns=False,
    num_train_epochs=3,
    fp16=use_fp16,
    dataloader_num_workers=4,
    load_best_model_at_end=True,
    report_to='none',
    save_steps=1,  # original note: save every step, in effect once per epoch. NOTE(review): presumably ignored while save_strategy="epoch" — confirm
    save_on_each_node=True,  # in distributed training, save on every node
)


In [None]:
## start training
# Original note: lr changed from 5e-5 to 0.01.
# The text encoder is switched to microsoft/deberta-base for this run; the image
# encoder keeps createAndTrainModel's default.
print('='*20)
print('begin training')
collator2, model2, train_multi_metrics2, eval_multi_metrics2,metrics_dict2 = createAndTrainModel(train_dataset, args, text_model="microsoft/deberta-base",lr=0.01)
print('training ended')
print('='*20)

In [None]:
def predict_test_data(model, test_loader, device=None):
    """Run the model over every batch of ``test_loader`` and collect class predictions.

    Args:
        model: module whose forward accepts the batch dict as keyword arguments
            and returns a mapping containing ``'logits'``.
        test_loader: iterable of batch dicts exposing ``.dataset`` and ``len()``.
        device: device to run on. ``None`` selects CUDA when available and CPU
            otherwise, so the default no longer fails on CPU-only machines.

    Returns:
        A flat list with one predicted class index (int) per sample, in loader order.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    model.eval()
    model.to(device)

    predictions = []

    print(f"Starting predictions on {len(test_loader.dataset)} samples")

    with torch.no_grad():
        for batch_idx, batch in enumerate(test_loader):
            # Move tensors to the device; leave any non-tensor entries untouched.
            batch = {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in batch.items()}

            # Forward pass
            outputs = model(**batch)
            logits = outputs['logits']

            # Highest-scoring class per sample
            pred_classes = torch.argmax(logits, dim=1)
            predictions.extend(pred_classes.cpu().tolist())

            # Progress report every 10 batches and on the final batch
            if (batch_idx + 1) % 10 == 0 or (batch_idx + 1) == len(test_loader):
                print(f"Processed {batch_idx + 1}/{len(test_loader)} batches")

    print("Predictions completed")
    return predictions

# Run prediction on the test loader with the trained model
predictions = predict_test_data(model2, test_loader)
# Convert the predicted class ids back to answer strings (inverse of the collator's label map)
idx2answer = {v: k for k, v in collator.label_map.items()}
predicted_answers = [idx2answer[pred] for pred in predictions]
# Save the answers as the submission file
submission=np.array(predicted_answers)
np.save("submission-sample2.npy", submission)
# Original note: everything below is not needed.
# Collect per-question results
# NOTE(review): assumes the test DataFrame has a 'question_id' column whose values
# json.dump can serialise — confirm.
results = []
for idx, pred_answer in enumerate(predicted_answers):
    results.append({
        "question_id": test_dataset.df['question_id'][idx],
        "predicted_answer": pred_answer
    })

# Write the results to a JSON file
with open('test_predictions.json', 'w') as f:
    json.dump(results, f, indent=2)

print("Predictions saved to test_predictions.json")