In [None]:

# Mount Google Drive at /content/drive so files stored on Drive can be read by
# path from this notebook (the google.colab module is only available on Colab).
from google.colab import drive
drive.mount('/content/drive')


In [None]:
import os
import re
import pickle
from sklearn.preprocessing import LabelEncoder
# Same pipeline as the all-labels notebook; point `directory` at your own Drive folder.

directory = '/content/drive/My Drive/batches matches full'
# os.listdir returns entries in arbitrary order; sort so the run is reproducible.
files = [os.path.join(directory, f) for f in sorted(os.listdir(directory)) if f.endswith('.pkl')]

# Group the batch files per video as {video_id: [(batch_number, path), ...]}.
video_files = {}
for f in files:
    # Match against the file name only, so nothing in the directory path can
    # accidentally satisfy (or shadow) the P<video>_batch_<n> pattern.
    match = re.search(r'P(\d+)_batch_(\d+)', os.path.basename(f))
    if match:
        video_id, batch_number = int(match.group(1)), int(match.group(2))
        video_files.setdefault(video_id, []).append((batch_number, f))

# Order each video's batches by batch number and keep only the paths.
for video_id in video_files:
    ordered_batches = sorted(video_files[video_id], key=lambda x: x[0])
    video_files[video_id] = [f for _, f in ordered_batches]

# Split by video (not by batch) so no video contributes to both sets:
# the last 5 video ids are held out for testing, the first 20 are used for training.
sorted_video_ids = sorted(video_files.keys())
test_video_ids = sorted_video_ids[-5:]
# With fewer than 25 videos the two slices would overlap and leak test videos
# into training, so any held-out id is explicitly excluded from the train split.
train_video_ids = [vid for vid in sorted_video_ids[:20] if vid not in test_video_ids]

train_files = [file for vid in train_video_ids for file in video_files[vid]]
test_files = [file for vid in test_video_ids for file in video_files[vid]]

def load_sequences_and_labels(file_list):
    """Read pickled batches and split each one into parallel frame/label lists.

    Every file is expected to unpickle to an iterable of items where ``item[0]``
    is the frame and ``item[1]`` is its label.

    Args:
        file_list: Iterable of paths to pickle files, processed in the given order.

    Returns:
        A ``(sequences, labels)`` tuple of two lists with one entry per file:
        ``sequences[i]`` is the list of frames from file ``i`` and ``labels[i]``
        the list of corresponding labels.
    """
    all_frames, all_labels = [], []
    for path in file_list:
        # NOTE: pickle.load can execute arbitrary code; only use on trusted files.
        with open(path, 'rb') as handle:
            records = pickle.load(handle)
        all_frames.append([record[0] for record in records])
        all_labels.append([record[1] for record in records])
    return all_frames, all_labels

# Load every batch file of each split; X_* hold per-file frame lists and
# Y_* the matching per-file label lists.
X_train, Y_train = load_sequences_and_labels(train_files)
X_test, Y_test = load_sequences_and_labels(test_files)

In [None]:
# Integer id for each label string. 'Other' still has an id here, but frames
# labelled 'Other' are filtered out later in this cell before the mapping is applied.
label_mapping = {'Injection': 0, 'Optimising_Position': 1, 'Polypectomy': 2, 'Inspecting_Resection':3,'Other':4}

#filtering out frames and labels where label is 'Other'
def remove_label_other(X, Y, label_to_remove='Other'):
    """Drop every (frame, label) pair whose label equals ``label_to_remove``.

    Args:
        X: List of frame sequences.
        Y: List of label sequences, parallel to ``X``.
        label_to_remove: Label value to filter out.

    Returns:
        A ``(filtered_X, filtered_Y)`` tuple of lists of lists. Sequences that
        have no pairs left after filtering are omitted from both lists.
    """
    kept_X, kept_Y = [], []
    for frames, labels in zip(X, Y):
        frames_out, labels_out = [], []
        for frame, label in zip(frames, labels):
            if label != label_to_remove:
                frames_out.append(frame)
                labels_out.append(label)
        # Skip sequences that consisted solely of the removed label (or were empty).
        if not frames_out:
            continue
        kept_X.append(frames_out)
        kept_Y.append(labels_out)
    return kept_X, kept_Y

# Remove all 'Other' frames (and sequences left empty by that) from both splits.
X_train_filtered, Y_train_filtered = remove_label_other(X_train, Y_train, 'Other')
X_test_filtered, Y_test_filtered = remove_label_other(X_test, Y_test, 'Other')

# Translate the remaining label strings to their integer ids; a label missing
# from label_mapping raises KeyError here.
Y_train_int_filtered = [[label_mapping[label] for label in seq] for seq in Y_train_filtered]
Y_test_int_filtered = [[label_mapping[label] for label in seq] for seq in Y_test_filtered]


In [None]:
import numpy as np
from scipy.stats import mode
from tensorflow.keras.utils import Sequence, to_categorical

class SequentialDataGenerator(Sequence):
    """Keras ``Sequence`` that yields padded batches of frame sequences and one-hot labels.

    Each batch contains up to ``batch_size`` sequences. Sequences shorter than
    ``max_sequence_length`` are padded at the end: frames with zeros, labels with
    the one-hot vector of that sequence's most frequent label (the 'Other' class
    is not available as a padding label because it is filtered out upstream).
    Sequences that are already at least ``max_sequence_length`` long are
    returned unchanged (they are not truncated).

    Args:
        X: List of frame sequences.
        Y_int: List of integer-label sequences, parallel to ``X``.
        batch_size: Number of sequences per batch.
        num_classes: Width of the one-hot label vectors. When ``None`` it is
            inferred once as ``max label + 1`` over all of ``Y_int`` so every
            batch has the same width.
        max_sequence_length: Target sequence length; defaults to the longest
            sequence in ``X``.
    """

    def __init__(self, X, Y_int, batch_size=16, num_classes=None, max_sequence_length=None):
        # Keras 3's PyDataset expects subclasses to call the base initializer.
        super().__init__()
        self.X = X
        self.Y_int = Y_int
        self.batch_size = batch_size
        if num_classes is None:
            # Infer one global width; letting to_categorical infer it per sequence
            # would produce one-hot arrays of differing widths.
            num_classes = max((int(max(seq)) for seq in Y_int if len(seq) > 0), default=-1) + 1
        self.num_classes = num_classes
        self.max_sequence_length = max_sequence_length or self._max_length()
        self.indexes = np.arange(len(self.X))

    def __len__(self):
        """Return the number of batches per epoch (the last batch may be smaller)."""
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, index):
        """Build batch ``index`` and return it as ``(frames, one_hot_labels)`` arrays."""
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_X = [self.X[k] for k in indexes]
        batch_Y_int = [self.Y_int[k] for k in indexes]

        X_temp_padded = self._pad_sequences(batch_X, self.max_sequence_length)
        Y_temp_encoded = [to_categorical(y, num_classes=self.num_classes) for y in batch_Y_int]
        # 'mode' pads each label sequence with its own most frequent class.
        Y_temp_padded = self._pad_sequences(Y_temp_encoded, self.max_sequence_length, pad_val='mode')

        return np.array(X_temp_padded), np.array(Y_temp_padded)

    def on_epoch_end(self):
        """No per-epoch work: the sample order is kept fixed (no shuffling)."""
        pass

    def _max_length(self):
        """Return the length of the longest sequence in ``X`` (0 when ``X`` is empty)."""
        return max((len(seq) for seq in self.X), default=0)

    def _mode_padding(self, seq, pad_size):
        """Return ``pad_size`` rows of the one-hot vector of ``seq``'s most frequent class.

        Falls back to all-zero rows when ``seq`` is not a non-empty 2-D one-hot array.
        """
        if seq.ndim != 2 or seq.size == 0:
            return np.zeros((pad_size, self.num_classes))
        int_labels = np.argmax(seq, axis=1)
        # np.bincount(...).argmax() yields the smallest most-frequent label, the
        # same tie-breaking as scipy.stats.mode. The previous `mode(...).mode[0]`
        # raised IndexError on SciPy versions that return a scalar for 1-D input,
        # and the surrounding except clause then silently padded with class 0.
        most_common_label_index = int(np.bincount(int_labels).argmax())
        mode_label_one_hot = to_categorical(most_common_label_index, num_classes=self.num_classes)
        return np.tile(mode_label_one_hot, (pad_size, 1))

    def _pad_sequences(self, sequences, max_seq_length, pad_val=0):
        """Pad each sequence at the end up to ``max_seq_length`` rows.

        Args:
            sequences: Iterable of array-likes whose first axis is the time axis.
            max_seq_length: Target length along the first axis.
            pad_val: ``'mode'`` pads with the sequence's most frequent one-hot
                label; a list whose length equals the sequence's last dimension
                is repeated as the padding row; anything else pads with zeros
                of the sequence's own dtype and trailing shape.

        Returns:
            List of numpy arrays; sequences already long enough are returned as-is.
        """
        padded_sequences = []
        for seq in sequences:
            seq = np.array(seq)
            pad_size = max_seq_length - seq.shape[0]

            if pad_size <= 0:
                padded_sequences.append(seq)
                continue

            if isinstance(pad_val, str) and pad_val == 'mode':
                pad_array = self._mode_padding(seq, pad_size)
            elif isinstance(pad_val, list) and len(pad_val) == seq.shape[-1]:
                pad_array = np.array([pad_val] * pad_size)
            else:
                pad_array = np.zeros((pad_size,) + seq.shape[1:], dtype=seq.dtype)

            padded_sequences.append(np.concatenate((seq, pad_array), axis=0))
        return padded_sequences

In [None]:
# 'Other' frames were filtered out above, so only four classes (ids 0-3) remain.
# Pass num_classes itself: len(label_mapping) still counts 'Other' and would add
# a fifth one-hot column that is always zero.
num_classes = 4
train_generator = SequentialDataGenerator(X_train_filtered, Y_train_int_filtered, batch_size=16, num_classes=num_classes)
test_generator = SequentialDataGenerator(X_test_filtered, Y_test_int_filtered, batch_size=16, num_classes=num_classes)


# Sanity check: fetch the first training batch and inspect its shapes.
x_sample, y_sample = train_generator[0]
print("X_sample shape:", x_sample.shape)
print("Y_sample shape:", y_sample.shape)