## Build model

In [1]:
import numpy as np

import tensorflow as tf


# processes input image and flattens feature maps
def get_conditional_encoder1():
    inputs = tf.keras.Input(shape = (25,66,1))
    #x = tf.keras.layers.Conv2D(filters=32, kernel_size=3, strides=(2, 2), activation='relu')(inputs)
    #x = tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=(2, 2), activation='relu')(x)
    x = tf.keras.layers.Flatten()(inputs)
    
    return tf.keras.Model(inputs=inputs,outputs=[x])

# gets flattened feature maps, and one hot label vector and outputs mu and rho
def get_conditional_encoder2(latent_dim,input_size):
    inputs = tf.keras.Input(shape = (input_size + 11,))
    x = tf.keras.layers.Dense(units=400, activation='relu')(inputs)
    x = tf.keras.layers.Dense(units=200, activation='relu')(x)
    x = tf.keras.layers.Dense(units=50, activation='relu')(x)    
    mu = tf.keras.layers.Dense(units=latent_dim)(x)
    rho = tf.keras.layers.Dense(units=latent_dim)(x)

    return  tf.keras.Model(inputs=inputs,outputs=[mu,rho])

# classical vae decoder
def get_conditional_decoder(latent_dim):
    z = tf.keras.Input(shape = (latent_dim+11,))
    x= tf.keras.layers.Dense(units=50, activation='relu')(z)
    x= tf.keras.layers.Dense(units=200, activation='relu')(x)
    x= tf.keras.layers.Dense(units=400, activation='relu')(x)
    x= tf.keras.layers.Dense(units=1650, activation='softmax')(x)
    decoded_img=tf.keras.layers.Reshape(target_shape=(25,66, 1))(x)  
    return tf.keras.Model(inputs=z,outputs=[decoded_img])

class Conditional_VAE(tf.keras.Model):
    def __init__(self,latent_dim):
        super().__init__()
        self.latent_dim = latent_dim
        self.encoder_block1 = get_conditional_encoder1()
        # 2304 is specific to conv layers, not the best practice to hardcode it
        self.encoder_block2 = get_conditional_encoder2(latent_dim=latent_dim,input_size=1650)
        self.decoder_block = get_conditional_decoder(latent_dim)

    def call(self,img,labels):
        # encoder q(z|x,y)
        enc1_output = self.encoder_block1(img)
        # concat feature maps and one hot label vector
        img_lbl_concat = np.concatenate((enc1_output,labels),axis=1)
        z_mu,z_rho = self.encoder_block2(img_lbl_concat)

        # sampling
        epsilon = tf.random.normal(shape=z_mu.shape,mean=0.0,stddev=1.0)
        z = z_mu + tf.math.softplus(z_rho) * epsilon

        # decoder p(x|z,y)
        z_lbl_concat = np.concatenate((z,labels),axis=1)
        decoded_img = self.decoder_block(z_lbl_concat)

        return z_mu,z_rho,decoded_img




## Helper functions

In [2]:
import numpy as np
import pandas as pd
import tensorflow as tf


# closed form kl loss computation between variational posterior q(z|x) and unit Gaussian prior p(z) 
def kl_loss(z_mu,z_rho):
    sigma_squared = tf.math.softplus(z_rho) ** 2
    kl_1d = -0.5 * (1 + tf.math.log(sigma_squared) - z_mu ** 2 - sigma_squared)

    # sum over sample dim, average over batch dim
    kl_batch = tf.reduce_mean(tf.reduce_sum(kl_1d,axis=1))

    return kl_batch

def elbo(z_mu,z_rho,decoded_img,original_img):
    # reconstruction loss
    mse = tf.reduce_mean(tf.reduce_sum(tf.square(original_img - decoded_img),axis=1))
    # kl loss
    kl = kl_loss(z_mu,z_rho)

    return mse,kl



def train(latent_dim,beta,epochs,train_ds):

    model = Conditional_VAE(latent_dim)

    optimizer = tf.keras.optimizers.Adam(learning_rate = 0.001)

    kl_loss_tracker = tf.keras.metrics.Mean(name='kl_loss')
    mse_loss_tracker = tf.keras.metrics.Mean(name='mse_loss')


    for epoch in range(epochs):

        label_list = None
        z_mu_list = None    

        for _,(imgs,labels) in train_ds.enumerate():
            
            # training loop
            with tf.GradientTape() as tape:
                # forward pass
                z_mu,z_rho,decoded_imgs = model(imgs,labels)

                # compute loss
                mse,kl = elbo(z_mu,z_rho,decoded_imgs,imgs)
                loss = mse + beta * kl
            
            # compute gradients
            gradients = tape.gradient(loss,model.variables)

            # update weights
            optimizer.apply_gradients(zip(gradients, model.variables))

            # update metrics
            kl_loss_tracker.update_state(kl)
            mse_loss_tracker.update_state(mse)

            # save encoded means and labels for latent space visualization
            if label_list is None:
                label_list = labels
            else:
                label_list = np.concatenate((label_list,labels))
                
            if z_mu_list is None:
                z_mu_list = z_mu
            else:
                z_mu_list = np.concatenate((z_mu_list,z_mu),axis=0)

    
        # display metrics at the end of each epoch.
        epoch_kl,epoch_mse = kl_loss_tracker.result(),mse_loss_tracker.result()
        print(f'epoch: {epoch}, mse: {epoch_mse:.4f}, kl_div: {epoch_kl:.4f}')

        # reset metric states
        kl_loss_tracker.reset_state()
        mse_loss_tracker.reset_state()

    return model,z_mu_list,label_list


## Ingest data from csv

In [3]:
from ast import literal_eval
from IPython.display import display

def prepare_sequence_data():
    def read_data_from_csv(file:str = 'prepared_data_182627.csv'):
        df = pd.read_csv(file)
        df['Sequences'] = df['Sequences'].apply(literal_eval)
        df['Durations'] = df['Durations'].apply(literal_eval)
        one_hot = pd.get_dummies(df.Group).astype(int)
        df['Group_encoded'] = one_hot.values.tolist()

        # Print the encoding information
        encoded_words = one_hot.columns
        info = {"Encoded Words": []}
        for idx, word in enumerate(encoded_words, 1):
            info["Encoded Words"].append({"Column": idx, "Word": word})

        return df, info
        
    
    df, encoding_info = read_data_from_csv()
    
    # Display the encoding information
    display(encoding_info)

    x_train = np.array([x for x in df.Sequences.values])    
    x_train = np.expand_dims(x_train, axis=3)
    x_train = tf.cast(x_train, dtype=tf.float32)
    
    y_train = [x for x in df.Group_encoded.values]
    
    train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_ds = train_ds.shuffle(1000).batch(64)
    train_ds = train_ds.prefetch(tf.data.AUTOTUNE)

    return train_ds




In [4]:
train_ds = prepare_sequence_data()
train_ds

{'Encoded Words': [{'Column': 1, 'Word': 'Abdomen'},
  {'Column': 2, 'Word': 'Basic'},
  {'Column': 3, 'Word': 'Brain'},
  {'Column': 4, 'Word': 'Breast'},
  {'Column': 5, 'Word': 'Cardiac'},
  {'Column': 6, 'Word': 'Hip'},
  {'Column': 7, 'Word': 'Knee'},
  {'Column': 8, 'Word': 'Shoulder'},
  {'Column': 9, 'Word': 'Spine'},
  {'Column': 10, 'Word': 'Unknown'},
  {'Column': 11, 'Word': 'other'}]}

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 25, 66, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None, 11), dtype=tf.int32, name=None))>

## Training

In [5]:
import logging
tf.get_logger().setLevel(logging.ERROR)

beta = 1e-11
epochs = 100
latent_dim = 15

train_ds = prepare_sequence_data()

model,z_mu_list,label_list = train(latent_dim,beta,epochs,train_ds) 

{'Encoded Words': [{'Column': 1, 'Word': 'Abdomen'},
  {'Column': 2, 'Word': 'Basic'},
  {'Column': 3, 'Word': 'Brain'},
  {'Column': 4, 'Word': 'Breast'},
  {'Column': 5, 'Word': 'Cardiac'},
  {'Column': 6, 'Word': 'Hip'},
  {'Column': 7, 'Word': 'Knee'},
  {'Column': 8, 'Word': 'Shoulder'},
  {'Column': 9, 'Word': 'Spine'},
  {'Column': 10, 'Word': 'Unknown'},
  {'Column': 11, 'Word': 'other'}]}

epoch: 0, mse: 0.1027, kl_div: 1.5884
epoch: 1, mse: 0.0991, kl_div: 1.5875
epoch: 2, mse: 0.0998, kl_div: 1.5906
epoch: 3, mse: 0.1006, kl_div: 1.5872
epoch: 4, mse: 0.1002, kl_div: 1.5902
epoch: 5, mse: 0.0978, kl_div: 1.5904
epoch: 6, mse: 0.1018, kl_div: 1.5872
epoch: 7, mse: 0.1004, kl_div: 1.5897
epoch: 8, mse: 0.1019, kl_div: 1.5856
epoch: 9, mse: 0.0974, kl_div: 1.5894
epoch: 10, mse: 0.0930, kl_div: 1.5892
epoch: 11, mse: 0.0896, kl_div: 1.5902
epoch: 12, mse: 0.0911, kl_div: 1.5875
epoch: 13, mse: 0.0898, kl_div: 1.5894
epoch: 14, mse: 0.0892, kl_div: 1.5898
epoch: 15, mse: 0.0899, kl_div: 1.5881
epoch: 16, mse: 0.0902, kl_div: 1.5892
epoch: 17, mse: 0.0881, kl_div: 1.5895
epoch: 18, mse: 0.0896, kl_div: 1.5893
epoch: 19, mse: 0.0869, kl_div: 1.5873
epoch: 20, mse: 0.0888, kl_div: 1.5904
epoch: 21, mse: 0.0866, kl_div: 1.5890
epoch: 22, mse: 0.0867, kl_div: 1.5891
epoch: 23, mse: 0.0865, kl_div: 1.5907
epoch: 24, mse: 0.0869, kl_div: 1.5903
epoch: 25, mse: 0.0860, kl_div: 1.5

KeyboardInterrupt: 

In [5]:
def find_max(a):
    max_position = np.unravel_index(np.argmax(a), a.shape)       
    result = np.zeros_like(a)
    if max(a)> sum(a) * 0.2:
        result[max_position] = 1
    return result.astype(int)

In [6]:
label_onehot = np.zeros((1,11))
label_onehot[:,7] = 1.0

z = tf.random.normal(shape=(1,model.encoder_block2.output[0].shape[1]),mean=0.0,stddev=1.0)
z_lbl_concat = np.concatenate((z,label_onehot),axis=1)
z_lbl_concat
result = model.decoder_block(z_lbl_concat).numpy().reshape(-1, 25,66)
result
[list(find_max(y)) for x in result for y in x]

[[1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 [0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0],
 [0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0],
 [0,
  