In [10]:
import pandas as pd
import hopsworks
import joblib
import datetime
from datetime import datetime
import numpy as np

import dataframe_image as dfi
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

In [11]:
class NeuralNetwork(nn.Module):
    def __init__(self, input_size):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.output(x)
        return x

In [12]:
project = hopsworks.login()
fs = project.get_feature_store()

dataset_api = project.get_dataset_api()

mr = project.get_model_registry()
model = mr.get_model("flight_delay_model", version=2)
model_dir = model.download()
model = joblib.load(model_dir + "/flight_delay_model.pkl")

feature_group = fs.get_feature_group(name="flight_data_v3")
feature_view = fs.get_feature_view(name="flight_data_v3")

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/197786
Connected. Call `.close()` to terminate connection gracefully.
Connected. Call `.close()` to terminate connection gracefully.
Downloading model artifact (0 dirs, 1 files)... DONE



## Fitting the scaler to the training data

In [14]:
# fit the scaler
X_train, X_test, y_train, y_test = feature_view.get_train_test_split(training_dataset_version=1)
scaler = StandardScaler()
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32)
X_train_scaled = scaler.fit_transform(X_train_tensor)

Finished: Reading data from Hopsworks, using ArrowFlight (48.07s) 


## Monitoring

In [15]:

batch_data = feature_view.get_batch_data()
batch_data = torch.tensor(batch_data.values, dtype=torch.float32)
batch_data = scaler.transform(batch_data)
batch_data = torch.tensor(batch_data, dtype=torch.float32)

preds = model(batch_data)

window_length = 15
latest_preds = preds[-window_length:]
latest_preds = latest_preds.detach().numpy()
latest_preds.ravel()
latest_pred = float(latest_preds[-1])

print("Delay_predicted: " + str(latest_pred))

df = feature_group.read()

latest_labels = df[-window_length:]["dep_delay_new"]
latest_labels = latest_labels.to_numpy()
latest_label = float(latest_labels[-1])

print(latest_labels)
print("Delay_actual: " + str(latest_label))


loss = mean_squared_error(latest_labels, latest_preds)
print("Running MSE (n = 15): " + str(loss))


Finished: Reading data from Hopsworks, using ArrowFlight (46.14s) 




Delay_predicted: 7.680978775024414
Finished: Reading data from Hopsworks, using ArrowFlight (47.50s) 
[48.  0.  0.  4. 44. 34.  0.  0.  0. 96.  0.  0.  0.  0.  0.]
Delay_actual: 0.0
Running MSE (n = 15): 13003.942971068702


In [16]:
monitor_fg = fs.get_or_create_feature_group(
    name="flight_delay_predictions",
    version=1,
    primary_key=[
        "datetime",
    ],
    description="Flight delay Prediction/Outcome Monitoring",
)
now = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
data = {
    "datetime": [now],
    "prediction": [latest_pred],
    "label": [latest_label],
    "mse": [loss],
}

monitor_df = pd.DataFrame(data)
monitor_fg.insert(monitor_df, write_options={"wait_for_job": False}) # set this to True if you want to run it faster (async) but you will not be able to run the next cell

Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: flight_delay_predictions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/197786/jobs/named/flight_delay_predictions_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7f8313b8c370>, None)

## Add to history

In [31]:
history_df = monitor_fg.read()
history_df = pd.concat([history_df, monitor_df])

df_recent = history_df.tail(5)
dfi.export(df_recent, "./recent_delay_performance.png", table_conversion="matplotlib")
dataset_api.upload("./recent_delay_performance.png", "Resources/images", overwrite=True)



Finished: Reading data from Hopsworks, using ArrowFlight (2.57s) 


Uploading: 0.000%|          | 0/20703 elapsed<00:00 remaining<?

'Resources/images/recent_delay_performance.png'