<a href="https://colab.research.google.com/github/rafael0130/Thesis/blob/main/thesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
# pip install mne
# import mne
import pandas as pd
import scipy
import matplotlib.pyplot as plt
import xml.etree.cElementTree as et
import tensorflow as tf
import librosa

from google.colab import drive
drive.mount('/content/drive')
rootpath='/content/drive/MyDrive/thesis/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def create_dataset(data, window_size,samp_rate):
    windows = []
    window_size=window_size*samp_rate
    for i in range(0, data.shape[0], window_size):
        window = data[i:i+window_size]
        if len(window) == window_size:  # Discard incomplete windows (if any)
            windows.append(window)
    return np.array(windows)

def create_mfccs(data):
    data=data.reshape(data.shape[0],)
    mfccs = librosa.feature.mfcc(y=data,n_mfcc =13,sr=500)
    delta = librosa.feature.delta(mfccs)
    delta2 = librosa.feature.delta(mfccs,order=2)
    mfccs_concat = np.concatenate((mfccs,delta,delta2))
    return mfccs_concat
def get_labels(rootpath,patient_num,event_type,window_size,ch_len):
  annotations =et.parse(rootpath+"0000"+patient_num+"-100507.rml").getroot()
  events = []
  # samp_rate=100
  for event in annotations.iter('{http://www.respironics.com/PatientStudy.xsd}Event'):
      events.append(event.attrib)
  events_df= pd.DataFrame(events)
  events_df.drop(['Machine', 'OriginatedOnDevice'], axis='columns', inplace=True)
  events_df=events_df[events_df.Family==event_type].reset_index(drop=True)
  events_df.Start=events_df.Start.astype('float64')
  events_df.Duration=events_df.Duration.astype('float64')
  events_df['end']=(events_df.Start+events_df.Duration)
  events_df['window_start']=events_df.Start//window_size
  events_df['window_end']=events_df.end//window_size
  # print(events_df)
  events_set=set(events_df.window_start)|set(events_df.window_end)
  lbls=[]
  for i in range(ch_len):
      if i in events_set:
          lbls.append(1)
      else:
          lbls.append(0)
  return lbls

In [None]:
first=True
window_size=30
for num in ['0995','0999','1000']:
  df = pd.read_csv(rootpath+'0000'+num+'-100507_Flow Patient-0.csv')
  df2 = pd.read_csv(rootpath+'0000'+num+'-100507_SpO2.csv')
  df3 = pd.read_csv(rootpath+'0000'+num+'-100507_Snore.csv')
  ch1_temp = create_dataset(df,window_size,100)
  ch2_temp = create_dataset(df2,window_size,1)
  ch3_temp = create_dataset(df3,window_size,500)
  ch_len=len(ch1_temp)
  lbls_temp = get_labels(rootpath,num,"Respiratory",window_size,ch_len)
  if first:
    ch1=ch1_temp
    ch2=ch2_temp
    ch3=ch3_temp
    lbls=lbls_temp
    first=False
  else:
    ch1=np.concatenate((ch1,ch1_temp))
    ch2=np.concatenate((ch2,ch2_temp))
    ch3=np.concatenate((ch3,ch3_temp))
    lbls=np.concatenate((lbls,lbls_temp))

ch3_mod = []
for item in range(len(ch3)):
    result = create_mfccs(ch3[item]).reshape(39,30,1)
    ch3_mod.append(result)
#     plt.figure(figsize=(25,10))
#     librosa.display.specshow(result,x_axis='time',sr=500)
#     plt.colorbar(format='%+2f')
ch3_mod=np.array(ch3_mod)

In [None]:
print(ch1.shape,ch2.shape,ch3_mod.shape,lbls.shape)

(1588, 3000, 1) (1588, 30, 1) (1588, 39, 30, 1) (1588,)


In [None]:
idx = np.random.randint(lbls.shape[0], size=lbls.shape[0])

In [None]:
ch1=ch1[idx]
ch2=ch2[idx]
ch3_mod=ch3_mod[idx]
lbls=lbls[idx]

In [None]:
split_ratio = int(lbls.shape[0] * 0.8)
split_ratio

1270

In [None]:
def encoder_1d(input_shape, modality_name,filters,code_size,l2_rate):
    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)
    input = tf.keras.layers.Input(input_shape)
    x = tf.keras.layers.Conv1D(filters=2 * filters,
                               kernel_size=10,
                               activation="linear",
                               padding="same",
                               strides=1,
                               kernel_regularizer=tf.keras.regularizers.l2(l2_rate),
                               kernel_initializer=initializer)(input)

    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.PReLU(shared_axes=[1])(x)

    x = tf.keras.layers.Conv1D(filters=filters,
                               kernel_size=8,
                               activation="linear",
                               padding="same",
                               strides=1,
                               kernel_regularizer=tf.keras.regularizers.l2(l2_rate),
                               kernel_initializer=initializer)(x)
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.PReLU(shared_axes=[1])(x)

    x = tf.keras.layers.Conv1D(filters=code_size,
                               kernel_size=4,
                               activation="linear",
                               padding="same",
                               strides=1,
                               kernel_regularizer=tf.keras.regularizers.l2(l2_rate),
                               kernel_initializer=initializer)(x)
    # output = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.PReLU(shared_axes=[1])(x)
    output = tf.keras.layers.BatchNormalization(epsilon=1e-6)(x)

    return tf.keras.models.Model(input, output, name=modality_name)

def encoder_2d(input_shape, modality_name,filters):
    input = tf.keras.layers.Input(input_shape) #input_shape=(height, width, channels)
    # Add a 2D convolutional layer with 32 filters, 5x5 kernel size, and 'relu' activation
    x = tf.keras.layers.Conv2D(filters, (5, 5), activation='relu')(input)

    # Add a 2D max pooling layer with 2x2 pool size
    x=tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)

    # Add a dropout layer with a dropout rate of 0.25 (adjust as needed)
    x=tf.keras.layers.Dropout(0.25)(x)

    # Add a 2D convolutional layer with 32 filters, 5x5 kernel size, and 'relu' activation
    x = tf.keras.layers.Conv2D(filters*2, (4, 4), activation='relu')(x)

    # Add a 2D max pooling layer with 2x2 pool size
    x=tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)

    # Add a dropout layer with a dropout rate of 0.25 (adjust as needed)
    x=tf.keras.layers.Dropout(0.25)(x)

    # Add a 2D convolutional layer with 32 filters, 5x5 kernel size, and 'relu' activation
    x = tf.keras.layers.Conv2D(filters*3, (3, 3), activation='relu')(x)

    # Add a dropout layer with a dropout rate of 0.25 (adjust as needed)
    output=tf.keras.layers.Dropout(0.25)(x)

    return tf.keras.models.Model(input, output, name=modality_name)

In [None]:
# modality specific encoders
mod_encoder = []
mod_input = []

input_shape = (ch1.shape[1], ch1.shape[2])
encoder = encoder_1d(input_shape,
                                  modality_name='flow',
                                  filters=24,
                                  code_size=256,
                                  l2_rate=1e-4)


mod_input.append(tf.keras.layers.Input(shape=input_shape))

x_a = encoder(mod_input[-1])
x_a = tf.keras.layers.GlobalMaxPooling1D()(x_a)
x_a = tf.keras.layers.Dense(256, activation="linear")(x_a)
mod_encoder.append(x_a)

input_shape = (ch2.shape[1], ch2.shape[2])
encoder = encoder_1d(input_shape,
                                  modality_name='spo2',
                                  filters=24,
                                  code_size=256,
                                  l2_rate=1e-4)

mod_input.append(tf.keras.layers.Input(shape=input_shape))

x_a = encoder(mod_input[-1])
x_a = tf.keras.layers.GlobalMaxPooling1D()(x_a)
x_a = tf.keras.layers.Dense(256, activation="linear")(x_a)
mod_encoder.append(x_a)

input_shape = (ch3_mod.shape[1], ch3_mod.shape[2],ch3_mod.shape[3])
encoder = encoder_2d(input_shape,
                                  modality_name='snore',
                                  filters=36)

mod_input.append(tf.keras.layers.Input(shape=input_shape))

x_a = encoder(mod_input[-1])
x_a = tf.keras.layers.GlobalMaxPooling2D()(x_a)
x_a = tf.keras.layers.Dense(256, activation="linear")(x_a)
mod_encoder.append(x_a)

embedding_model = tf.keras.Model(mod_input, mod_encoder)
# input_x=embedding_model.input
# print(input_x)
# # input_x = ([(ch1.shape[1], ch1.shape[2]),(ch2.shape[1], ch2.shape[2])])
# xi=embedding_model(input_x)
# x = tf.keras.layers.Concatenate()(embedding_model.output)
# x = tf.keras.layers.Flatten()(x)

# x = tf.keras.layers.Dense(128, activation="relu")(x)

# # Final classification layer
# output = tf.keras.layers.Dense(1, activation=binar)(x)

# # Define the new model
# classifier_model = tf.keras.models.Model(inputs=embedding_model.inputs, outputs=output)
# classifier_model.summary()
embedding_model.summary()

Model: "model_16"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_66 (InputLayer)       [(None, 3000, 1)]            0         []                            
                                                                                                  
 input_68 (InputLayer)       [(None, 30, 1)]              0         []                            
                                                                                                  
 input_70 (InputLayer)       [(None, 39, 30, 1)]          0         []                            
                                                                                                  
 flow (Functional)           (None, 3000, 256)            36096     ['input_66[0][0]']            
                                                                                           

In [None]:
# X_train,X_test,y_train,y_test = [ch1[0:400,:,:],ch2[0:400,:,:]],[ch1[400:,:,:],ch2[400:,:,:]],lbls[0:400],lbls[400:]
# # Step 3: Train Your Model
# classifier_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# classifier_model.fit(X_train, y_train, epochs=10, batch_size=8)

# # Evaluate the model on the test set
# test_loss, test_accuracy = classifier_model.evaluate(X_test, y_test)
# print(f'Test accuracy: {test_accuracy}')

## Contrastive loss

In [None]:
def cocoa_loss(ytrue, ypred):
    temperature = 0.5
    lambd=3.9e-3
    scale_loss=1/32
    batch_size, dim_size = ypred.shape[1], ypred.shape[0]
    print(ypred.shape)
    # Positive Pairs
    pos_error = []
    for i in range(batch_size):
        sim = tf.linalg.matmul(ypred[:, i, :], ypred[:, i, :], transpose_b=True)
        sim = tf.subtract(tf.ones([dim_size, dim_size], dtype=tf.dtypes.float32), sim)
        sim = tf.exp(sim/temperature)
        pos_error.append(tf.reduce_mean(sim))
    # Negative pairs
    neg_error = 0
    for i in range(dim_size):
        sim = tf.cast(tf.linalg.matmul(ypred[i], ypred[i], transpose_b=True), dtype=tf.dtypes.float32)
        sim = tf.exp(sim /temperature)
        # sim = tf.add(sim, tf.ones([batch_size, batch_size]))
        tri_mask = np.ones(batch_size ** 2, dtype=bool).reshape(batch_size, batch_size)
        tri_mask[np.diag_indices(batch_size)] = False
        off_diag_sim = tf.reshape(tf.boolean_mask(sim, tri_mask), [batch_size, batch_size - 1])
        neg_error += (tf.reduce_mean(off_diag_sim, axis=-1))

    error = tf.multiply(tf.reduce_sum(pos_error),scale_loss) + lambd * tf.reduce_sum(neg_error)

    return error

# ------------------------------------------------------------------------- #
class DotProduct(tf.keras.layers.Layer):
    def call(self, x, y):
        x = tf.nn.l2_normalize(x, axis=-1)
        y = tf.nn.l2_normalize(y, axis=-1)
        return tf.linalg.matmul(x, y, transpose_b=True)


# ------------------------------------------------------------------------- #
class ContrastiveModel(tf.keras.Model):
    def __init__(self, embedding_model, loss_fn, temperature=1.0, **kwargs):
        super().__init__()
        self.embedding_model = embedding_model
        self._temperature = temperature
        self._similarity_layer = DotProduct()
        self._lossfn = loss_fn

    def train_step(self, data):
        with tf.GradientTape() as tape:
            modality_embeddings = self.embedding_model(data, training=True)

            sparse_labels = tf.range(tf.shape(modality_embeddings[0])[0])

            pred = modality_embeddings
            pred = tf.nn.l2_normalize(tf.stack(pred), axis=-1)

            loss = self.compiled_loss(sparse_labels, pred)
            loss += sum(self.losses)

        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        return {m.name: m.result() for m in self.metrics}

    def call(self, input):
        return self.embedding_model(input)


In [None]:
ssl = ContrastiveModel(embedding_model, cocoa_loss)
batch_size=8
train=tf.data.Dataset.from_tensor_slices((ch1, ch2, ch3_mod))
X_train= train.batch(batch_size, drop_remainder=True).prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
# X_train,X_test,y_train,y_test = [ch1[0:500,:,:],ch2[0:500,:,:],ch3_mod[0:500,:,:]],[ch1[400:,:,:],ch2[400:,:,:],ch3_mod[400:,:,:]],[lbls[0:400],lbls[0:400],lbls[0:400]],[lbls[400:],lbls[400:],lbls[400:]]
# Step 3: Train Your Model

ssl.compile(optimizer='adam', loss=cocoa_loss, metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
result = ssl.fit(X_train, epochs=10)


Epoch 1/10
(3, 8, 256)
(3, 8, 256)
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


## add the base model

In [None]:
base_model = ssl.embedding_model
# for layer in base_model.layers:
#     print(layer)
#     layer.trainable = False
input_x = base_model.input
xi = base_model(input_x)
x = tf.keras.layers.Concatenate()(xi)
x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(128, activation="relu", kernel_regularizer=tf.keras.regularizers.L1(0.005))(x)

classifier_model = tf.keras.layers.Dense(1,activation="sigmoid",kernel_regularizer=tf.keras.regularizers.L1(0.005),)(x)
c_model = tf.keras.Model(input_x, classifier_model)
c_model.compile(
    optimizer='adam',
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

In [None]:
c_model.summary()

Model: "model_17"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_66 (InputLayer)       [(None, 3000, 1)]            0         []                            
                                                                                                  
 input_68 (InputLayer)       [(None, 30, 1)]              0         []                            
                                                                                                  
 input_70 (InputLayer)       [(None, 39, 30, 1)]          0         []                            
                                                                                                  
 model_16 (Functional)       [(None, 256),                344252    ['input_66[0][0]',            
                              (None, 256),                           'input_68[0][0]',     

In [None]:
train,val=tf.data.Dataset.from_tensor_slices((ch1[0:split_ratio], ch2[0:split_ratio], ch3_mod[0:split_ratio])),\
          tf.data.Dataset.from_tensor_slices((ch1[split_ratio:], ch2[split_ratio:], ch3_mod[split_ratio:]))
train_lbl,val_lbl =  tf.data.Dataset.from_tensor_slices(lbls[0:split_ratio]),\
                     tf.data.Dataset.from_tensor_slices(lbls[split_ratio:])
train=tf.data.Dataset.zip(train,train_lbl)
val=tf.data.Dataset.zip(val,val_lbl)
training= train.batch(batch_size, drop_remainder=True).prefetch(tf.data.experimental.AUTOTUNE)
validation = val.batch(batch_size, drop_remainder=True).prefetch(tf.data.experimental.AUTOTUNE)
c_model.fit(training, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7c23028217e0>

In [None]:
test_loss, test_accuracy = c_model.evaluate(validation)
print(f'Test accuracy: {test_accuracy}')

Test accuracy: 0.6346153616905212


In [None]:
y_train2.shape

AttributeError: 'list' object has no attribute 'shape'

In [None]:
X_train[0].shape
# y_train[1].shape

(400, 3000, 1)

In [None]:
# Create TensorFlow Datasets from your data
train_dataset = tf.data.Dataset.from_tensor_slices(([X_train_modality_1, X_train_modality_2], y_train)).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
test_dataset = tf.data.Dataset.from_tensor_slices(([X_test_modality_1, X_test_modality_2], y_test)).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

# Compile the model
classifier_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
classifier_model.fit(train_dataset, epochs=10)

# Evaluate the model
test_loss, test_accuracy = classifier_model.evaluate(test_dataset)
print(f'Test accuracy: {test_accuracy}')


(96, 3000, 1)

In [None]:
# class CustomLoss:
#     # pos: exponential for positive example
#     # neg: sum of exponentials for negative examples
#     # N : number of negative examples
#     # temperature : temperature scaling
#     # tau : class probability
#     def __init__(
#             self,
#             temperature=0.5,
#             tau=0.01,
#             beta=2,
#             elimination_th=0,
#             elimination_topk=0.1,
#             lambd = 3.9e-3,
#             scale_loss= 1/32,
#             attraction=False
#     ):
#         self.temperature = temperature,
#         self.tau = tau,
#         self.beta = beta,
#         self.elimination_th = elimination_th,
#         self.elimination_topk = elimination_topk,
#         self.attraction = attraction
#         self.lambd = lambd
#         self.scale_loss = scale_loss
#         # Please double-check `reduction` parameter
#         self.criterion = tf.keras.losses.BinaryCrossentropy(
#             from_logits=False, reduction=tf.keras.losses.Reduction.SUM)



#     def get_loss_fn(self, loss_type):
#         loss = None
#         # Info-NCE
#         if loss_type == "nce":
#             def loss(ytrue, ypred):
#                 all_sim = K.exp(ypred / self.temperature)
#                 logits = tf.divide(
#                     tf.linalg.tensor_diag_part(all_sim), K.sum(all_sim, axis=1))
#                 print(logits)
#                 lbl = np.ones(ypred.shape[0])
#                 error = self.criterion(y_pred=logits, y_true=lbl)
#                 return error

#         # Debiased Contrastive Learning
#         elif loss_type in ["dcl", "harddcl"]:
#             def loss(ytrue, ypred):
#                 # dcl: from Debiased Contrastive Learning paper: https://github.com/chingyaoc/DCL/
#                 # harddcl: from ICLR2021 paper: Contrastive LEarning with Hard Negative Samples
#                 # https://www.groundai.com/project/contrastive-learning-with-hard-negative-samples
#                 # reweight = (beta * neg) / neg.mean()
#                 # Neg = max((-N * tau_plus * pos + reweight * neg).sum() / (1 - tau_plus), e ** (-1 / t))
#                 # hard_loss = -log(pos.sum() / (pos.sum() + Neg))
#                 N = ypred.shape[0]
#                 all_sim = K.exp(ypred / self.temperature)
#                 pos_sim = tf.linalg.tensor_diag_part(all_sim)

#                 tri_mask = np.ones(N ** 2, dtype=np.bool).reshape(N, N)
#                 tri_mask[np.diag_indices(N)] = False
#                 neg_sim = tf.reshape(tf.boolean_mask(all_sim, tri_mask), [N, N - 1])

#                 reweight = 1.0
#                 if loss_type == "harddcl":
#                     reweight = (self.beta * neg_sim) / tf.reshape(tf.reduce_mean(neg_sim, axis=1), [-1, 1])
#                 if self.beta == 0:
#                     reweight = 1.0

#                 Ng = tf.divide(
#                     tf.multiply(self.tau[0] * (1 - N), pos_sim) + K.sum(tf.multiply(reweight, neg_sim), axis=-1),
#                     (1 - self.tau[0]))
#                 print(Ng)
#                 # constrain (optional)
#                 Ng = tf.clip_by_value(Ng, clip_value_min=(N - 1) * np.e ** (-1 / self.temperature[0]),
#                                       clip_value_max=tf.float32.max)
#                 error = K.mean(- tf.math.log(pos_sim / (pos_sim + Ng)))
#                 return error

#         elif loss_type == "barlow":
#             def loss(ytrue, ypred):
#                 # Barlow Twins: Self-supervised representation learning with redundancy reduction
#                 # https://arxiv.org/pdf/2103.03230.pdf
#                 N = ypred.shape[0]

#                 # empirical cross-correlation matrix
#                 corr = tf.divide(ypred,N)
#                 on_diag = tf.linalg.tensor_diag_part(corr) # pos_sim
#                 invariance = tf.multiply(K.sum(tf.math.pow(tf.math.subtract(on_diag,1),2)), self.scale_loss)

#                 tri_mask = np.ones(N ** 2, dtype=np.bool).reshape(N, N)
#                 tri_mask[np.diag_indices(N)] = False
#                 off_diag = tf.reshape(tf.boolean_mask(corr, tri_mask), [N, N - 1])
#                 redundancy = tf.multiply(tf.math.reduce_sum(tf.math.pow(off_diag,2)), self.scale_loss)

#                 error = invariance + self.lambd * redundancy

#                 return error
#         # Contrasting More than two dimenstions
#         elif loss_type == "cocoa":
#             def loss(ytrue, ypred):
#                 batch_size, dim_size = ypred.shape[1], ypred.shape[0]

#                 # Positive Pairs
#                 pos_error=[]
#                 for i in range(batch_size):
#                     sim = tf.exp(tf.linalg.matmul(ypred[:,i,:], ypred[:,i,:], transpose_b=True)/self.temperature)
#                     tri_mask = np.ones(dim_size ** 2, dtype=np.bool).reshape(dim_size, dim_size)
#                     tri_mask[np.diag_indices(dim_size)] = False
#                     off_diag_sim = tf.reshape(tf.boolean_mask(sim, tri_mask), [dim_size, dim_size - 1])
#                     pos_error.append(tf.reduce_sum(off_diag_sim))
#                 # Negative pairs
#                 neg_error = 0
#                 for i in range(dim_size):
#                     sim = tf.cast(tf.linalg.matmul(ypred[i], ypred[i], transpose_b=True), dtype=tf.dtypes.float32)
#                     sim = tf.exp(sim/self.temperature)
#                     tri_mask = np.ones(batch_size ** 2, dtype=bool).reshape(batch_size, batch_size)
#                     tri_mask[np.diag_indices(batch_size)] = False
#                     off_diag_sim = tf.reshape(tf.boolean_mask(sim, tri_mask), [batch_size, batch_size - 1])
#                     neg_error += (tf.reduce_mean(off_diag_sim, axis=-1))

#                 logits = tf.divide(pos_error, pos_error+neg_error)
#                 lbl = np.ones(batch_size)
#                 error = self.criterion(y_pred=logits, y_true=lbl)
#                 return error

#         elif loss_type == "cocoa2":
#             def loss(ytrue, ypred):
#                 batch_size, dim_size = ypred.shape[1], ypred.shape[0]
#                 # Positive Pairs
#                 pos_error = []
#                 for i in range(batch_size):
#                     sim = tf.linalg.matmul(ypred[:, i, :], ypred[:, i, :], transpose_b=True)
#                     sim = tf.subtract(tf.ones([dim_size, dim_size], dtype=tf.dtypes.float32), sim)
#                     sim = tf.exp(sim/self.temperature)
#                     pos_error.append(tf.reduce_mean(sim))
#                 # Negative pairs
#                 neg_error = 0
#                 for i in range(dim_size):
#                     sim = tf.cast(tf.linalg.matmul(ypred[i], ypred[i], transpose_b=True), dtype=tf.dtypes.float32)
#                     sim = tf.exp(sim / self.temperature)
#                     # sim = tf.add(sim, tf.ones([batch_size, batch_size]))
#                     tri_mask = np.ones(batch_size ** 2, dtype=bool).reshape(batch_size, batch_size)
#                     tri_mask[np.diag_indices(batch_size)] = False
#                     off_diag_sim = tf.reshape(tf.boolean_mask(sim, tri_mask), [batch_size, batch_size - 1])
#                     neg_error += (tf.reduce_mean(off_diag_sim, axis=-1))

#                 error = tf.multiply(tf.reduce_sum(pos_error),self.scale_loss) + self.lambd * tf.reduce_sum(neg_error)

#                 return error

#         elif loss_type == "mse":
#             def loss(ytrue, ypred):
#                 reconstruction_error = tf.reduce_mean(tf.square(tf.subtract(ytrue, ypred)))
#                 return reconstruction_error

#         else:
#             raise ValueError("Undefined loss function.")

#         return loss


# def mse_loss(model, original):
#   reconstruction_error = tf.reduce_mean(tf.square(tf.subtract(model(original), original)))
#   return reconstruction_error

In [None]:
# custom_loss_obj = CustomLoss(temperature=(1/32))
# loss_fn = custom_loss_obj.get_loss_fn("cocoa2")

In [None]:
cocoa_loss

<function __main__.cocoa_loss(ytrue, ypred)>

In [None]:
loss_fn

NameError: name 'loss_fn' is not defined