# Suturing Chalange - Task1 Pipeline

In [None]:
# Import Libraries 
import os
import sys
import pandas as pd
import cv2
import numpy as np
import torch
from torchinfo import summary
from torch.nn import Sigmoid, ReLU, Softmax, Module, Linear
from torch.optim import SGD, Adam
from torch.nn.init import xavier_uniform_, zeros_
from torchvision import transforms
from PIL import Image, ImageDraw
from pathlib import Path
from ultralytics import YOLO
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection  import train_test_split
from sklearn.metrics import classification_report, f1_score, accuracy_score, mean_squared_error
import matplotlib.pyplot as plt
from pathlib import Path
from ultralytics import YOLO
import matplotlib.pyplot as plt
from PIL import Image
from matplotlib.patches import Rectangle
import seaborn as sns
from matplotlib.colors import to_rgba_array
from glob import glob
from livelossplot import PlotLosses
import monai
from monai.apps import download_and_extract
from monai.config import print_config
from monai.data import Dataset, DataLoader, ImageDataset, CacheDataset, NumpyReader, CSVDataset
from monai.transforms import (
    Activations,
    AddChanneld,
    Compose,
    CastToTyped,
    EnsureType,
    LoadImaged,
    EnsureTyped,
    ToTensor
)

pin_memory = torch.cuda.is_available()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def seed_everything(TORCH_SEED):
    random.seed(TORCH_SEED)
    os.environ['PYTHONHASHSEED'] = str(TORCH_SEED)
    np.random.seed(TORCH_SEED)
    torch.manual_seed(TORCH_SEED)
    torch.cuda.manual_seed_all(TORCH_SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything(1)

## Step 1 - check data

In [None]:
Dataset_base_path = "./Dataset/Train"
Csv_file_path = f"{Dataset_base_path}/OSATS.csv"

df = pd.read_csv(Csv_file_path, sep = ";")
df

## Step 2 - Feature extraction
2 Yolo versions were experimented with:
- Yolo v8 - follow step 2.1
- Yolo v5 - follow step 2.2

## Step 2.1 - Prepare Yolo model v8 to extract features

In [None]:
#####################  YOLO Functions  #####################
# Define hook function
def hook_fn(module, input, output):
    intermediate_features.append(output)

# Define feature extraction function
def extract_features(model, img, layer_index = 20): ##Choose the layer that fit your application
    global intermediate_features
    intermediate_features = []
    hook = model.model.model[layer_index].register_forward_hook(hook_fn)
    #print(hook)
    with torch.no_grad():
        model(img)
    hook.remove()
    return intermediate_features[0]  # Access the first element of the list

# Make sure to preprocess the image since the input image must be 640x640x3
def preprocess_image(img_path):
    transform = transforms.Compose([
        transforms.Resize((640, 640)),
        transforms.Grayscale(num_output_channels = 3),  # Convert to RGB
        transforms.ToTensor(),
        transforms.Normalize(mean = 0., std = 1.)
    ])
    if(type(img_path) == type(Path())):
        img = Image.open(img_path)
    else:
        img = Image.fromarray(img_path)
    img = transform(img)
    img = img.unsqueeze(0)
    
    return img

# Plot the Features extracted
def extract_and_plot_features(img_path, layer_index=20, channel_index=5):
    model = YOLO(Path("./model/yolov8n.pt"))
    img = preprocess_image(img_path)
    features = extract_features(model, img, layer_index)

        # Print the shape of the features
    print(f"Features shape for {img_path.name if(type(img_path) == type(Path())) else 'this frame'}: {features.shape}")

        # Plot the features as a heatmap for a specific channel
    plt.figure(figsize=(10, 5))
    sns.heatmap(features[0][channel_index].cpu().numpy(), cmap='viridis', annot=False)
    plt.title(f'Features for {img_path.name if(type(img_path) == type(Path())) else "this frame"} - Layer {layer_index} - Channel {channel_index}')
    plt.show()

def get_features(img, layer_index = 20):
    model = YOLO(Path("./model/yolov8n.pt"))
    return extract_features(model, preprocess_image(img), layer_index)

## Step 2.2 - Prepare Yolo model v5 to extract features

In [None]:
# Define feature extraction function
def get_features_from_layer(model, image_tensor, layer_idx=-2):
    """
    Extract features from a specific layer.
    
    Parameters:
    - model: The loaded YOLOv5 model.
    - image_tensor: The input image tensor with shape [C, H, W] and batch dimension added.
    - layer_idx: Index of the layer from which to extract features. Default is -2, the second to last layer.
    
    Returns:
    - features: The extracted features from the specified layer.
    """
    # Ensure model is in evaluation mode
    model.eval()
    
    # Hook to capture the output of the specified layer
    features = []
    def hook_fn(module, input, output):
        features.append(output)
    
    # Register the hook to the desired layer
    handle = list(model.model.model.children())[0][-2].register_forward_hook(hook_fn)
    
    # Perform a forward pass to get the features
    with torch.no_grad():
        _ = model(image_tensor)  # Add batch dimension if not present
    
    # Remove the hook
    handle.remove()
    
    # Return the features
    return features[0]

# Make sure to preprocess the image since the input image must be 640x640x3
def preprocess_image(img_path):
    transform = transforms.Compose([
        transforms.Resize((320, 320)),
        transforms.Grayscale(num_output_channels = 3),  # Convert to RGB
        transforms.ToTensor(),
        transforms.Normalize(mean = 0., std = 1.)
    ])
    if(type(img_path) == type(Path())):
        img = Image.open(img_path)
    else:
        img = Image.fromarray(img_path)
    img = transform(img)
    img = img.unsqueeze(0)
    return img
    
def get_features(img, layer_index=-2):
    pre_process_img = preprocess_image(img)
    print(f"Pre processed image shape: {pre_process_img.shape}")
    return get_features_from_layer(model=model, image_tensor=pre_process_img, layer_idx=-2)

## Step 3 - Run trough the videos and extract the features

In [None]:
#####################  OpenCV Functions  #####################
def process_video(video_name, clip_minutes=5):
    cap = cv2.VideoCapture(f'{video_name}.mp4')  # Create a VideoCapture object and read from input file
    video_frame_features = []
    
    # Check if capture opened successfully
    if not cap.isOpened():
        print("Error opening video stream or file")
        return None

    # Get frame rate (frames per second)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    print(f'Frames per second: {fps} FPS')

    # Total frames for the first 5 minutes (clip_minutes * 60 seconds * fps)
    max_frames = int(clip_minutes * 60 * fps)
    
    # Get the total number of frames in the video
    frames_total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f'Frame count in video: {frames_total}')

    # Ensure we don't process more than the total number of frames in the video
    frames_to_process = min(max_frames, frames_total)
    print(f'Processing {frames_to_process} frames (up to {clip_minutes} minutes)')

    # Capture one frame per second (skip frames based on fps)
    for frame in range(frames_to_process):
        ret, img = cap.read()  # Capture frame-by-frame
        if not ret:
            break  # Break if we can't read the frame
        
        if frame % fps == 0:  # Process one frame per second
            print(f'Processing frame {frame}/{frames_to_process}')
            print(f"Input image shape: {img.shape}")
            processed_frame = get_features(img).cpu().numpy()  # Process the frame
            print(f"Feature shape: {processed_frame.shape}")
            video_frame_features.append(processed_frame)

    # Release the video capture object and close all OpenCV windows
    cap.release()
    cv2.destroyAllWindows()
    return np.array(video_frame_features)

def process_videos():
    for i in range(len(video_list)):
        video_name = video_list[i]
        numpy_file = f"/notebooks/disk4/Suturing_Challenge/Hand_gesture/features_extracted/{video_name}.npz"
        video_score = np.array(0 if rating_score[i] < 16 else 1 if 15 < rating_score[i] < 24 else 2 if 23 < rating_score[i] < 32 else 3)
        if(not os.path.isfile(numpy_file)): # if the video hasn't been already processed
            print(f"Video {i}/{len(video_list)} - {video_name}.mp4 Score: {video_score}")
            video_features = process_video(f"{Dataset_base_path}/videos/{video_name}")
            np.savez(numpy_file, video_features, video_score) #usar a compressed como abaixo

def convert_numpy_files():
    all_videos = glob("./numpy_files/*.npz")
    for file in all_videos:
        old_numpy_file = np.load(file)
        new_file = f"./Dataset/Train/numpy_files/{file.split('/')[-1]}"
        #get video
        video = old_numpy_file["arr_0"].reshape( -1, 384, 20, 20)[::3,:,:,:][0:300] #sub sample from 3FPS to 1 and select the first 300 (5min)
        #get scores
        score = old_numpy_file["arr_1"]
        np.savez_compressed(new_file, video = video, score = score)

def reduce_numpy(): #apply PCA
    numpy_files = glob("./Dataset/Train/numpy_files/*.npz")
    reduced_numpy_path = "./Dataset/Train/numpy_files_reduced"
    pca = PCA()
    for i,file in enumerate(numpy_files):
        print(f"File {i+1}/{len(numpy_files)}")
        file_name = file.split("/")[-1]
        print(file_name)
        numpy_video = np.load(file)["video"]
        print(numpy_video.shape)
        pca.fit(numpy_video.reshape(300, -1))
        features_reduced = pca.transform(numpy_video.reshape(300, -1))
        print(f"Original shape: {numpy_video.reshape(300, -1).shape}")
        print(f"New shape: {features_reduced.shape}")
        np.savez_compressed(f"{reduced_numpy_path}/{file_name}", video = features_reduced)

reduce_numpy()

## Step 4 - prepare dataset.csv to be used with data loaders

In [None]:
def get_GRS(score_list, index):
    score1 = 0 if score_list[index] < 16 else 1 if 15 < score_list[index] < 24 else 2 if 23 < score_list[index] < 32 else 3
    score2 = 0 if score_list[index+1] < 16 else 1 if 15 < score_list[index+1] < 24 else 2 if 23 < score_list[index+1] < 32 else 3
    score3 = 0 if score_list[index+2] < 16 else 1 if 15 < score_list[index+2] < 24 else 2 if 23 < score_list[index+2] < 32 else 3
    return int((score1+score2+score3)/3)

def create_Dataset(): #combine both of these functions to include all the propper columns
    DATASET_CSV = "./Dataset.csv"
    NUMPY_FILES_PATH = "./Dataset/Train/numpy_files_reduced"
    header = { "VIDEO" : [], 
              "NUMPY": [], 
              "GRS" : [],}

    df_dataset = pd.DataFrame(header)
    for i in range(0,len(video_list),3):
        row = []
        row.append(video_list[i])#VIDEO name
        row.append(f"{NUMPY_FILES_PATH}/{video_list[i]}.npz")#NUMPY file
        row.append(get_GRS(rating_score, i)) #GRS
        df_dataset.loc[len(df_dataset)] = row
    df_dataset.to_csv(DATASET_CSV, index = False)

## Step 5 - Prepare dataloaders

In [None]:
# Define transforms
train_transforms = Compose([
    LoadImaged(keys = ["NUMPY"], reader = NumpyReader(npz_keys = ("video"))),
    CastToTyped(keys = ["NUMPY"]), #cast to float32
    ToTensor(),
])            

val_transforms = Compose([
    LoadImaged(keys = ["NUMPY"], reader = NumpyReader(npz_keys = ("video"))),
    CastToTyped(keys = ["NUMPY"]), #cast to float32
    ToTensor(),
])

BATCH_SIZE = 10

# create a training data loader
train_ds = CSVDataset(src = "./Dataset.csv", col_names = ["NUMPY", "GRS"], row_indices = [[0,250]], transform = train_transforms)
train_loader = DataLoader(train_ds, batch_size = BATCH_SIZE, shuffle = True, num_workers = 2, pin_memory = pin_memory, drop_last = True)

# create a validation data loader
val_ds = CSVDataset(src = "./Dataset.csv", col_names = ["NUMPY", "GRS"], row_indices = [[251,314]], transform = val_transforms)
val_loader = DataLoader(val_ds, batch_size = BATCH_SIZE, num_workers = 2, pin_memory = pin_memory, drop_last = True)

## Step 6 - Prepare model

In [None]:
in_channels = 300*300
hidden_channels = 9000 #int(in_channels/2)

# Definição classe para o modelo
class MLP(Module):
    # definir elementos do modelo
    def __init__(self, n_inputs):
        super(MLP, self).__init__()
        # input para a primeira camada
        self.hidden1 = Linear(n_inputs, hidden_channels)
        xavier_uniform_(self.hidden1.weight)  #O underscore significa que a inicialização vai ser feita por referencia e não pelo return
        self.act1 = ReLU()
        self.hidden2 = Linear(hidden_channels, 4)
        xavier_uniform_(self.hidden2.weight)
        self.act2 = Softmax(dim = 1)
 
    # sequencia de propagação do input 
    def forward(self, X):
        # input para a primeira camada
        X = self.hidden1(X)
        X = self.act1(X)
        # segunda camada
        X = self.hidden2(X)
        X = self.act2(X)
        return X

## Step 7 - Train model

In [None]:
model = MLP(in_channels).to(device)

BEST_MODEL_PATH = f"./model/MLP"
BEST_MODEL_FILE = f"{BEST_MODEL_PATH}/best_metric_model_classification3d_array_attempt{len(glob(f'{BEST_MODEL_PATH}/*'))+1:02}.pth"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

EPOCHS = 200
LEARNING_RATE = 0.001

# treino do modelo
def train_model(train_dl, model):
    liveloss = PlotLosses() ##para visualizarmos o processo de treino
    # definir o loss e a função de otimização
    #criterion = torch.nn.MSELoss() #neste caso implementa a sparse_categorical_crossentropy
    criterion = torch.nn.CrossEntropyLoss()
    #optimizer = SGD(model.parameters(), lr=LEARNING_RATE, momentum=0.9) #stochastic gradient descent
    optimizer = Adam(model.parameters(), lr = LEARNING_RATE)
    # iterar as epochs
    #lowest_rmse = sys.float_info.max
    highest_f1 = 0
    for epoch in range(EPOCHS):
        print(f"Epoch {epoch+1}/{EPOCHS}")
        logs = {} ##para visualizarmos o processo de treino
        # iterar as batchs
        epoch_loss  = 0 ##para visualizarmos o processo de treino
        #epoch_rmse  = 0 ##para visualizarmos o processo de treino
        epoch_f1 = 0
        for batch_data in train_dl:
            inputs, labels = batch_data["NUMPY"].to(device), batch_data["GRS"].to(device)
            # inicializar os gradientes
            optimizer.zero_grad() #coloca os gradientes de todos os parametros a zero
            # calcular o output do modelo
            outputs = model(inputs.reshape(BATCH_SIZE, -1))
            labels = torch.nn.functional.one_hot(labels.long(), num_classes = 4).float()
            # calcular o loss
            loss = criterion(outputs, labels) #.unsqueeze(1)
            f1 = f1_score(torch.argmax(labels, dim = 1).cpu().detach().numpy().astype(int), torch.argmax(outputs,dim = 1).cpu().detach().numpy().astype(int), average = 'macro')
            # atribuição alteraçoes "In the backward pass we receive a Tensor containing the gradient of the loss
            #with respect to the output, and we need to compute the gradient of the loss with respect to the input.
            loss.backward()
            # update pesos do modelo
            optimizer.step()
            #só para multiclass:
            #valores, predictions = torch.max(outputs, 1) #retorna um tensor com os indices do valor maximo em cada caso
            epoch_loss += loss.item()
            #epoch_rmse += rmse.item()
            epoch_f1 += f1.item()

        #print(f'Epoch {epoch:03}: | Loss: {epoch_loss/len(train_dl):.5f} | RMSE: {epoch_rmse/len(train_dl):.3f}')   
        print(f'Epoch {epoch:03}: | Loss: {epoch_loss/len(train_dl):.5f} | F1 Score: {epoch_f1/len(train_dl):.3f}')      
        logs['loss'] = epoch_loss/len(train_dl) ##para visualizarmos o processo de treino
        #logs['rmse'] = epoch_rmse/len(train_dl) ##para visualizarmos o processo de treino
        logs['f1_score'] = epoch_f1/len(train_dl) ##para visualizarmos o processo de treino
        liveloss.update(logs) ##para visualizarmos o processo de treino
        liveloss.send() ##para visualizarmos o processo de treino

        if epoch_f1/len(train_dl) > highest_f1:
            highest_f1 = f1
            highest_f1_epoch = epoch + 1
            torch.save(model.state_dict(), BEST_MODEL_FILE)
            print("saved new best metric model")
 
# treinar o modelo
#train_model(train_loader, model)

## Step 8 - Test model

In [None]:
# Avaliar o modelo
def evaluate_model(test_dl, model):
    predictions = list()
    actual_values = list()
    for val_data in test_dl:
        inputs, labels = val_data["NUMPY"].to(device), val_data["GRS"].to(device)
        labels = torch.nn.functional.one_hot(labels.long(), num_classes = 4).float()
        # avaliar o modelo com os casos de teste
        yprev = model(inputs.reshape(BATCH_SIZE, -1))
        # retirar o array numpy
        
        yprev = torch.argmax(yprev, dim = 1).cpu().detach().numpy().astype(int)
        actual = torch.argmax(labels, dim = 1).cpu().detach().numpy().astype(int)
        #actual = actual.reshape((len(actual), 1))
        # guardar
        predictions.append(yprev)
        actual_values.append(actual)

    predictions, actual_values = np.vstack(predictions), np.vstack(actual_values)
    return actual_values, predictions

#avaliar o modelo
actual_values, predictions = evaluate_model(val_loader, model)
#actuals, predictions = evaluate_model(train_dl, model)
for r,p in zip(actual_values, predictions):
    print(f'real:{r} previsão:{p}') 

#calcular a accuracy
mse = mean_squared_error(actual_values, predictions)
print(f'MSE: {mse:0.3f}, RMSE: {np.sqrt(mse):0.3f}\n')

In [None]:
model.load_state_dict(torch.load(BEST_MODEL_FILE))
all_outputs = np.array([])
all_labels = np.array([])
for val_data in val_loader:
    val_images, val_labels = val_data["NUMPY"].to(device), val_data["GRS"].to(device)
    val_labels = torch.nn.functional.one_hot(val_labels.long(), num_classes = 4).float()
    with torch.no_grad():
        val_prev = model(val_images.reshape(BATCH_SIZE, -1))
        all_outputs = np.append(all_outputs, torch.argmax(val_prev, dim = 1).cpu().detach().numpy().astype(int))
        all_labels = np.append(all_labels, torch.argmax(val_labels, dim = 1).cpu().detach().numpy().astype(int))

for r,p in zip(all_labels, all_outputs):
    print(f'real:{r} previsão:{p}') 
print(f"F1-Score: {f1_score(all_labels, all_outputs, average = 'macro'):0.3f}")