In [1]:
import tensorflow as tf
import numpy as np
import glob
import pandas as pd
import random
from tensorflow.keras import backend as K
from keras import Input, Model
from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Reshape, concatenate
import os


In [2]:

dilantin_path = 'Siezure med dataset/dataset_used_in_CBMS_paper/fft_images_100Hz_ws_1_overlap_0.1_1Hz_50Hz/dilantin'
keppra_path = 'Siezure med dataset/dataset_used_in_CBMS_paper/fft_images_100Hz_ws_1_overlap_0.1_1Hz_50Hz/keppra'
none_path = 'Siezure med dataset/dataset_used_in_CBMS_paper/fft_images_100Hz_ws_1_overlap_0.1_1Hz_50Hz/none_0.15'

In [3]:
def get_data(folder_path,label):
    data  = []
    file_list = os.listdir(folder_path)
    npy_files  =  [f for f in file_list if f.endswith('.npy')]

    
    for file in npy_files:
        file_path = os.path.join(folder_path, file)
        try:
            loaded_data = np.load(file_path)
            
            num_samples = loaded_data.shape[0]
            data.extend([(loaded_data[i], label) for i in range(num_samples)])
        except Exception as e:
            print(f"Error processing file {file}: {e}")
    return data


In [4]:
dilantin_data =  get_data(dilantin_path, 0)
keppra_data = get_data(keppra_path, 1)
none_path = get_data(none_path, 2)

In [5]:
from sklearn.model_selection import train_test_split

def split_data(dataset, max_samples=100000, test_size=0.2, random_state=None):
    """
    Split dataset into training and test sets with a maximum of max_samples samples.

    Parameters:
    - dataset: A list of tuples where each tuple contains (sample, label).
    - max_samples: The maximum number of samples to include from the dataset (default=100000).
    - test_size: The proportion of the dataset to include in the test split (default=0.2).
    - random_state: Controls the shuffling applied to the data before applying the split.

    Returns:
    - Tuple containing (x_train, x_test, y_train, y_test)
    """
    # Limit the dataset size to max_samples
    if len(dataset) > max_samples:
        dataset = dataset[:max_samples]

    x_data = []
    y_data = []

    for sample, label in dataset:
        x_data.append(sample)
        y_data.append(label)

    x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=test_size, random_state=random_state)

    return x_train, x_test, y_train, y_test


In [6]:
x_train_dilantin, x_test_dilantin, y_train_dilantin, y_test_dilantin = split_data(dilantin_data, test_size=0.2, random_state=42)

In [7]:
x_train_keppra, x_test_keppra, y_train_keppra, y_test_keppra = split_data(keppra_data, test_size=0.2, random_state = 42)

In [8]:
x_train_none, x_test_none, y_train_none, y_test_none = split_data(none_path, test_size=0.2, random_state = 42)

In [9]:
x_train = np.concatenate((x_train_dilantin, x_train_keppra, x_train_none), axis=0)

In [10]:
y_train = np.concatenate((y_train_dilantin, y_train_keppra, y_train_none), axis=0)

x_test = np.concatenate((x_test_dilantin, x_test_keppra, x_test_none), axis=0)
y_test = np.concatenate((y_test_dilantin, y_test_keppra, y_test_none), axis=0)

In [12]:
# x_train = x_train.reshape(x_train.shape[0], 19, 50, 1).astype('float32') / 255
# x_test = x_test.reshape(x_test.shape[0], 19, 50, 1).astype('float32') / 255

In [11]:
x_train.shape, y_train.shape

((240000, 19, 50), (240000,))

In [81]:
def generate_triplets(x_train, y_train, batch_size=32):
    while True:
        anchors_list = []
        positive_list = []
        negative_list = []
        
        for _ in range(batch_size):
            anchor_index = np.random.choice(len(x_train))
            anchor = x_train[anchor_index]
            label = y_train[anchor_index]
            
            positive_indices = np.where(y_train == label)[0]
            positive_index = np.random.choice(positive_indices)
            positive = x_train[positive_index]

            negative_indices = np.where(y_train!= label)[0]
            negative_index = np.random.choice(negative_indices)
            negative = x_train[negative_index]

            anchors_list.append(anchor)
            positive_list.append(positive)
            negative_list.append(negative)
        
        # Convert lists to NumPy arrays and apply reshaping and normalization
        anchors = np.array(anchors_list).reshape(-1, 19, 50, 1).astype('float32') / 255
        positives = np.array(positive_list).reshape(-1, 19, 50, 1).astype('float32') / 255
        negatives = np.array(negative_list).reshape(-1, 19, 50, 1).astype('float32') / 255

        # Convert NumPy arrays to TensorFlow tensors
        anchors_tf = tf.convert_to_tensor(anchors)
        positives_tf = tf.convert_to_tensor(positives)
        negatives_tf = tf.convert_to_tensor(negatives)

        
        yield [anchors_tf, positives_tf, negatives_tf], tf.zeros((batch_size,))   

In [15]:
def triplet_loss(y_true, y_pred, margin = 0.2):
    anchor, positive, negative = y_pred[:, 0:64], y_pred[:, 64:128], y_pred[:, 128:]
    
    positive_dist = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
    negative_dist = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
    
    loss = tf.maximum(positive_dist - negative_dist + margin, 0.0)
    return tf.reduce_mean(loss)

In [16]:
def common_network(input_shape):
    inputs= Input(shape = input_shape)
    x = Conv2D(32, (3,3), activation = 'relu')(inputs)
    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    outputs = Dense(64)(x)
    return Model(inputs, outputs)

input_shape = (19, 50, 1)

common_network = common_network(input_shape)

In [17]:
anchor_input = Input(shape=input_shape, name='anchor_input')
positive_input = Input(shape=input_shape, name='positive_input')
negative_input = Input(shape=input_shape, name='negative_input')

encoded_anchor = common_network(anchor_input)
encoded_positive = common_network(positive_input)
encoded_negative = common_network(negative_input)

merged_output = concatenate([encoded_anchor, encoded_positive, encoded_negative], axis=-1)

triplet_model = Model(inputs=[anchor_input, positive_input, negative_input], outputs=merged_output)
triplet_model.compile(optimizer = 'adam', loss = triplet_loss)
triplet_model.summary()

In [77]:
new_train_generator = generate_triplets(x_train, y_train, batch_size=32)

In [79]:
for inputs, targets in train_generator:
    print([input_.shape for input_ in inputs], targets.shape)
    break

[TensorShape([32, 19, 50, 1]), TensorShape([32, 19, 50, 1]), TensorShape([32, 19, 50, 1])] (32,)


In [32]:
len(x_train)

240000

In [82]:
# Train the model
triplet_model.fit(generate_triplets(x_train, y_train, batch_size=32), steps_per_epoch=len(x_train) // 32, epochs=20)


TypeError: `output_signature` must contain objects that are subclass of `tf.TypeSpec` but found <class 'list'> which is not.