<a href="https://colab.research.google.com/github/mobarakol/Project_Gen/blob/main/PitVQA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#Download code
!git clone https://github.com/HRL-Mike/PitVQA.git
!mkdir /content/PitVQA/datasets
%cd /content/PitVQA/datasets

#Download Dataset
!gdown --id 1WGdztykX3nW6pi_BKp4rO8nA7ESNRfVN
# https://drive.google.com/file/d/1FoAEY_u0PTAlrscjEifi2om15A83wL78/view?usp=drive_link

# Unzipping the VQA EndoVis18 Dataset\
!unzip -q EndoVis-18-VQA.zip

%cd /content/PitVQA

Cloning into 'PitVQA'...
remote: Enumerating objects: 104, done.[K
remote: Counting objects: 100% (104/104), done.[K
remote: Compressing objects: 100% (98/98), done.[K
remote: Total 104 (delta 32), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (104/104), 129.06 KiB | 777.00 KiB/s, done.
Resolving deltas: 100% (32/32), done.
/content/PitVQA/datasets
Downloading...
From (original): https://drive.google.com/uc?id=1WGdztykX3nW6pi_BKp4rO8nA7ESNRfVN
From (redirected): https://drive.google.com/uc?id=1WGdztykX3nW6pi_BKp4rO8nA7ESNRfVN&confirm=t&uuid=cf7e7f13-019d-4085-ba4b-dd895002326e
To: /content/PitVQA/datasets/EndoVis-18-VQA.zip
100% 2.70G/2.70G [00:57<00:00, 46.7MB/s]
/content/PitVQA


### model

In [4]:
import torch
from torch import nn
import torch.nn.functional as F
from transformers import GPT2Tokenizer, GPT2Model, ViTModel
from transformers import BertConfig, BertModel

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class PitVQANet(nn.Module):
    """Classification-style VQA network built from pretrained ViT, BERT and GPT-2 models.

    An image is encoded with ViT, the question is tokenised with the GPT-2
    tokenizer and encoded with BERT, the BERT hidden states are passed through
    GPT-2, and the mean-pooled result goes through a small gated head that
    produces ``num_class`` logits.

    Args:
        med_config: Path to a BERT json config. Currently unused (the code that
            read it is commented out); kept so existing callers do not break.
        num_class: Number of answer classes produced by the classifier.
    """

    def __init__(self,
           med_config='/content/PitVQA/config.json',  # change to your abs path
           num_class=59,  # 18/59
        ):
        super().__init__()

        # visual encoder
        model_name = "google/vit-base-patch16-224-in21k"
        self.visual_encoder = ViTModel.from_pretrained(model_name)
        vision_width = 768

        # tokenizer
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        # GPT-2 ships without a pad token, so reuse the end-of-string token.
        self.tokenizer.pad_token = self.tokenizer.eos_token

        # text encoder
        # encoder_config = BertConfig.from_json_file(med_config)
        # encoder_config.vocab_size = self.tokenizer.vocab_size  # 30524 --> 50257
        # encoder_config.encoder_width = vision_width
        # self.text_encoder = BertModel(config=encoder_config, add_pooling_layer=False)

        self.text_encoder = BertModel.from_pretrained("bert-base-uncased")
        # The questions are tokenised with the GPT-2 vocabulary, which is larger
        # than BERT's. Without growing the embedding table, any GPT-2 id beyond
        # BERT's vocabulary indexes out of range inside the encoder.
        self.text_encoder.resize_token_embeddings(len(self.tokenizer))

        # decoder
        self.gpt_decoder = GPT2Model.from_pretrained('gpt2')

        # intermediate layers
        self.intermediate_layer = nn.Linear(768, 512)
        # Sigmoid gate that is multiplied element-wise with the projected features.
        self.se_layer = nn.Sequential(
            nn.Linear(512, 512),
            nn.Sigmoid()
        )
        # NOTE: despite the attribute name this is a BatchNorm1d layer. The name
        # is left untouched because it is part of the state_dict keys.
        self.LayerNorm = nn.BatchNorm1d(512)
        self.dropout = nn.Dropout(0.2)

        # classifier
        self.classifier = nn.Linear(512, num_class)

    def forward(self, image, question):
        """Return class logits for a batch of images and their questions.

        Args:
            image: Batch of image tensors accepted by the ViT encoder.
            question: Sequence of question strings, one per image.

        Returns:
            Tensor of logits with ``num_class`` entries per sample.
        """
        # Follow the device that holds this module's weights rather than a
        # module-level global, so the model works wherever it has been moved.
        model_device = self.classifier.weight.device
        image = image.to(model_device)

        # visual encoder
        image_embeds = self.visual_encoder(image).last_hidden_state
        # All-ones mask: every image token is marked as attendable.
        image_atts = torch.ones(image_embeds.size()[:-1], dtype=torch.long).to(image.device)

        # text encoder: questions are padded/truncated to a fixed 25 tokens
        encoder_question = self.tokenizer(question, return_tensors="pt", truncation=True,
                                          padding='max_length', max_length=25).to(image.device)

        # NOTE(review): Hugging Face documents `encoder_hidden_states` on BertModel
        # as being used for cross-attention only when the model is configured as
        # a decoder. The stock bert-base-uncased config loaded in __init__ does
        # not enable that, so the image features passed here are probably
        # ignored. TODO confirm, and if so load the encoder with
        # is_decoder/add_cross_attention enabled.
        text_output = self.text_encoder(input_ids=encoder_question.input_ids,
                                        attention_mask=encoder_question.attention_mask,
                                        encoder_hidden_states=image_embeds,
                                        encoder_attention_mask=image_atts,
                                        return_dict=True)

        text_embeds = text_output.last_hidden_state

        # text decoder
        # NOTE(review): `encoder_attention_mask` is GPT2Model's cross-attention
        # mask. If the intent is to hide question padding, `attention_mask` is
        # probably the argument that was meant - TODO confirm before changing.
        gpt_output = self.gpt_decoder(inputs_embeds=text_embeds,
                                      encoder_attention_mask=encoder_question.attention_mask)
        decoder_output = gpt_output.last_hidden_state

        # average pool over the token axis: (batch, tokens, hidden) -> (batch, hidden)
        decoder_output = decoder_output.mean(dim=1)

        out = self.intermediate_layer(decoder_output)
        out = torch.mul(out, self.se_layer(out))  # gate the features with themselves
        out = self.LayerNorm(out)
        out = self.dropout(out)

        # classification layer
        out = self.classifier(out)

        return out

### main

In [5]:
import os
import torch
import argparse
import torch.utils.data
import torch.nn.functional as F
import numpy as np
import random

from torch import nn
from utils import save_clf_checkpoint, adjust_learning_rate, calc_acc, calc_precision_recall_fscore, calc_classwise_acc
from torch.utils.data import DataLoader

from dataloader import EndoVis18VQAGPTClassification

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

def seed_everything(seed=3407):
    """Seed every random number source used here and pin cuDNN to deterministic mode.

    Args:
        seed: Integer applied to Python's ``random``, NumPy, PyTorch (CPU and
            all CUDA devices) and exported as ``PYTHONHASHSEED``.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Each of these takes the seed as its only argument.
    for apply_seed in (random.seed, np.random.seed,
                       torch.manual_seed, torch.cuda.manual_seed_all):
        apply_seed(seed)

    # Trade cuDNN autotuning for repeatable kernel selection.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def train(train_dataloader, model, criterion, optimizer, epoch, device):
    """Train ``model`` for one epoch and report epoch-level metrics.

    Args:
        train_dataloader: Iterable yielding ``(_, images, questions, labels)`` batches.
        model: Network called as ``model(image=..., question=...)``.
        criterion: Loss applied to ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        epoch: Epoch number, used only in the printed summary.
        device: Device the images and labels are moved to.

    Returns:
        The value returned by ``calc_acc`` for the whole epoch.
    """
    model.train()
    total_loss = 0.0
    # Per-batch tensors are collected and concatenated once after the loop;
    # concatenating inside the loop re-copies the whole history every step.
    true_batches = []
    pred_batches = []

    for _, images, questions, labels in train_dataloader:
        labels = labels.to(device)
        outputs = model(image=images.to(device), question=questions)  # questions is a tuple
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()  # calculate gradient
        optimizer.step()  # update parameters

        # Summed (not averaged) batch loss, as reported in the epoch summary.
        total_loss += loss.item()

        # Softmax is monotonic, so the arg-max of the logits is the predicted class.
        true_batches.append(labels.detach().cpu())
        pred_batches.append(outputs.detach().argmax(dim=1).cpu())

    label_true = torch.cat(true_batches, 0)
    label_pred = torch.cat(pred_batches, 0)

    # loss and acc
    acc = calc_acc(label_true, label_pred)
    # The class-wise accuracy is computed but not reported. The call is kept
    # because the helper's body is not visible here - TODO drop if it is
    # side-effect free.
    calc_classwise_acc(label_true, label_pred)
    precision, recall, f_score = calc_precision_recall_fscore(label_true, label_pred)
    print(f'Train: epoch: {epoch} loss: {total_loss} | Acc: {acc} | '
          f'Precision: {precision} | Recall: {recall} | F1 Score: {f_score}')
    return acc

def validate(val_loader, model, criterion, epoch, device):
    """Evaluate ``model`` on ``val_loader`` without gradient tracking.

    Args:
        val_loader: Iterable yielding ``(file_name, images, questions, labels)`` batches.
        model: Network called as ``model(image=..., question=...)``.
        criterion: Loss applied to ``(outputs, labels)``.
        epoch: Epoch number, used only in the printed summary.
        device: Device the images and labels are moved to.

    Returns:
        Tuple ``(acc, c_acc, precision, recall, f_score)``. ``c_acc`` is always
        ``0.0``; class-wise accuracy is not computed here.
    """
    model.eval()
    total_loss = 0.0
    # Collect per-batch tensors and concatenate once after the loop instead of
    # re-copying the growing history on every batch.
    true_batches = []
    pred_batches = []

    with torch.no_grad():
        for _, images, questions, labels in val_loader:
            labels = labels.to(device)

            # model forward pass
            outputs = model(image=images.to(device), question=questions)

            # Summed (not averaged) batch loss, as reported in the summary line.
            total_loss += criterion(outputs, labels).item()

            # Softmax is monotonic, so the arg-max of the logits is the predicted class.
            true_batches.append(labels.cpu())
            pred_batches.append(outputs.argmax(dim=1).cpu())

    label_true = torch.cat(true_batches, 0)
    label_pred = torch.cat(pred_batches, 0)

    acc = calc_acc(label_true, label_pred)
    c_acc = 0.0  # placeholder kept so the return shape stays unchanged
    precision, recall, f_score = calc_precision_recall_fscore(label_true, label_pred)
    print(f'Test: epoch: {epoch} test loss: {total_loss} | test acc: {acc} | '
          f'test precision: {precision} | test recall: {recall} | test F1: {f_score}')
    return acc, c_acc, precision, recall, f_score

if __name__ == '__main__':
    # Training entry point: configure the run, build the data loaders and the
    # model, then alternate training and validation, checkpointing whenever
    # validation accuracy matches or beats the best seen so far.

    epochs = 50
    batch_size = 20
    random_seed = 21
    lr = 0.00002
    checkpoint_dir = '/content/PitVQA/checkpoints/test_pit'
    question_len = 25  # not referenced below
    num_class = 18

    os.makedirs('./checkpoints/', exist_ok=True)
    # Also create the parent folder of checkpoint_dir so saving does not depend
    # on the current working directory. Presumably save_clf_checkpoint writes
    # next to or under checkpoint_dir - verify against utils.py.
    os.makedirs(os.path.dirname(checkpoint_dir), exist_ok=True)

    seed_everything(random_seed)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    start_epoch = 1
    best_epoch = [0]
    best_results = [0.0]
    epochs_since_improvement = 0

    # data location
    # fold-1
    train_seq = [2, 3, 4, 6, 7, 9, 10, 11, 12, 14, 15]  # 11
    val_seq = [1, 5, 16]
    print(f'current train seq: {train_seq}')
    print(f'current val seq: {val_seq}')

    folder_head = '/content/PitVQA/datasets/EndoVis-18-VQA/seq_'  # set your path
    folder_tail = '/vqa/Classification/*.txt'

    # dataloader
    train_dataset = EndoVis18VQAGPTClassification(train_seq, folder_head, folder_tail)
    train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=8)
    val_dataset = EndoVis18VQAGPTClassification(val_seq, folder_head, folder_tail)
    val_dataloader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False, num_workers=8)

    # Pass the configured class count explicitly; otherwise the model falls back
    # to its own default (59) and ignores num_class above.
    model = PitVQANet(num_class=num_class)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    model = model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    print('Start training.')
    for epoch in range(start_epoch, epochs+1):
        # Call the LR adjustment after every 5 consecutive epochs without improvement.
        if epochs_since_improvement > 0 and epochs_since_improvement % 5 == 0:
            adjust_learning_rate(optimizer, 0.8)

        # train
        train_acc = train(train_dataloader=train_dataloader, model=model, criterion=criterion,
                          optimizer=optimizer, epoch=epoch, device=device)
        # validation
        test_acc, test_c_acc, test_precision, test_recall, test_f_score \
            = validate(val_loader=val_dataloader, model=model,
                       criterion=criterion, epoch=epoch, device=device)

        if test_acc >= best_results[0]:
            print('Best Epoch:', epoch)
            epochs_since_improvement = 0
            best_results[0] = test_acc
            best_epoch[0] = epoch
            save_clf_checkpoint(checkpoint_dir, epoch, epochs_since_improvement,
                      model, optimizer, best_results[0], final_args=None)
        else:
            # Without this increment the counter stays at 0 forever and the
            # learning-rate adjustment above can never trigger.
            epochs_since_improvement += 1
            print('Epochs since last improvement:', epochs_since_improvement)
    print('End training.')

current train seq: [2, 3, 4, 6, 7, 9, 10, 11, 12, 14, 15]
current val seq: [1, 5, 16]
Total files: 1560 | Total question: 9014
Total files: 447 | Total question: 2769




config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

Start training.


FileNotFoundError: Caught FileNotFoundError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
    data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/fetch.py", line 51, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/fetch.py", line 51, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/content/PitVQA/dataloader.py", line 52, in __getitem__
    raw_image = Image.open(img_loc).convert('RGB')
  File "/usr/local/lib/python3.10/dist-packages/PIL/Image.py", line 3227, in open
    fp = builtins.open(filename, "rb")
FileNotFoundError: [Errno 2] No such file or directory: '/content/PitVQA/datasets/EndoVis-18-VQA/seq_7/left_fr/frame018.png'
