## Sentence Transformer

In [None]:
import os
# Expose only the GPU with index 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"]="1"

from sentence_transformers import SentenceTransformer
# Sample sentence to embed.
text = "What is the fastest car in the"
# Load the sentence encoder identified by 'bert-large-nli-mean-tokens'.
text_feature_extractor = SentenceTransformer('bert-large-nli-mean-tokens')
# Encode the sentence; presumably yields one fixed-size embedding vector — TODO confirm shape.
encoder_sentence = text_feature_extractor.encode(text)

## GPT-2

In [8]:
import os
# Expose only the GPU with index 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"]="1"

import torch
import torch.nn.functional as F
from transformers import GPT2Tokenizer, GPT2Model

# Load the 'gpt2' tokenizer.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
# Use the end-of-sequence token as the padding token; batches tokenized with
# padding=True are then filled with this token.
tokenizer.pad_token = tokenizer.eos_token
# Load the 'gpt2' model and move it to the GPU.
model = GPT2Model.from_pretrained('gpt2').cuda()

In [16]:
# Tokenize a small batch; shorter sentences are padded to the longest one.
sentences = ['tissue name ?', 'what is prograph forcep doing hello?','hello','how you doing']
inputs = tokenizer(sentences, padding=True, truncation=True, return_tensors="pt")

print(inputs['input_ids'])
print(inputs['attention_mask'])

inputs['input_ids'] = inputs['input_ids'].cuda()
inputs['attention_mask'] = inputs['attention_mask'].cuda()

model.eval()
with torch.no_grad():
    outputs = model(**inputs)

# Mean-pool the token embeddings into one vector per sentence.
# Only positions with attention_mask == 1 contribute: averaging over the whole
# padded length would mix the hidden states of padding tokens into the
# sentence embedding and make it depend on the other sentences in the batch.
hidden_states = outputs.last_hidden_state
token_mask = inputs['attention_mask'].unsqueeze(-1).to(hidden_states.dtype)
token_counts = token_mask.sum(dim=1).clamp(min=1)  # guard against an all-padding row
outputs1 = (hidden_states * token_mask).sum(dim=1) / token_counts
outputs1.size()

tensor([[   83, 21949,  1438,  5633, 50256, 50256, 50256, 50256, 50256],
        [10919,   318,  1172,  1470,  2700,    79,  1804, 23748,    30],
        [31373, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256],
        [ 4919,   345,  1804, 50256, 50256, 50256, 50256, 50256, 50256]])
tensor([[1, 1, 1, 1, 0, 0, 0, 0, 0],
        [1, 1, 1, 1, 1, 1, 1, 1, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 0, 0, 0, 0, 0, 0]])


torch.Size([4, 768])

In [None]:
import os
# Expose only the GPU with index 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"]="1"

# Import required libraries
import torch
# NOTE(review): this cell imports from `pytorch_transformers`, while the GPT-2 cell
# above imports from `transformers`; GPT2LMHeadModel is not used in this cell.
from pytorch_transformers import GPT2Tokenizer, GPT2LMHeadModel

# Load pre-trained model tokenizer (vocabulary)
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# Encode text inputs and print the resulting token ids
text = "tissue name ?"
indexed_tokens = tokenizer.encode(text)
print(indexed_tokens)

text = "what is prograph forcep doing ?"
indexed_tokens = tokenizer.encode(text)
print(indexed_tokens)

## InferSent

In [None]:
import os
# Expose only the GPU with index 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"]="1"

import nltk
# Fetch the 'punkt' resource; presumably required by InferSent's tokenization — TODO confirm.
nltk.download('punkt')

import torch
from InferSent.models import InferSent

# Encoder configuration passed to InferSent; the values presumably have to match
# the 'infersent2.pkl' checkpoint loaded below — verify against the InferSent docs.
params_model = {'bsize': 64, 'word_emb_dim': 300, 'enc_lstm_dim': 2048,
                'pool_type': 'max', 'dpout_model': 0.0, 'version': 2}
infersent = InferSent(params_model)
# Restore the encoder weights from the local checkpoint file.
infersent.load_state_dict(torch.load('InferSent/encoder/infersent2.pkl'))
# Point the encoder at the word-vector file and build its vocabulary with K=100000.
infersent.set_w2v_path('InferSent/fastText/crawl-300d-2M.vec')
infersent.build_vocab_k_words(K=100000)

# Encode three identical sentences and print how many entries come back.
query = ['I had pizza and pasta', 'I had pizza and pasta', 'I had pizza and pasta']
query_vec =  infersent.encode(query)
print(len(query_vec))

## Rapid evaluation code

In [None]:
import numpy as np

from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import average_precision_score

def calc_acc(y_true, y_pred):
    """Return the overall accuracy of ``y_pred`` measured against ``y_true``.

    Delegates to ``sklearn.metrics.accuracy_score`` with its default options.
    """
    return accuracy_score(y_true, y_pred)

def calc_classwise_acc(y_true, y_pred):
    """Return the per-class accuracy as a 1-D float array.

    The confusion matrix is built from ``y_true`` and ``y_pred``; each entry of
    the result is the diagonal count of a row divided by that row's total.

    A class that occurs only in ``y_pred`` has an all-zero row. Its accuracy is
    reported as NaN, and the division is skipped for that row so that no
    divide-by-zero RuntimeWarning is emitted.
    """
    matrix = confusion_matrix(y_true, y_pred)
    correct = matrix.diagonal().astype(float)
    row_totals = matrix.sum(axis=1)

    # Pre-fill with NaN and divide only where the row actually has samples.
    classwise_acc = np.full(correct.shape, np.nan)
    np.divide(correct, row_totals, out=classwise_acc, where=row_totals != 0)
    return classwise_acc

def calc_map(y_true, y_scores):
    """Return the average precision computed from ``y_true`` and ``y_scores``.

    Delegates to ``sklearn.metrics.average_precision_score`` with
    ``average=None``, i.e. no averaging across classes is requested.
    """
    return average_precision_score(y_true, y_scores, average=None)

In [None]:
import os
import glob
from PIL import Image

from torch.utils.data import Dataset
import torchvision.transforms as transforms


class SurgicalVQADataset(Dataset):
    """Dataset of (image, question, label index) samples read from annotation files.

    Annotation files are located with ``glob`` using the pattern
    ``folder_head + str(sequence) + folder_tail`` for every entry of ``seq``.
    Each non-empty line of a file is one sample and is expected to look like
    ``question|answer``.

    Args:
        seq: iterable of sequence identifiers inserted into the glob pattern.
        folder_head: pattern part placed before the sequence identifier.
        folder_tail: pattern part placed after the sequence identifier.
        labels: list of answer strings; a sample's label is the index of its
            answer in this list.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, seq, folder_head, folder_tail, labels, transform=None):

        self.transform = transform

        # files, question and answers
        # glob returns matches in arbitrary order; sort them so that sample
        # indices are the same on every run and every machine.
        filenames = []
        for curr_seq in seq:
            filenames.extend(sorted(glob.glob(folder_head + str(curr_seq) + folder_tail)))

        self.vqas = []
        for file in filenames:
            # The context manager closes the file even if reading fails.
            with open(file, "r") as file_data:
                lines = [line.strip("\n") for line in file_data if line != "\n"]
            self.vqas.extend([file, line] for line in lines)
        print('Total files: %d | Total question: %.d' %(len(filenames), len(self.vqas)))

        # Labels
        self.labels = labels

    def __len__(self):
        """Return the number of question/answer samples."""
        return len(self.vqas)

    def __getitem__(self, idx):
        """Return ``(img, question, label)`` for sample ``idx``.

        Raises:
            ValueError: if the sample's answer is not contained in ``labels``.
        """
        annotation_file, qa_line = self.vqas[idx]

        # img
        # The image path is built from the first three '/'-separated components
        # of the annotation path, the 'left_frames' folder and the part of the
        # annotation file name before its first '_'.
        loc = annotation_file.split('/')
        img_loc = os.path.join(loc[0], loc[1], loc[2], 'left_frames', loc[-1].split('_')[0] + '.png')
        # Force three channels: a no-op for RGB files, and it keeps palette,
        # grayscale or RGBA PNGs compatible with a 3-channel transform/model.
        img = Image.open(img_loc).convert('RGB')
        if self.transform:
            img = self.transform(img)

        # question and answer
        qa_fields = qa_line.split('|')
        question = qa_fields[0]
        label = self.labels.index(str(qa_fields[1]))

        return img, question, label

In [None]:
from sentence_transformers import SentenceTransformer
import torch
import torch.nn as nn
from torchvision import models

class Surgical_VQA(nn.Module):
    """Classifier over concatenated image and question features.

    The image goes through a ResNet-50 whose final ``fc`` layer is replaced by
    an identity, the question strings go through a SentenceTransformer, and the
    two feature vectors are concatenated and fed to a single linear layer.

    Args:
        num_classes: number of output classes of the linear classifier.
    """

    def __init__(self, num_classes=12):
        super().__init__()

        # text processing
        self.text_feature_extractor = SentenceTransformer('bert-base-nli-mean-tokens')
        # image processing
        self.img_feature_extractor = models.resnet50(pretrained=True)
        # Drop the ImageNet classification head. nn.Linear has no child modules,
        # so the previous Sequential(*fc.children()[:-1]) was an empty Sequential,
        # i.e. an identity; nn.Identity states that directly and adds no
        # parameters, so existing state dicts still load.
        self.img_feature_extractor.fc = nn.Identity()

        #classifier
        # 2816 input features: presumably 2048 (ResNet-50 pooled features)
        # + 768 (sentence embedding) — confirm if either extractor is changed.
        self.classifier = nn.Linear(2816, num_classes)

    def forward(self, img, text):
        """Return class logits for a batch.

        Args:
            img: image batch tensor fed to the ResNet-50 backbone.
            text: questions passed to ``SentenceTransformer.encode``.
        """
        img_feature = self.img_feature_extractor(img)

        # encode() output is wrapped in a fresh tensor, so no gradient flows
        # back into the text encoder from here.
        text_feature = self.text_feature_extractor.encode(text)
        # Follow the image features' device instead of assuming CUDA.
        text_feature = torch.tensor(text_feature).to(img_feature.device)

        img_text_features = torch.cat((img_feature, text_feature), dim=1)

        out = self.classifier(img_text_features)
        return out

In [None]:
import torch.nn.functional as F

def test_model(epoch, model, valid_dataloader):
    """Evaluate ``model`` on ``valid_dataloader`` and print/return the metrics.

    Args:
        epoch: epoch number, used only in the printed summary.
        model: module called as ``model(imgs, questions)`` that returns logits.
        valid_dataloader: iterable yielding ``(imgs, questions, labels)`` batches.

    Returns:
        Tuple ``(acc, c_acc, mAP)``: overall accuracy, per-class accuracy and a
        placeholder ``0.0`` for mAP (mAP is not computed).

    Raises:
        ValueError: if the dataloader yields no batches.
    """
    model.eval()

    total_loss = 0.0  # sum of the per-batch losses (not averaged)
    # Per-batch results are collected in lists and concatenated once at the
    # end; this avoids comparing tensors with `== None` and re-copying the
    # accumulated tensor on every batch.
    true_batches = []
    pred_batches = []

    criterion = nn.CrossEntropyLoss()

    with torch.no_grad():
        for imgs, q, labels in valid_dataloader:
            questions = list(q)
            imgs, labels = imgs.cuda(), labels.cuda()

            outputs = model(imgs, questions)

            loss = criterion(outputs, labels)
            total_loss += loss.item()

            _, predicted = torch.max(F.softmax(outputs, dim=1), 1)
            true_batches.append(labels.cpu())
            pred_batches.append(predicted.cpu())

    if not true_batches:
        raise ValueError('valid_dataloader yielded no batches; nothing to evaluate')

    label_true = torch.cat(true_batches, 0)
    label_pred = torch.cat(pred_batches, 0)

    acc = calc_acc(label_true, label_pred)
    c_acc = calc_classwise_acc(label_true, label_pred)
    mAP = 0.0  # placeholder: mAP is not computed

    print('Test: epoch: %d loss: %.6f | Acc: %.6f | mAP: %.6f' %(epoch, total_loss, acc, mAP))
    print(c_acc)

    return (acc, c_acc, mAP)

In [None]:
import os
import torch

from torchvision import transforms
from torch.utils.data import DataLoader

# Expose only the GPU with index 1 to this process.
# NOTE(review): this runs after `import torch`; presumably it still takes effect
# as long as CUDA has not been initialised yet — confirm.
os.environ["CUDA_VISIBLE_DEVICES"]="1"

def seed_everything(seed=27):
    '''
    Set random seed for reproducible experiments
    Inputs: seed number

    Seeds Python's `random`, NumPy and PyTorch (CPU and all CUDA devices) and
    switches cuDNN to deterministic mode with benchmarking disabled.
    '''
    # Local imports keep this function self-contained within its notebook cell.
    import random
    import numpy as np

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Only affects interpreters started after this point (e.g. subprocesses);
    # the hash seed of the running interpreter is fixed at start-up.
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
# Evaluation driver: builds the datasets and the model, loads a saved checkpoint
# and runs a single evaluation pass. The training loop is commented out.
if __name__ == "__main__":
     
    # Set random seed
    seed_everything()  
    
    # Device Count
    # NOTE(review): num_gpu is not used by the active code below.
    num_gpu = torch.cuda.device_count()
    
    # hyperparameters
    # lr, checkpoint_dir, best_epoch and best_results are only referenced by
    # the commented-out training loop at the end of this block.
    bs = 32
    epochs = 1
    lr = 0.00001
    
    checkpoint_dir = 'checkpoints/v1/simple/'
    
    # train and test dataloader
    # Sequence ids are inserted between folder_head and folder_tail to form the
    # glob pattern of the annotation files.
    train_seq = [2, 3, 4, 6, 7, 9, 10, 11, 12, 14, 15]
    val_seq = [1, 5, 16]
    folder_head = 'dataset/instruments18/seq_'
    folder_tail = '/vqa/simple/*.txt'

    # Answer vocabulary; the position of an answer in this list is its class index.
    labels = ['kidney',
          'Idle', 'Grasping', 'Retraction', 'Tissue_Manipulation',
          'Tool_Manipulation', 'Cutting', 'Cauterization', 'Suction', 
          'Looping', 'Suturing', 'Clipping', 'Staple', 'Ultrasound_Sensing',
          'left-top', 'right-top', 'left-bottom', 'right-bottom']

    # Resize, convert to tensor and normalise with the given per-channel mean/std.
    transform = transforms.Compose([
                transforms.Resize((300,256)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
                ])

    # train_dataset
    train_dataset = SurgicalVQADataset(train_seq, folder_head, folder_tail, labels, transform=transform)
    train_dataloader = DataLoader(dataset=train_dataset, batch_size= bs, shuffle=True)

    # Val_dataset
    val_dataset = SurgicalVQADataset(val_seq, folder_head, folder_tail, labels, transform=transform)
    val_dataloader = DataLoader(dataset=val_dataset, batch_size= bs, shuffle=False)
    
    # model
    model = Surgical_VQA(num_classes=len(labels)).cuda()
    
    best_epoch = [0]
    best_results = [0.0]
    
    # load pre-trained model
    # The checkpoint is expected to be a dict holding the weights under 'state_dict'.
    print('Loading pre-trained weights')
    pretrained_model = torch.load(('checkpoints/v1/simple/checkpoint_238_epoch.pth'))
    pretrained_model = pretrained_model['state_dict']
    model.load_state_dict(pretrained_model)
    
    # NOTE(review): the evaluation runs on train_dataloader; val_dataloader is
    # built above but never used — confirm which split is intended here.
    acc, c_acc, mAP = test_model(epochs, model, train_dataloader)
    
#     for epoch in range(1, epochs):
#         train_model(epoch, model, train_dataloader, lr)
#         test_acc = test_model(epoch, model, train_dataloader)
    
#         if test_acc >= best_results[0]:
#             best_results[0] = test_acc
#             best_epoch[0] = epoch
        
#         print('Best epoch: %d | Best acc: %.6f' %(best_epoch[0], best_results[0]))
#         checkpoint = {'lr': lr, 'b_s': bs, 'state_dict': model.state_dict() }
#         save_name = "checkpoint_" + str(epoch) + '_epoch.pth'
        
#         torch.save(checkpoint, os.path.join(checkpoint_dir, save_name))