# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"><img src="images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 03: Model training & UI Exploration</span>

<span style="font-width:bold; font-size: 1.4rem;">In this last notebook, we will train a model on the dataset we created in the previous tutorial. We will train our model using TensorFlow (Keras), although it could just as well be trained with other machine learning frameworks such as Scikit-learn, PySpark, or PyTorch. We will also show some of the exploration that can be done in Hopsworks, notably the search function and the lineage. </span>

## **🗒️ This notebook is divided into 3 main sections:** 
1. **Loading the training data**
2. **Train the model**
3. **Explore feature groups and views** via the UI.

![tutorial-flow](images/03_model.png)

In [1]:
import hsfs

conn = hsfs.connection()
fs = conn.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.


In [2]:
feature_view = fs.get_feature_view("transactions_view", 1)

## <span style="color:#ff5f27;"> ✨ Load Training Data </span>

First, we need to fetch the training datasets that we created in the previous notebook. We will use the January-February data for training and the March data for testing.

In [3]:
import pandas as pd

# Training dataset version 1 holds the January-February data, version 2 the March data
train_jan_feb_x, train_jan_feb_y = feature_view.get_training_data(1)
train_mar_x, train_mar_y = feature_view.get_training_data(2)



In [4]:
train_jan_feb_x[train_jan_feb_y.fraud_label==0]

| | amount | age_at_transaction | days_until_card_expires | loc_delta | loc_delta_mavg |
|---|---|---|---|---|---|
| 0 | 0.002478 | 0.451919 | 0.873791 | 0.106689 | 0.108080 |
| 1 | 0.000247 | 0.628477 | 0.512349 | 0.045635 | 0.046230 |
| 2 | 0.191015 | 0.033169 | 0.853372 | 0.236723 | 0.239810 |
| 3 | 0.002814 | 0.387324 | 0.307864 | 0.099704 | 0.156792 |
| 4 | 0.001679 | 0.632300 | 0.813488 | 0.000029 | 0.000030 |
| ... | ... | ... | ... | ... | ... |
| 38978 | 0.003110 | 0.665616 | 0.738367 | 0.000054 | 0.000055 |
| 38979 | 0.001577 | 0.578103 | 0.563284 | 0.101315 | 0.102636 |
| 38980 | 0.001344 | 0.574697 | 0.763167 | 0.467712 | 0.348457 |
| 38981 | 0.000972 | 0.614414 | 0.075533 | 0.000055 | 0.000055 |


In [5]:
# Keep only non-fraudulent transactions: the anomaly detector learns from normal data only
non_fraud = train_jan_feb_x[train_jan_feb_y.fraud_label!=1]

In [6]:
non_fraud

| | amount | age_at_transaction | days_until_card_expires | loc_delta | loc_delta_mavg |
|---|---|---|---|---|---|
| 0 | 0.002478 | 0.451919 | 0.873791 | 0.106689 | 0.108080 |
| 1 | 0.000247 | 0.628477 | 0.512349 | 0.045635 | 0.046230 |
| 2 | 0.191015 | 0.033169 | 0.853372 | 0.236723 | 0.239810 |
| 3 | 0.002814 | 0.387324 | 0.307864 | 0.099704 | 0.156792 |
| 4 | 0.001679 | 0.632300 | 0.813488 | 0.000029 | 0.000030 |
| ... | ... | ... | ... | ... | ... |
| 38978 | 0.003110 | 0.665616 | 0.738367 | 0.000054 | 0.000055 |
| 38979 | 0.001577 | 0.578103 | 0.563284 | 0.101315 | 0.102636 |
| 38980 | 0.001344 | 0.574697 | 0.763167 | 0.467712 | 0.348457 |
| 38981 | 0.000972 | 0.614414 | 0.075533 | 0.000055 | 0.000055 |


We will train a model to predict `fraud_label` given the rest of the features.

Let's check the distribution of our target label.

In [7]:
train_jan_feb_y.value_counts(normalize=True)

fraud_label
0              0.999487
1              0.000513
dtype: float64

In [8]:
train_mar_y.value_counts(normalize=True)

fraud_label
0              1.0
dtype: float64

Notice that the distribution is extremely skewed. Only about 0.05% of the January-February transactions are fraudulent, and the March sample contains no fraudulent transactions at all. This is expected, because fraudulent transactions make up a tiny share of all transactions. It still means we have to address the class imbalance. There are many approaches for this, such as weighting the loss function, over- or undersampling, creating synthetic data, or modifying the decision threshold. In this example, we treat fraud detection as anomaly detection. We train a model only on non-fraudulent transactions (the `non_fraud` frame) so that it learns what normal behavior looks like, and data that the model reconstructs poorly receives a high anomaly score.
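The class-weighting option can be made concrete with a common heuristic: weight each class by `n_samples / (n_classes * class_count)`. scikit-learn uses this heuristic for `class_weight="balanced"`, and it makes rare classes count more in the loss. Below is a minimal plain-Python sketch. The toy labels are hypothetical and approximate the fraud rate observed above, about 1 in 2,000.

```python
from collections import Counter


def balanced_class_weights(labels):
    """Weight each class by n_samples / (n_classes * class_count).

    Rare classes get proportionally larger weights, which is the same
    heuristic scikit-learn uses for class_weight="balanced".
    """
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {cls: n_samples / (n_classes * count) for cls, count in counts.items()}


# Hypothetical toy labels: 1 fraudulent transaction out of 2,000.
labels = [0] * 1999 + [1]
weights = balanced_class_weights(labels)

print(weights)  # class 1 gets weight 1000.0, class 0 about 0.5
```

With this rule, a single fraudulent example carries about as much total weight as all the legitimate ones combined.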

## <span style="color:#ff5f27;"> 🏃 Train Model</span>

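Next we'll train the model. It is a GAN-based anomaly detector made of an encoder, a generator and a discriminator (critic). Training uses a Wasserstein loss with a gradient penalty (WGAN-GP) on overlapping windows of 32 consecutive `loc_delta` values taken from non-fraudulent transactions. The model then scores data by how well the generator can reconstruct it from its encoding.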
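The first lines of the next cell use `tf.data` to turn the `loc_delta` column into overlapping, fixed-length windows. They then group those windows into batches. The sketch below reproduces the same slicing in plain Python on a hypothetical toy list. It assumes the `shift=1` and drop-remainder settings used in that cell, so you can check how many windows and full batches a series of a given length yields.

```python
def sliding_windows(values, window_size, shift=1):
    """Return every complete window of `window_size` consecutive values.

    With shift=1 this mirrors Dataset.window(window_size, shift=1, drop_remainder=True):
    incomplete windows at the end of the series are dropped.
    """
    last_start = len(values) - window_size
    return [values[i:i + window_size] for i in range(0, last_start + 1, shift)]


def full_batches(items, batch_size):
    """Group items into batches of `batch_size`, dropping the incomplete last batch."""
    n_full = len(items) // batch_size
    return [items[i * batch_size:(i + 1) * batch_size] for i in range(n_full)]


# Hypothetical toy series standing in for 100 rows of `loc_delta`.
series = [i / 100 for i in range(100)]
windows = sliding_windows(series, window_size=32)
batches = full_batches(windows, batch_size=32)

print(len(windows), len(batches))  # 69 2
```

A series of length N yields N - 32 + 1 windows, and only whole batches of 32 windows are kept. Each batch therefore has the shape (32, 32, 1) that the model expects.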

In [9]:
import tensorflow as tf

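def windowed_dataset(dataset, window_size, batch_size):
    # Split the series into overlapping windows of `window_size` steps (stride 1),
    # dropping the incomplete windows at the end
    ds = dataset.window(window_size, shift=1, drop_remainder=True)
    # Each window is a nested Dataset; turn it into one tensor of shape (window_size, 1)
    ds = ds.flat_map(lambda x: x.batch(window_size))
    # Group the windows into batches, dropping the last incomplete batch
    return ds.batch(batch_size, drop_remainder=True).prefetch(1)

# Train only on the `loc_delta` feature of non-fraudulent transactions
training_dataset = tf.data.Dataset.from_tensor_slices(tf.cast(non_fraud[["loc_delta"]].values, tf.float32))
training_dataset = windowed_dataset(training_dataset, window_size=32, batch_size=32)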

class GanEncAnomalyDetector(tf.keras.Model):
    
    def __init__(self, input_dim):
        super(GanEncAnomalyDetector, self).__init__()
        
        self.input_dim = input_dim       # (window_size, n_features), e.g. [32, 1]
        self.latent_dim = [4, 4]         # shape of the encoder output / generator input
        self.d_steps = 3                 # discriminator updates per generator update
        self.gp_weight = 10              # weight of the gradient penalty term
        
        self.encoder = self.make_encoder_model(self.input_dim)
        self.generator = self.make_generator(self.input_dim, self.latent_dim)
        self.discriminator = self.make_discriminator_model(self.input_dim)

        self.mse = tf.keras.losses.MeanSquaredError()
        
        # Running averages of the losses and of the anomaly score
        self.epoch_e_loss_avg = tf.keras.metrics.Mean(name="epoch_e_loss_avg")
        self.epoch_d_loss_avg = tf.keras.metrics.Mean(name="epoch_d_loss_avg")
        self.epoch_g_loss_avg = tf.keras.metrics.Mean(name="epoch_g_loss_avg")
        self.epoch_a_score_avg = tf.keras.metrics.Mean(name="epoch_a_score_avg")

    # Exposing the metrics as a class-level property lets Keras reset them
    # at the start of every epoch
    @property
    def metrics(self):
        return [
            self.epoch_e_loss_avg,
            self.epoch_d_loss_avg,
            self.epoch_g_loss_avg,
            self.epoch_a_score_avg,
        ]

    # define model architectures
    def make_encoder_model(self, input_dim):
        inputs = tf.keras.layers.Input(shape=(input_dim[0],input_dim[1]))
        x = tf.keras.layers.Conv1D(filters = 16, kernel_size= 1,padding='same', kernel_initializer="uniform")(inputs)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x)
        x = tf.keras.layers.Conv1D(filters = 8, kernel_size= 1,padding='same',  kernel_initializer="uniform")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x)    
        x = tf.keras.layers.Conv1D(filters = 4, kernel_size= 1,padding='same',  kernel_initializer="uniform")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x)    
        encoder = tf.keras.Model(inputs=inputs, outputs=x, name="encoder_model")
        return encoder

    def make_generator(self, input_dim, latent_dim):
        latent_inputs = tf.keras.layers.Input(shape=(latent_dim[0],latent_dim[1]))
        x = tf.keras.layers.Conv1D(filters = 4, kernel_size= 1,padding='same', kernel_initializer="uniform")(latent_inputs) 
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.UpSampling1D(2)(x) 
        x = tf.keras.layers.Conv1D(filters = 8, kernel_size= 1,padding='same', kernel_initializer="uniform")(x) 
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.UpSampling1D(2)(x) 
        x = tf.keras.layers.Conv1D(filters = 16, kernel_size= 1,padding='same', kernel_initializer="uniform")(x) 
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.UpSampling1D(2)(x) 
        x = tf.keras.layers.Conv1D(filters = input_dim[1], kernel_size= 1,padding='same', kernel_initializer="uniform")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        generator = tf.keras.Model(inputs=latent_inputs, outputs=x, name="generator_model")        
        return generator

    def make_discriminator_model(self, input_dim):
        inputs = tf.keras.layers.Input(shape=(input_dim[0],input_dim[1]))
        x = tf.keras.layers.Conv1D(filters = 128, kernel_size= 1,padding='same', kernel_initializer="uniform")(inputs)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    
        x = tf.keras.layers.MaxPooling1D(pool_size=2, padding='same')(x)
        x = tf.keras.layers.Conv1D(filters = 64, kernel_size= 1,padding='same', kernel_initializer="uniform")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)    

        # dense output layer
        x = tf.keras.layers.Flatten()(x)    
        x = tf.keras.layers.LeakyReLU(0.2)(x)
        x = tf.keras.layers.Dense(128)(x)
        x = tf.keras.layers.LeakyReLU(0.2)(x)
        prediction = tf.keras.layers.Dense(1)(x)
        discriminator = tf.keras.Model(inputs=inputs, outputs=prediction, name="discriminator_model" )               
        return discriminator
        
    # Training function
    @tf.function
    def train_step(self, real_data):
        if isinstance(real_data, tuple):
            real_data = real_data[0]

        # Get the batch size
        batch_size = tf.shape(real_data)[0]

        # For each batch, we are going to perform the
        # following steps as laid out in the original paper:
        # 1. Train the generator and get the generator loss
        # 1a. Train the encoder and get the encoder loss
        # 2. Train the discriminator and get the discriminator loss
        # 3. Calculate the gradient penalty
        # 4. Multiply this gradient penalty with a constant weight factor
        # 5. Add the gradient penalty to the discriminator loss
        # 6. Return the generator and discriminator losses as a loss dictionary

        # Train the discriminator first. The original paper recommends training
        # the discriminator for `x` more steps (typically 5) as compared to
        # one step of the generator. Here we use `self.d_steps` = 3 steps
        # instead of 5 to reduce the training time.
        for _ in range(self.d_steps):
            # Get the latent vector
            random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim[0], self.latent_dim[1]))
            with tf.GradientTape() as tape:
                # Generate fake data from the latent vector
                fake_data = self.generator(random_latent_vectors, training=True)

                # Get the logits for the fake data
                fake_logits = self.discriminator(fake_data, training=True)
                # Get the logits for the real data
                real_logits = self.discriminator(real_data, training=True)

                # Calculate the discriminator loss using the fake and real sample logits
                d_cost = self.discriminator_loss(real_sample=real_logits, fake_sample=fake_logits)
                # Calculate the gradient penalty
                gp = self.gradient_penalty(real_data, fake_data)
                # Add the gradient penalty to the original discriminator loss
                d_loss = d_cost + gp * self.gp_weight

            # Get the gradients w.r.t the discriminator loss
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            # Update the weights of the discriminator using the discriminator optimizer
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Train the generator
        # Get the latent vector
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim[0], self.latent_dim[1]))
        with tf.GradientTape() as tape:
            # Generate fake data using the generator
            generated_data = self.generator(random_latent_vectors, training=True)
            # Get the discriminator logits for fake data
            gen_sample_logits = self.discriminator(generated_data, training=True)
            # Calculate the generator loss
            g_loss = self.generator_loss(gen_sample_logits)

        # Get the gradients w.r.t the generator loss
        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        # Update the weights of the generator using the generator optimizer
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )

        # Train the encoder
        with tf.GradientTape() as tape:
            # Generate fake data from the latent vector
            generated_data = self.generator(random_latent_vectors, training=True)
            # Encode the generated data back into the latent space
            encoded_fake_data = self.encoder(generated_data, training=True)
            # Reconstruct the generated data from its encoding
            generator_reconstructed_encoded_fake_data = self.generator(encoded_fake_data, training=True)
            # Calculate the encoder (cycle-reconstruction) loss
            e_loss = self.encoder_loss(generated_data, generator_reconstructed_encoded_fake_data)

        # Get the gradients w.r.t. the encoder loss
        enc_gradient = tape.gradient(e_loss, self.encoder.trainable_variables)
        # Update the weights of the encoder using the encoder optimizer
        self.e_optimizer.apply_gradients(
            zip(enc_gradient, self.encoder.trainable_variables)
        )

        anomaly_score = self.compute_anomaly_score(real_data)

        self.epoch_d_loss_avg.update_state(d_loss)
        self.epoch_g_loss_avg.update_state(g_loss)
        self.epoch_e_loss_avg.update_state(e_loss)
        self.epoch_a_score_avg.update_state(anomaly_score["anomaly_score"])

        return {"d_loss": d_loss, "g_loss": g_loss, "e_loss": e_loss, "anomaly_score": anomaly_score["anomaly_score"]}

    @tf.function
    def test_step(self, input):
        if isinstance(input, tuple):
            input = input[0]
        
        batch_size = tf.shape(input)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim[0], self.latent_dim[1]))
        # Generate fake data using the generator
        generated_data = self.generator(random_latent_vectors, training=False)
        # Get the discriminator logits for fake data
        gen_sample_logits = self.discriminator(generated_data, training=False)
        # Calculate the generator loss
        g_loss = self.generator_loss(gen_sample_logits)

        
        # Encode the generated data back into the latent space
        encoded_fake_data = self.encoder(generated_data, training=False)
        # Reconstruct the generated data from its encoding
        generator_reconstructed_encoded_fake_data = self.generator(encoded_fake_data, training=False)

        # Calculate encoder loss
        e_loss = self.encoder_loss(generated_data, generator_reconstructed_encoded_fake_data)
        
        anomaly_score = self.compute_anomaly_score(input)
        return {
            "g_loss": g_loss,
            "e_loss": e_loss,
            "anomaly_score": anomaly_score["anomaly_score"]
        }
    
    # Custom serving function: returns the anomaly score for a batch of windows
    @tf.function
    def serve_function(self, input):
        return self.compute_anomaly_score(input)

    def call(self, input):
        if isinstance(input, tuple):
            input = input[0]
        
        encoded = self.encoder(input)
        decoded = self.generator(encoded)
        anomaly_score = self.compute_anomaly_score(input)
        return anomaly_score["anomaly_score"], decoded

    def compile(self):
        super(GanEncAnomalyDetector, self).compile()
        # One SGD optimizer per sub-network, each with gradient-norm clipping
        self.e_optimizer = tf.keras.optimizers.SGD(learning_rate=0.00001, clipnorm=0.01)
        self.d_optimizer = tf.keras.optimizers.SGD(learning_rate=0.00001, clipnorm=0.01)
        self.g_optimizer = tf.keras.optimizers.SGD(learning_rate=0.00001, clipnorm=0.01)

    def gradient_penalty(self, real_data, fake_data):
        """ Calculates the gradient penalty.
        This loss is calculated on an interpolated sample
        and added to the discriminator loss.
        """
        # Get the interpolated sample. The mixing coefficient alpha is drawn
        # elementwise from N(0, 2^2); standard WGAN-GP instead draws one value
        # per example from U[0, 1].
        real_data_shape = tf.shape(real_data)
        alpha = tf.random.normal(shape=[real_data_shape[0], real_data_shape[1], real_data_shape[2]], mean=0.0, stddev=2.0, dtype=tf.dtypes.float32)
        interpolated = (alpha * real_data) + ((1 - alpha) * fake_data)

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for this interpolated sample.
            pred = self.discriminator(interpolated, training=True)

        # 2. Calculate the gradients w.r.t. this interpolated sample.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[-2, -1]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp    
        
    def encoder_loss(self, generated_fake_data, generator_reconstructed_encoded_fake_data):
        # Cycle-consistency loss: MSE between G(z) and G(E(G(z))), scaled by beta
        generator_reconstructed_data = tf.cast(generator_reconstructed_encoded_fake_data, tf.float32)
        loss = self.mse(generated_fake_data, generator_reconstructed_data)
        beta_cycle_gen = 10.0
        loss = loss * beta_cycle_gen
        return loss

    # Define the loss functions for the discriminator,
    # which should be (fake_loss - real_loss).
    # We will add the gradient penalty later to this loss function.
    def discriminator_loss(self, real_sample, fake_sample):
        real_loss = tf.reduce_mean(real_sample)
        fake_loss = tf.reduce_mean(fake_sample)
        return fake_loss - real_loss

    # Define the loss functions for the generator.
    def generator_loss(self, fake_sample):
        return -tf.reduce_mean(fake_sample)
    
    def compute_anomaly_score(self, input):
        """anomaly score.
          See https://arxiv.org/pdf/1905.11034.pdf for more details
        """
        # Encode the real data
        encoded_real_data = self.encoder(input, training=False)
        # Reconstruct the real data from its encoding
        generator_reconstructed_encoded_real_data = self.generator(encoded_real_data, training=False)
        # Reconstruction error between real and reconstructed data
        # (MeanSquaredError averages over the whole batch, giving one score per batch)
        gen_rec_loss_predict = self.mse(input, generator_reconstructed_encoded_real_data)

        # Alternative score (not used; `encoded_random_latent` and `self.anomaly_alpha`
        # are not defined in this class) that also mixes in a latent-space distance:
        # real_to_orig_dist_predict = tf.math.reduce_sum(tf.math.pow(encoded_random_latent - encoded_real_data, 2), axis=[-1])
        # anomaly_score = (gen_rec_loss_predict * self.anomaly_alpha) + ((1 - self.anomaly_alpha) * real_to_orig_dist_predict)
        anomaly_score = gen_rec_loss_predict
        return {'anomaly_score': anomaly_score}
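For reference, the objectives implemented in `GanEncAnomalyDetector` can be written as follows, read directly off the methods above. The symbols are:

- $x$ is a real window and $z$ is a latent sample of shape $4 \times 4$.
- $G$, $D$ and $E$ are the generator, discriminator (critic) and encoder.
- $\lambda$ = `gp_weight` = 10 and $\beta$ = `beta_cycle_gen` = 10.
- $s(x)$ is the anomaly score returned by `compute_anomaly_score`.

Each loss updates only its own network's weights. The usual WGAN-GP recipe draws one interpolation coefficient per example from $U[0, 1]$, but `gradient_penalty` here samples $\alpha$ elementwise from $\mathcal{N}(0, 2^2)$.

$$
\begin{aligned}
\mathcal{L}_D &= \mathbb{E}_{z}\big[D(G(z))\big] - \mathbb{E}_{x}\big[D(x)\big] + \lambda\, \mathbb{E}_{\hat{x}}\Big[\big(\lVert \nabla_{\hat{x}} D(\hat{x}) \rVert_2 - 1\big)^2\Big], \quad \hat{x} = \alpha \odot x + (1 - \alpha) \odot G(z) \\
\mathcal{L}_G &= -\,\mathbb{E}_{z}\big[D(G(z))\big] \\
\mathcal{L}_E &= \beta \cdot \mathrm{MSE}\big(G(z),\, G(E(G(z)))\big) \\
s(x) &= \mathrm{MSE}\big(x,\, G(E(x))\big)
\end{aligned}
$$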
    

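Now let's instantiate the model for windows of 32 time steps with one feature, compile it, and train it. For simplicity, the training dataset is also used as validation data.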

In [10]:
model = GanEncAnomalyDetector([32, 1])
model.compile()

In [11]:
for layer in model.layers:
    print(layer.name, layer.output_shape)

encoder_model (None, 4, 4)
generator_model (None, 32, 1)
discriminator_model (None, 1)


In [None]:
history = model.fit(training_dataset,
                    epochs=5,
                    verbose=0,
                    validation_data=training_dataset,
                    validation_steps=1,                    
                   )
history_dict = history.history
history_dict


## <span style="color:#ff5f27;">  Use the model to score transactions </span>
We trained the model on the January-February data. Now let's retrieve the March data and score whether each transaction is fraudulent or not.
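Scoring relies on `compute_anomaly_score`, which measures how far data is from its reconstruction `G(E(x))` using mean squared error. The plain-Python sketch below mimics that idea for a single window. The `reconstruct` function is a hypothetical stand-in for the trained encoder and generator: it simply clips values into a "normal" range of 0 to 0.3. In the TensorFlow model the MSE is averaged over a whole batch, so it returns one score per batch rather than per window.

```python
def mse(a, b):
    """Mean squared error between two equal-length sequences."""
    if len(a) != len(b):
        raise ValueError("sequences must have the same length")
    return sum((x - y) ** 2 for x, y in zip(a, b)) / len(a)


def reconstruct(window):
    """Hypothetical stand-in for G(E(window)): a 'model' that has only ever seen
    values between 0.0 and 0.3, so it clips everything into that range."""
    return [min(max(v, 0.0), 0.3) for v in window]


def anomaly_score(window, reconstruct_fn):
    """Reconstruction error of one window: the larger, the more anomalous."""
    return mse(window, reconstruct_fn(window))


normal_window = [0.1, 0.2, 0.25, 0.05]
odd_window = [0.1, 0.2, 0.9, 0.1]   # one unusually large location change

print(anomaly_score(normal_window, reconstruct))  # 0.0
print(anomaly_score(odd_window, reconstruct))     # roughly 0.09
```

Data resembling what the model saw during training reconstructs well and scores near zero. Unusual values cannot be reproduced and push the score up.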


In [7]:
from datetime import datetime
date_format = "%Y-%m-%d %H:%M:%S"
# Event-time window (epoch milliseconds) for the batch data to score.
# Note: "2022-01-03" is 3 January, so this window covers January to March, not only March.
start_time = int(datetime.strptime("2022-01-03 00:00:01", date_format).timestamp() * 1000)
end_time = int(datetime.strptime("2022-03-31 23:59:59", date_format).timestamp() * 1000)

# Initialize batch scoring for training dataset version 1, then read the batch data
feature_view.init_batch_scoring(1)
march_transactions = feature_view.get_batch_data(start_time=start_time, end_time=end_time)

2022-05-31 21:10:33,992 INFO: USE `fraud_simplified_featurestore`
2022-05-31 21:10:34,773 INFO: WITH right_fg0 AS (SELECT *
FROM (SELECT `fg1`.`category` `category`, `fg1`.`amount` `amount`, `fg1`.`age_at_transaction` `age_at_transaction`, `fg1`.`days_until_card_expires` `days_until_card_expires`, `fg1`.`loc_delta` `loc_delta`, `fg1`.`cc_num` `join_pk_cc_num`, `fg1`.`datetime` `join_evt_datetime`, `fg0`.`trans_volume_mstd` `trans_volume_mstd`, `fg0`.`trans_volume_mavg` `trans_volume_mavg`, `fg0`.`trans_freq` `trans_freq`, `fg0`.`loc_delta_mavg` `loc_delta_mavg`, RANK() OVER (PARTITION BY `fg1`.`cc_num`, `fg1`.`datetime` ORDER BY `fg0`.`datetime` DESC) pit_rank_hopsworks
FROM `fraud_simplified_featurestore`.`transactions_1` `fg1`
INNER JOIN `fraud_simplified_featurestore`.`transactions_4h_aggs_1` `fg0` ON `fg1`.`cc_num` = `fg0`.`cc_num` AND `fg1`.`datetime` >= `fg0`.`datetime`
WHERE `fg1`.`datetime` >= 1641168001000 AND `fg1`.`datetime` <= 1648771199000) NA
WHERE `pit_rank_hopsworks` = 
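The `start_time` and `end_time` values are Unix epoch milliseconds, and the log above shows the resulting filter (`1641168001000` to `1648771199000`). `datetime.timestamp()` interprets a naive datetime in the machine's local time zone. As a result, the same date strings can produce different filters on machines in different time zones. Below is a minimal sketch that pins the time zone explicitly. It assumes UTC is intended, and the logged values do match a UTC interpretation.

```python
from datetime import datetime, timezone

DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def to_epoch_ms(date_string, tz=timezone.utc):
    """Parse `date_string` as a wall-clock time in `tz` and return Unix epoch milliseconds."""
    dt = datetime.strptime(date_string, DATE_FORMAT).replace(tzinfo=tz)
    return int(dt.timestamp() * 1000)


print(to_epoch_ms("2022-01-03 00:00:01"))  # 1641168001000
print(to_epoch_ms("2022-03-31 23:59:59"))  # 1648771199000
print(to_epoch_ms("2022-03-01 00:00:00"))  # 1646092800000, start of March
```

Passing `"2022-03-01 00:00:00"` as the start restricts the batch data to March.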

In [8]:
march_transactions

| | category | amount | age_at_transaction | days_until_card_expires | loc_delta | trans_volume_mstd | trans_volume_mavg | trans_freq | loc_delta_mavg |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4 | 0.003120 | 0.091487 | 0.117163 | 0.000000 | 0.003120 | 0.003120 | 0.003120 | 0.000000 |
| 1 | 2 | 0.002173 | 0.091506 | 0.116883 | 0.122200 | 0.002173 | 0.002173 | 0.002173 | 0.132292 |
| 2 | 4 | 0.000008 | 0.091513 | 0.116773 | 0.120125 | 0.000008 | 0.000008 | 0.000008 | 0.130046 |
| 3 | 4 | 0.000047 | 0.091518 | 0.116696 | 0.000000 | 0.000028 | 0.000028 | 0.000028 | 0.065023 |
| 4 | 4 | 0.000659 | 0.091615 | 0.115229 | 0.040270 | 0.000659 | 0.000659 | 0.000659 | 0.043596 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 102971 | 0 | 0.000736 | 0.357286 | 0.467677 | 0.228904 | 0.000736 | 0.000736 | 0.000736 | 0.247809 |
| 102972 | 0 | 0.002816 | 0.357321 | 0.467147 | 0.166719 | 0.002816 | 0.002816 | 0.002816 | 0.180488 |
| 102973 | 0 | 0.002934 | 0.357325 | 0.467088 | 0.166874 | 0.002875 | 0.002875 | 0.002875 | 0.180572 |
| 102974 | 0 | 0.010322 | 0.357392 | 0.466076 | 0.001149 | 0.010322 | 0.010322 | 0.010322 | 0.001244 |


In [9]:
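# Note: `clf` is not defined in this notebook. It stands for a previously trained classifier
# with a scikit-learn-style `predict` method; the GAN model trained above is `model`.
predictions = clf.predict(march_transactions)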

In [10]:
predictions

array([0, 0, 0, ..., 0, 0, 0])
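The GAN-based detector trained above produces a continuous anomaly score, not a 0/1 label like the `predictions` array. A common way to turn scores into labels follows the "modifying the decision threshold" option mentioned earlier: flag anything that scores above a high percentile of the scores seen on normal (non-fraudulent) data. Below is a minimal sketch in plain Python. The 99th percentile and the toy scores are assumptions for illustration, not values from this notebook.

```python
import statistics


def fit_threshold(normal_scores, percentile=99):
    """Score above which a window counts as anomalous.

    statistics.quantiles(..., n=100) returns the 99 cut points from the
    1st to the 99th percentile, so index percentile - 1 is that percentile.
    """
    cut_points = statistics.quantiles(normal_scores, n=100)
    return cut_points[percentile - 1]


def flag_anomalies(scores, threshold):
    """Label each score 1 (anomalous) if it exceeds the threshold, else 0."""
    return [1 if score > threshold else 0 for score in scores]


# Hypothetical anomaly scores computed on known non-fraudulent windows.
normal_scores = [i / 1000 for i in range(1000)]
threshold = fit_threshold(normal_scores)

print(flag_anomalies([0.5, 0.995, 2.0], threshold))  # [0, 1, 1]
```

The percentile controls the trade-off between missed fraud and false alarms. Because labeled fraud is so rare here, it is usually tuned on held-out data rather than fixed in advance.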

## <span style="color:#ff5f27;"> 👓  Exploration</span>
In the Hopsworks feature store, the metadata allows for multiple levels of exploration and review. Here we will show a few of those capabilities.

### 🔎 <b>Search</b> 
Using the search function in the UI, you can query any aspect of the feature groups and training data that were previously created. In the gif below, we show how the tag we added in the first section can be searched to get all the feature groups with the `PII` tag value.

### 📊 <b>Statistics</b> 
We can also enable statistics for one or all of the feature groups. Here, the command is commented out so that the notebook does not take too long to run.

In [None]:
#trans_fg = fs.get_feature_group("transactions", version=1)
#trans_fg.statistics_config = {
#       "enabled": True,
#       "correlations": False,
#       "histograms": False,
#}
#trans_fg.update_statistics_config()


### ⛓️ <b> Lineage </b> 
For every feature group and feature view, you can look at the relations between the abstractions. You can see which feature groups were used to create which training dataset, and which model uses that training dataset.
This gives a clear understanding of how each element fits into the pipeline.

## <span style="color:#ff5f27;"> 🎁  Wrapping things up </span>

We have now trained a simple model on training data that we created in the feature store. This concludes the first module and the introduction to the core aspects of the feature store. In the second module, we will introduce streaming and external feature groups for a similar fraud use case.