In [None]:
# Fetch the CholecT50 dataset through kagglehub. The value returned by
# dataset_download is kept in a module-level variable so later cells can use it
# (presumably the local directory of the downloaded dataset version -- confirm
# against the kagglehub documentation).
import kagglehub
yogeshwartheboss_cholect50_path = kagglehub.dataset_download('yogeshwartheboss/cholect50')

print('Data source import complete.')

In [None]:
import os
import cv2
import sys
import json
import time
import torch
import random
import platform
import numpy as np
from torch import nn
from PIL import Image
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torch.utils.data import Dataset, ConcatDataset, DataLoader

In [None]:
# Hyperparameters and paths.
# The dataset root is derived from the location kagglehub reported in the download
# cell instead of a hard-coded '/root/.cache/kagglehub/.../versions/1' path, so the
# notebook keeps working when the cache directory or the dataset version changes.
data_dir = os.path.join(yogeshwartheboss_cholect50_path, 'CholecT50')
batch_size = 32  # samples per mini-batch, shared by every DataLoader
epochs = 40      # number of passes over the training set
# Base class-activation-map size; Backbone.increase_resolution() multiplies both
# values by 4 every time a Backbone is constructed.
OUT_HEIGHT = 8
OUT_WIDTH  = 14

In [None]:
class mymodel(nn.Module):
    """Triplet recognition network: backbone, three CAM branches and a fusion head.

    Args:
        num_tool: number of instrument classes (channels of the instrument CAM).
        num_verb: number of verb classes.
        num_target: number of target classes.
        num_triplet: accepted for interface compatibility; not used by this class.
        layer_size: number of (ChannelAttention, FFN) pairs to instantiate (>= 1).
        num_class: number of output triplet classes; forwarded to the attention,
            FFN and classifier modules so that they always agree on channel count.
        use_ln: forwarded to FFN.

    Raises:
        ValueError: if ``layer_size`` is smaller than 1 (forward would otherwise
            fail with an unbound local variable).
    """

    def __init__(self, num_tool=6, num_verb=10, num_target=15, num_triplet=100, layer_size=5, num_class=100, use_ln=True):
        super(mymodel, self).__init__()
        if layer_size < 1:
            raise ValueError("layer_size must be >= 1, got {}".format(layer_size))
        self.basemodel = Backbone()
        self.model1 = Model1(num_tool)
        self.model2 = Model2(num_tool, num_verb, num_target)
        self.model3 = Model3(num_verb)
        # num_class used to be passed to the classifier only, so a non-default value
        # produced mismatching channel counts between attention/FFN and classifier.
        self.channelattention = nn.ModuleList([ChannelAttention(num_class) for _ in range(layer_size)])
        self.ffn = nn.ModuleList([FFN(num_class, use_ln) for _ in range(layer_size)])
        self.classifier = Classifier(num_class)

    def forward(self, x):
        """Run the network on a batch of images.

        Returns:
            Tuple ``(H_i, H_t, H_v, logits)`` where each ``H_*`` is the
            ``(cam, logits)`` pair of the instrument, target and verb branch
            respectively and ``logits`` are the triplet logits.
        """
        x_i, x_t, x_v = self.basemodel(x)
        H_i = self.model1(x_i)
        H_t = self.model2(x_t, H_i[0])
        H_v = self.model3(x_v, H_t[0])
        # Loop variables are named explicitly so they do not shadow the module-level
        # alias ``F`` (torch.nn.functional).
        # NOTE(review): every iteration recomputes X from the CAMs, so only the last
        # (attention, ffn) pair influences the output; earlier pairs are evaluated
        # and discarded. Behaviour is kept as-is -- confirm whether the layers were
        # meant to be chained.
        for attention, feed_forward in zip(self.channelattention, self.ffn):
            X = attention(H_i[0], H_t[0], H_v[0])
            X = feed_forward(X)
        logits = self.classifier(X)
        return H_i, H_t, H_v, logits


class Backbone(nn.Module):
    """ResNet-18 feature extractor exposing three intermediate activations.

    Forward hooks on the ``bn2`` layer of the second block of ``layer1``,
    ``layer2`` and ``layer4`` store their outputs in ``self.output_feature``;
    ``forward`` returns them as (high, mid, low) level features.
    """
    def __init__(self):
        super(Backbone, self).__init__()
        # Filled by the forward hooks on every forward pass; keyed by feature name.
        self.output_feature = {} 
        self.backbone      = models.resnet18(pretrained=True)
        # Strides are patched before the hooks are registered.
        self.increase_resolution()
        self.backbone.layer1[1].bn2.register_forward_hook(self.get_activation('low_level_feature'))
        self.backbone.layer2[1].bn2.register_forward_hook(self.get_activation('mid_level_feature'))
        self.backbone.layer4[1].bn2.register_forward_hook(self.get_activation('high_level_feature'))       

    def increase_resolution(self):  
        """Set the stride of layer2[0] and layer4[0] to 1 and scale the global output size by 4."""
        # NOTE(review): this mutates the module-level OUT_HEIGHT / OUT_WIDTH in place,
        # so constructing a second Backbone (e.g. re-running the model cell without
        # re-running the hyperparameter cell) multiplies them by 4 again.
        global OUT_HEIGHT, OUT_WIDTH
        self.backbone.layer2[0].conv1.stride = (1,1)
        self.backbone.layer2[0].downsample[0].stride=(1,1)  
        self.backbone.layer4[0].conv1.stride = (1,1)
        self.backbone.layer4[0].downsample[0].stride=(1,1)
        OUT_HEIGHT *= 4
        OUT_WIDTH  *= 4
        print("using high resolution output ({}x{})".format(OUT_HEIGHT,OUT_WIDTH))      

    def get_activation(self, layer_name):
        """Return a forward hook that stores the hooked module's output under ``layer_name``."""
        def hook(module, input, output):
            self.output_feature[layer_name] = output
        return hook
    
    def forward(self, x):
        """Run the full ResNet (its own output is discarded) and return the hooked features.

        Returns:
            Tuple ``(high_level_feature, mid_level_feature, low_level_feature)``.
        """
        # The hooks populate self.output_feature as a side effect of this call.
        _ = self.backbone(x)
        return self.output_feature['high_level_feature'], self.output_feature['mid_level_feature'], self.output_feature['low_level_feature']

class Model1(nn.Module):
    """Instrument branch: 3x3 conv -> batch norm -> ELU -> 1x1 conv producing class maps.

    Args:
        num_tool: number of output channels of the class-activation map.
    """

    def __init__(self, num_tool):
        super().__init__()
        self.Conv1 = nn.Conv2d(512, 64, kernel_size=3, padding=1)
        self.cam = nn.Conv2d(64, num_tool, kernel_size=1)
        self.elu = nn.ELU()
        self.bn = nn.BatchNorm2d(64)
        self.gmp = nn.AdaptiveMaxPool2d((1, 1))

    def forward(self, x):
        """Return ``(cam, logits)``; logits are the spatial maximum of each CAM channel."""
        features = self.elu(self.bn(self.Conv1(x)))
        activation_maps = self.cam(features)
        # Global max pooling leaves two trailing singleton dims; index them away.
        pooled = self.gmp(activation_maps)
        return activation_maps, pooled[..., 0, 0]

class Model2(nn.Module):
    """Target branch: fuses mid-level features with the instrument CAM.

    Args:
        num_tool: number of channels of the instrument CAM concatenated in forward.
        num_verb: accepted for interface compatibility; not used by this class.
        num_target: number of output channels of the target class-activation map.
    """

    def __init__(self, num_tool, num_verb, num_target):
        super(Model2, self).__init__()
        self.Conv1 = nn.Conv2d(in_channels=128, out_channels=64, kernel_size=3, padding=1, stride=(2,2))
        # Input is the 64 channels from Conv1 concatenated with the num_tool-channel
        # CAM; this used to be hard-coded to 70 and only worked for num_tool == 6.
        self.Conv2 = nn.Conv2d(in_channels=64 + num_tool, out_channels=32, kernel_size=1, padding=0)
        self.cam   = nn.Conv2d(in_channels=32, out_channels=num_target, kernel_size=1)
        self.elu   = nn.ELU()
        self.bn1    = nn.BatchNorm2d(64)
        self.bn2    = nn.BatchNorm2d(32)
        self.gmp   = nn.AdaptiveMaxPool2d((1,1))

    def get_inp (self, raw_t, cam):
        """Concatenate branch features and the incoming CAM along the channel axis."""
        M2_inp = torch.cat((raw_t, cam), dim =1)
        return M2_inp

    def forward (self, x, cam):
        """Return ``(cam_t, logits_t)`` for features ``x`` and instrument CAM ``cam``.

        ``cam`` must have the spatial size of ``x`` after the stride-2 convolution,
        otherwise the channel concatenation fails.
        """
        x = self.Conv1(x)
        x = self.bn1(x)
        x = self.elu(x)
        x = self.get_inp(x, cam)
        x = self.Conv2(x)
        x = self.bn2(x)
        x = self.elu(x)
        cam_t = self.cam(x)
        logits_t  = self.gmp(cam_t).squeeze(-1).squeeze(-1)
        return cam_t, logits_t

class Model3(nn.Module):
    """Verb branch: fuses low-level features with the target CAM.

    Args:
        num_verb: number of output channels of the verb class-activation map.
        num_target: number of channels of the target CAM concatenated in forward.
            Defaults to 15, which reproduces the previously hard-coded 47 input
            channels of ``Conv2``.
    """

    def __init__(self, num_verb, num_target=15):
        super(Model3, self).__init__()
        self.Conv1 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=3, padding=1, stride=(2,2))
        # 32 channels from Conv1 plus the num_target-channel CAM.
        self.Conv2 = nn.Conv2d(in_channels=32 + num_target, out_channels=16, kernel_size=1, padding=0)
        self.cam   = nn.Conv2d(in_channels=16, out_channels=num_verb, kernel_size=1)
        self.elu   = nn.ELU()
        self.bn1    = nn.BatchNorm2d(32)
        self.bn2    = nn.BatchNorm2d(16)
        self.gmp   = nn.AdaptiveMaxPool2d((1,1))

    def get_inp (self, raw_v, cam_t):
        """Concatenate branch features and the incoming CAM along the channel axis."""
        M3_inp = torch.cat((raw_v, cam_t), dim =1)
        return M3_inp

    def forward (self, x, cam):
        """Return ``(cam_v, logits_v)`` for features ``x`` and target CAM ``cam``.

        ``cam`` must have the spatial size of ``x`` after the stride-2 convolution,
        otherwise the channel concatenation fails.
        """
        x = self.Conv1(x)
        x = self.bn1(x)
        x = self.elu(x)
        x = self.get_inp(x, cam)
        x = self.Conv2(x)
        x = self.bn2(x)
        x = self.elu(x)
        cam_v = self.cam(x)
        logits_v  = self.gmp(cam_v).squeeze(-1).squeeze(-1)
        return cam_v, logits_v

class ChannelAttention(nn.Module):
    """Fuse the three CAMs into a per-class vector gated by its own sigmoid.

    The CAMs are concatenated on the channel axis (31 channels expected),
    average-pooled to 1x1 and passed through two 1x1 convolutions.

    Args:
        num_class: number of output channels.
    """

    def __init__(self, num_class=100):
        super().__init__()
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(31, 128, kernel_size=1, stride=1, padding=0)
        self.fc2 = nn.Conv2d(128, num_class, kernel_size=1, stride=1, padding=0)
        self.sigmoid = nn.Sigmoid()

    def forward(self, cam_i, cam_t, cam_v):
        """Return ``fc2(relu(fc1(avgpool(cat(cams))))) * sigmoid(same)``."""
        pooled = self.pool(torch.cat([cam_i, cam_t, cam_v], dim=1))
        scores = self.fc2(F.relu(self.fc1(pooled)))
        # The gate is computed from the very tensor it multiplies.
        return scores * self.sigmoid(scores)

class FFN(nn.Module):
    """Residual feed-forward block: two conv/BN/ELU stages, skip connection, final norm.

    Args:
        num_class: channel count of the input and of every layer in the block.
        use_ln: accepted for interface compatibility; not used by this class
            (``self.ln`` is always a ``BatchNorm2d``).
    """

    def __init__(self, num_class=100, use_ln=True):
        super().__init__()
        self.conv1 = nn.Conv2d(num_class, num_class, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(num_class, num_class, kernel_size=1)
        self.elu1 = nn.ELU()
        self.elu2 = nn.ELU()
        self.bn1 = nn.BatchNorm2d(num_class)
        self.bn2 = nn.BatchNorm2d(num_class)
        self.ln = nn.BatchNorm2d(num_class)

    def forward(self, inputs):
        """Return ``ln(stage2(stage1(inputs)) + inputs)``."""
        residual = inputs.clone()
        hidden = self.conv1(inputs)
        hidden = self.elu1(self.bn1(hidden))
        hidden = self.conv2(hidden)
        hidden = self.elu2(self.bn2(hidden))
        return self.ln(hidden + residual)

class Classifier(nn.Module):
    """Global-max-pool the input map and apply one linear layer.

    Args:
        num_class: size of both the pooled feature vector and the output logits.
    """

    def __init__(self, num_class=100):
        super().__init__()
        self.gmp = nn.AdaptiveMaxPool2d((1, 1))
        self.mlp = nn.Linear(num_class, num_class)

    def forward(self, inputs):
        """Return the logits computed from the per-channel spatial maximum of ``inputs``."""
        # Pooling leaves two trailing singleton dims; drop them before the linear layer.
        pooled = self.gmp(inputs)[..., 0, 0]
        return self.mlp(pooled)

In [None]:
def generate_bbox_for_instrument(cam, threshold=0.5, original_size=(854, 480)):
    """Extract the most activated region of a single-channel CAM as a bounding box.

    The CAM is min-max normalised, thresholded, and split into external contours.
    The contour whose filled region has the largest summed activation wins.

    Fixes over the previous version:
      * the score is normalised by the area of the *winning* region (it used to
        be divided by the area of whichever contour was visited last);
      * a CAM without any region above the threshold yields score 0.0 (it used
        to yield -1 / -1 == 1.0);
      * a constant, empty or non-finite CAM no longer divides by zero.

    Args:
        cam: 2-D torch tensor (H, W); may live on any device and require grad.
        threshold: cut-off applied to the normalised CAM.
        original_size: ``(width, height)`` the box is rescaled to.

    Returns:
        ``(boxes, score)`` where ``boxes`` is ``[]`` or a one-element list holding
        ``(x1, y1, x2, y2)`` in ``original_size`` pixels, and ``score`` is the mean
        normalised activation inside the winning region as a Python float.

    Raises:
        ValueError: if ``cam`` is not 2-D.
    """
    cam_cpu = cam.detach().float().cpu()
    if cam_cpu.ndim != 2:
        raise ValueError("binary_mask_uint8 must be single-channel (2D array).")
    if cam_cpu.numel() == 0:
        return [], 0.0

    cam_min = cam_cpu.min()
    value_range = float(cam_cpu.max() - cam_min)
    # A flat or non-finite map has no meaningful normalisation and no region.
    if not np.isfinite(value_range) or value_range <= 0:
        return [], 0.0
    cam_normalized = ((cam_cpu - cam_min) / value_range).numpy()

    binary_mask_uint8 = np.ascontiguousarray((cam_normalized > threshold).astype(np.uint8) * 255)

    # [-2] selects the contour list for both the 2-value and 3-value return
    # conventions of cv2.findContours.
    contours = cv2.findContours(binary_mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    best_box = None
    best_sum = -1.0
    best_area = 0
    for contour in contours:
        region = np.zeros(cam_normalized.shape, dtype=np.uint8)
        cv2.drawContours(region, [contour], -1, 255, thickness=-1)
        inside = region > 0

        region_sum = float(cam_normalized[inside].sum())
        if region_sum > best_sum:
            best_sum = region_sum
            best_area = int(inside.sum())  # area of this very region
            x, y, w, h = cv2.boundingRect(contour)
            best_box = (x, y, x + w, y + h)

    if best_box is None or best_area == 0:
        return [], 0.0

    # Rescale corner coordinates from CAM resolution to the original frame size.
    original_width, original_height = original_size
    scale_x = original_width / cam_normalized.shape[1]
    scale_y = original_height / cam_normalized.shape[0]
    x1, y1, x2, y2 = best_box
    rescaled_box = [
        (int(x1 * scale_x), int(y1 * scale_y), int(x2 * scale_x), int(y2 * scale_y))
    ]

    return rescaled_box, best_sum / best_area

def compute_act_scores(cams):
    """Compute the activation score of every CAM channel of every sample.

    Args:
        cams: tensor indexed as ``cams[sample][channel]`` with each entry a 2-D
            map, i.e. shape (N, C, H, W).

    Returns:
        ``numpy.ndarray`` of shape (N, C) holding the score returned by
        ``generate_bbox_for_instrument`` for each map. The channel count is
        read from ``cams`` instead of being fixed to 6.
    """
    num_samples, num_channels = cams.shape[0], cams.shape[1]
    act_scores = np.zeros((num_samples, num_channels))

    for i in range(num_samples):
        for j in range(num_channels):
            # Only the score is needed here; the box is discarded.
            _, act_score = generate_bbox_for_instrument(cams[i][j])
            act_scores[i, j] = act_score
    return act_scores



def make_dic(predicted_data, threshold=0.5):
    """Convert the predictions of one video into a JSON-serialisable per-frame dict.

    Fixes over the previous version:
      * per-batch prediction lists are concatenated so iteration is per *frame*
        (it used to iterate over batches and read batch-row indices as class ids);
      * every frame entry is actually stored in the result (it used to return a
        dict of empty dicts);
      * the ``(boxes, score)`` return value of ``generate_bbox_for_instrument``
        is unpacked correctly instead of being indexed as four coordinates;
      * only plain Python numbers/lists are emitted, so ``json.dump`` works.

    Args:
        predicted_data: dict with key ``"triplet"`` and optionally
            ``"instrument"`` and ``"CAM_instrument"``. Each value is either one
            tensor whose first axis is the frame axis, or a list of batched
            tensors (first axis = batch) as returned by ``test_loop``.
        threshold: probability above which a triplet class counts as activated.

    Returns:
        ``{str(frame_id): {"recognition": [float, ...], "detection": [...]}}``.
        Each detection is ``{"triplet": class_id, "instrument": [instrument_id,
        probability, x1, y1, x2, y2]}`` with the box exactly as returned by
        ``generate_bbox_for_instrument``. Activated triplets without a box (or
        without an instrument CAM) produce no detection entry.
    """
    # (first, last, extra) triplet ids assigned to instrument ids 0..5.
    instrument_ranges = (
        (0, 21, 94), (22, 45, 95), (46, 64, 96),
        (65, 76, 97), (77, 81, 98), (82, 93, 99),
    )

    def _instrument_for_triplet(triplet_id):
        for instrument_id, (first, last, extra) in enumerate(instrument_ranges):
            if first <= triplet_id <= last or triplet_id == extra:
                return instrument_id
        return None

    def _frames(key):
        # Normalise "tensor" and "list of batched tensors" to one frame-major tensor.
        value = predicted_data.get(key)
        if value is None:
            return None
        if isinstance(value, (list, tuple)):
            if len(value) == 0:
                return None
            return torch.cat([torch.as_tensor(v) for v in value], dim=0)
        return torch.as_tensor(value)

    triplet_probs = _frames("triplet")
    if triplet_probs is None:
        return {}
    instrument_probs = _frames("instrument")
    instrument_cams = _frames("CAM_instrument")

    dic = {}
    for frame_id in range(triplet_probs.shape[0]):
        frame_probs = triplet_probs[frame_id]
        activated_classes = (frame_probs > threshold).nonzero(as_tuple=True)[0].tolist()

        detections = []
        for p in activated_classes:
            g = _instrument_for_triplet(p)
            if g is None or instrument_cams is None:
                continue

            boxes, _ = generate_bbox_for_instrument(instrument_cams[frame_id][g])
            if not boxes:
                continue
            x1, y1, x2, y2 = boxes[0]

            # NOTE(review): the previous code stored the raw CAM tensor in this slot,
            # which cannot be serialised; the instrument probability is assumed to be
            # what was intended -- confirm against the expected output format.
            if instrument_probs is not None:
                probability = float(instrument_probs[frame_id][g])
            else:
                probability = float(frame_probs[p])

            detections.append({
                "triplet": p,
                "instrument": [g, probability, x1, y1, x2, y2],
            })

        dic[str(frame_id)] = {
            "recognition": frame_probs.tolist(),
            "detection": detections,
        }

    return dic

In [None]:
class CholecT50():
    """Builds the train / validation / test datasets of the CholecT50 video split.

    Args:
        dataset_dir: root directory containing ``videos/<VIDxx>/`` frame folders
            and ``labels/<VIDxx>.json`` label files.
        augmentation_list: names of the augmentations (keys of
            ``self.augmentations``) applied, in order, to training images.

    Raises:
        KeyError: if ``augmentation_list`` contains an unknown name.
    """

    def __init__(self, dataset_dir,
                 augmentation_list=('original', 'vflip', 'hflip', 'contrast', 'rot90')):
        self.dataset_dir = dataset_dir
        self.train_records = ['VID{}'.format(str(v).zfill(2)) for v in [1, 2, 4, 5, 6, 8, 10, 12]]
        self.val_records   = ['VID{}'.format(str(v).zfill(2)) for v in [13, 14]]
        self.test_records  = ['VID{}'.format(str(v).zfill(2)) for v in [92, 96, 103, 110, 111]]
        self.augmentations = {
            'original': self.no_augmentation,
            'vflip': transforms.RandomVerticalFlip(0.4),
            'hflip': transforms.RandomHorizontalFlip(0.4),
            'contrast': transforms.ColorJitter(brightness=0.1, contrast=0.2, saturation=0, hue=0),
            # Rotates by a random angle within +/-90 degrees.
            'rot90': transforms.RandomRotation(90,expand=True),
            # NOTE(review): despite its key this entry adjusts sharpness, not brightness.
            'brightness': transforms.RandomAdjustSharpness(sharpness_factor=1.6, p=0.5),
            'autocontrast': transforms.RandomAutocontrast(p=0.5),
        }
        self.augmentation_list = [self.augmentations[aug] for aug in augmentation_list]
        trainform, testform = self.transform()
        self.build_train_dataset(trainform)
        # Validation uses the deterministic pipeline; it was previously built with
        # the random training augmentations, which makes validation metrics noisy
        # and not comparable between epochs.
        self.build_val_dataset(testform)
        self.build_test_dataset(testform)

    def no_augmentation(self, x):
        """Identity transform used for the 'original' augmentation entry."""
        return x

    def transform(self):
        """Return ``(trainform, testform)`` torchvision pipelines.

        Both resize to 256x448, convert to tensor and normalise; the training
        pipeline additionally applies the selected augmentations and resizes
        again afterwards (rotation with ``expand=True`` changes the image size).
        """
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        op_test   = [transforms.Resize((256, 448)), transforms.ToTensor(), normalize,]
        op_train  = [transforms.Resize((256, 448))] + self.augmentation_list + [transforms.Resize((256, 448)), transforms.ToTensor(), normalize,]
        testform  = transforms.Compose(op_test)
        trainform = transforms.Compose(op_train)
        return trainform, testform

    def _build_video_datasets(self, records, transform):
        """Return one T50 dataset per video name in ``records``."""
        return [
            T50(img_dir=os.path.join(self.dataset_dir, 'videos', video),
                label_file=os.path.join(self.dataset_dir, 'labels', '{}.json'.format(video)),
                transform=transform)
            for video in records
        ]

    def build_train_dataset(self, transform):
        """Concatenate all training videos into ``self.train_dataset``."""
        self.train_dataset = ConcatDataset(self._build_video_datasets(self.train_records, transform))

    def build_val_dataset(self, transform):
        """Concatenate all validation videos into ``self.val_dataset``."""
        self.val_dataset = ConcatDataset(self._build_video_datasets(self.val_records, transform))

    def build_test_dataset(self, transform):
        """Store the test videos as a list (one dataset per video) in ``self.test_dataset``."""
        self.test_dataset = self._build_video_datasets(self.test_records, transform)

    def build(self):
        """Return ``(train_dataset, val_dataset, test_dataset)``."""
        return   (self.train_dataset, self.val_dataset, self.test_dataset)


class T50(Dataset):
    """Frames and multi-hot labels of a single video.

    Frame ``index`` is read from ``<img_dir>/<index zero-padded to 6>.png`` and
    its annotations from ``label_file["annotations"][str(index)]``.

    Fixes over the previous version:
      * the JSON label file is parsed once and cached instead of being re-read
        on every ``__len__`` / ``__getitem__`` call, and is always closed;
      * a class id of -1 is skipped; it used to execute ``label[-1] = 0`` and
        thereby clear the last class even when another annotation had set it;
      * the image file handle is closed and the image is converted to RGB.

    Args:
        img_dir: directory holding the PNG frames.
        label_file: path of the JSON label file.
        transform: optional callable applied to the PIL image.
        target_transform: stored but not applied by this class.
    """

    def __init__(self, img_dir, label_file, transform=None, target_transform=None):
        self.img_dir = img_dir
        self.label_file_path = label_file
        self.transform = transform
        self.target_transform = target_transform
        self._annotations = None  # parsed lazily on first use, then reused

    def _load_annotations(self):
        """Return the ``"annotations"`` mapping of the label file (cached)."""
        if self._annotations is None:
            with open(self.label_file_path, "r") as f:
                self._annotations = json.load(f)["annotations"]
        return self._annotations

    @staticmethod
    def _encode_labels(frame_annotations):
        """Turn one frame's annotation rows into four multi-hot int tensors.

        Columns 0, 1, 7 and 8 of each row are read as triplet, instrument, verb
        and target class ids; an id of -1 marks "no class" and is ignored.
        """
        triplet_label = torch.zeros(100, dtype=torch.int)
        instrument_label = torch.zeros(6, dtype=torch.int)
        verb_label = torch.zeros(10, dtype=torch.int)
        target_label = torch.zeros(15, dtype=torch.int)

        for a in frame_annotations:
            for label, class_id in ((triplet_label, a[0]), (instrument_label, a[1]),
                                    (verb_label, a[7]), (target_label, a[8])):
                if class_id != -1:
                    label[class_id] = 1

        return triplet_label, instrument_label, verb_label, target_label

    def __len__(self):
        return len(self._load_annotations())

    def __getitem__(self, index):
        basename = "{}.png".format(str(index).zfill(6))
        img_path = os.path.join(self.img_dir, basename)
        # convert() loads the pixel data, so the file can be closed right away.
        with Image.open(img_path) as frame:
            image = frame.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)

        labels = self._encode_labels(self._load_annotations()[str(index)])
        return image, labels

In [None]:
def train_loop(dataloader, model, activation, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt, loss_association, epoch):
    """Train ``model`` for one epoch and print progress.

    Reads the module-level names ``optimizer`` (stepped after every batch) and
    ``epochs`` (only used in the progress messages); both must be defined
    before this function is called.

    Args:
        dataloader: yields ``(img, (y_triplet, y_instrument, y_verb, y_target))``.
        model: returns ``(instrument, target, verb, triplet)`` where the first
            three are ``(cam, logits)`` pairs and ``triplet`` is a logits tensor.
        activation: unused here; kept for signature parity with ``test_loop``.
        loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt: losses for the instrument,
            verb, target and triplet logits.
        loss_association: loss between the CAM activation scores and the
            instrument labels.
        epoch: zero-based epoch index (printed as ``epoch + 1``).
    """
    num_batches = len(dataloader)
    if num_batches == 0:
        # Nothing to train on; the summary below would divide by zero.
        print(f"Epoch [{epoch+1}/{epochs}], no batches to train on")
        return

    # Follow the model's device instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cuda")

    model.train()  # once per epoch is enough; nothing switches the mode in the loop
    running_loss = 0.0
    for batch, (img, (y_triplet, y_instrument, y_verb, y_target)) in enumerate(dataloader):
        img = img.to(device, non_blocking=True)
        y_triplet = y_triplet.to(device).float()
        y_instrument = y_instrument.to(device).float()
        y_verb = y_verb.to(device).float()
        y_target = y_target.to(device).float()

        instrument, target, verb, triplet = model(img)
        cam_i, logit_i = instrument
        _, logit_v = verb
        _, logit_t = target
        logit_ivt = triplet

        # The scores are computed with NumPy/OpenCV and carry no gradient, so the
        # CAM is detached up front, and the result is cast to the logits' dtype
        # (it used to be float64 next to float32 logits).
        # NOTE(review): consequently loss_ass only shifts the reported loss value
        # and does not influence the weight update; if loss_association is a
        # *WithLogits loss, scores in [0, 1] are also not logits -- confirm intent.
        act_values = compute_act_scores(cam_i.detach())
        act_values_tensor = torch.from_numpy(act_values).to(device=logit_i.device, dtype=logit_i.dtype)

        loss_i   = loss_fn_i(logit_i, y_instrument)
        loss_v   = loss_fn_v(logit_v, y_verb)
        loss_t   = loss_fn_t(logit_t, y_target)
        loss_ivt = loss_fn_ivt(logit_ivt, y_triplet)
        loss_ass = loss_association(act_values_tensor, y_instrument)

        loss = loss_i + loss_v + loss_t + loss_ivt + loss_ass

        # Same effect as setting every param.grad to None by hand.
        model.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if batch % 50 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Step [{batch+1}/{num_batches}], Loss: {loss.item():.4f}")

    epoch_loss = running_loss / num_batches
    print(f"Epoch [{epoch+1}/{epochs}], Average Loss: {epoch_loss:.4f}")
    # Per-branch values below are those of the last batch only.
    print(f'completed | Losses => i: [{loss_i.item():.4f}] v: [{loss_v.item():.4f}] t: [{loss_t.item():.4f}] ivt: [{loss_ivt.item():.4f}]')

In [None]:
def test_loop(dataloader, model, activation, final_eval=True):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)

    predictions = {"instrument": [], "verb": [], "target": [], "triplet": [], "CAM_instrument":[]}

    with torch.no_grad():
        for batch, (img, (y_triplet, y_instrument, y_verb, y_target)) in enumerate(dataloader):
            img, y_triplet, y_instrument, y_verb, y_target = img.cuda(), y_triplet.cuda(), y_instrument.cuda(), y_verb.cuda(), y_target.cuda()            
            model.eval()  
            instrument, target, verb, triplet = model(img)
            if final_eval:
                cam_i, logit_i = instrument
                cam_v, logit_v = verb
                cam_t, logit_t = target

                predictions["instrument"].append(activation(logit_i).cpu())
                predictions["verb"].append(activation(logit_v).cpu())
                predictions["target"].append(activation(logit_t).cpu())
                predictions["CAM_instrument"].append(cam_i.cpu())
                
            predictions["triplet"].append(activation(triplet).cpu())

    return predictions

In [None]:
# Build the train / validation / test datasets and wrap each in a DataLoader.
dataloader_class = CholecT50(
    dataset_dir=data_dir,
    augmentation_list=['original', 'vflip', 'hflip', 'contrast', 'rot90'],
)
train_dataset, val_dataset, test_dataset = dataloader_class.build()

# Options shared by every loader created in this cell.
# NOTE(review): prefetch_factor counts *batches* queued per worker, so
# 3*batch_size keeps 96 batches per worker in flight -- confirm this memory
# footprint is intended.
_loader_options = dict(
    batch_size=batch_size,
    prefetch_factor=3*batch_size,
    num_workers=2,
    pin_memory=True,
    persistent_workers=True,
    drop_last=False,
)

train_dataloader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_dataloader = DataLoader(val_dataset, shuffle=True, **_loader_options)

# One loader per test video, kept in order, so results can be reported per video.
test_dataloaders = []
for video_dataset in test_dataset:
    test_dataloader = DataLoader(video_dataset, shuffle=False, **_loader_options)
    test_dataloaders.append(test_dataloader)
print("Dataset loaded ...")

In [None]:
# Instantiate the network on the GPU, then the activation, losses and optimizer.
model = mymodel(use_ln=True).cuda()

# Sigmoid turns logits into per-class probabilities at evaluation time.
activation = nn.Sigmoid()

# One independent BCE-with-logits criterion per supervised output.
(loss_fn_i,
 loss_fn_v,
 loss_fn_t,
 loss_fn_ivt,
 loss_association) = (nn.BCEWithLogitsLoss() for _ in range(5))

# Plain SGD over every model parameter.
optimizer = optim.SGD(model.parameters(), lr=0.001)

In [None]:
# Run the whole training schedule; an interrupt stops it with exit status 1.
try:
    for epoch in range(epochs):
        print("Traning | epoch {}".format(epoch), end=" | ")
        train_loop(
            train_dataloader, model, activation,
            loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt, loss_association,
            epoch,
        )
except KeyboardInterrupt:
    print('>> Process cancelled by user ...')
    sys.exit(1)

In [None]:
# Run inference on every test video and export all predictions to one JSON file.
# Names are listed in the same order in which test_dataloaders was built.
vid_index = ["VID92", "VID96", "VID103", "VID110", "VID111"]
if len(vid_index) != len(test_dataloaders):
    # Fail loudly instead of silently mislabelling or dropping a video.
    raise ValueError(
        "expected {} test dataloaders, got {}".format(len(vid_index), len(test_dataloaders))
    )

dic_data = {}
# zip pairs each loader with its video name, replacing the manual index counter.
for video_name, test_dataloader in zip(vid_index, test_dataloaders):
    predicted_data = test_loop(test_dataloader, model, activation, final_eval=True)
    dic_data[video_name] = make_dic(predicted_data)

with open("mymodel.json", 'w') as json_file:
    json.dump(dic_data, json_file, indent=4)