In [None]:
import hopsworks

In [None]:
# Log in to Hopsworks. The returned project handle is what the later cells use
# to reach the feature store, the model registry and the dataset API.
project = hopsworks.login()

In [None]:
# Fetch the project's feature store handle; it is used below to create the
# feature groups and the feature view.
fs = project.get_feature_store()

# Creating feature groups

In [None]:
# Importing on-demand transformation functions
from features.transaction_features import fetch_and_create_transactions_features
from features.credict_card_features import fetch_and_create_credit_card_features
from hopsworks.hsfs.feature import Feature

In [None]:
# Define the transactions feature group.
# Only `tid` is declared explicitly as a feature; the remaining columns come
# from the on-demand transformation attached below, which is called on `tid`
# and whose outputs are given the aliased names.
transactions_fg = fs.get_or_create_feature_group(
    name="transactions_feature_group",
    version=1,
    primary_key=["cc_num"],
    event_time="transaction_time",
    online_enabled=True,
    features=[Feature(name="tid", type="string")],
    transformation_functions=[
        # On-demand transformation: created by attaching the transformation
        # function to the feature group.
        fetch_and_create_transactions_features("tid").alias(
            "cc_num",
            "category",
            "transaction_time",
            "amount",
            "transaction_city",
            "transaction_country",
            "fraud_label",
        ),
    ],
)

In [None]:
# Save the transactions feature group defined in the previous cell.
transactions_fg.save()

In [None]:
# Define the credit cards feature group.
# Its columns come from the on-demand transformation attached below, which is
# called on `cc_num` and `current_datetime` and whose outputs are given the
# aliased names.
credit_cards_fg = fs.get_or_create_feature_group(
    name="credit_cards_feature_group",
    version=1,
    primary_key=["cc_num"],
    online_enabled=True,
    transformation_functions=[
        # On-demand transformation: created by attaching the transformation
        # function to the feature group.
        fetch_and_create_credit_card_features("cc_num", "current_datetime").alias(
            "cc_num",
            "days_to_expiry",
            "age_at_transaction",
            "sex",
            "city",
            "country",
        ),
    ],
)

In [None]:
# Save the credit cards feature group defined in the previous cell.
credit_cards_fg.save()

# Creating a feature view

In [None]:
# Build the feature view query: the selected transaction columns joined with
# the selected credit card columns. The join is given the prefix "fg2_".
query = transactions_fg.select(
    [
        "tid",
        "cc_num",
        "category",
        "amount",
        "transaction_city",
        "transaction_country",
        "fraud_label",
        "transaction_time",
    ]
).join(
    credit_cards_fg.select(
        ["days_to_expiry", "age_at_transaction", "sex", "city", "country"]
    ),
    prefix="fg2_",
)

In [None]:
# Create (or fetch) the feature view over the query above, with `fraud_label`
# as the label column and feature logging switched on.
fv = fs.get_or_create_feature_view(
    name="fraud_feature_view",
    version=1,
    query=query,
    labels=["fraud_label"],
    logging_enabled=True,
)

# Fetch the saved model and register it in the model registry

In [None]:
# Fetch the project's model registry handle; it is used below to create the model entry.
mr = project.get_model_registry()

In [None]:
# Create a Python model entry in the model registry, linked to the feature view.
fraud_model = mr.python.create_model(
    name="fraud_model",
    # Description stored with the model
    description="test description",
    # Example input for testing deployments
    input_example=[4467360740682089, "51d90e9721e699f24382bf9dd10da420"],
    # Feature view associated with the model
    feature_view=fv,
    training_dataset_version=1,
)

# Save the pre-trained model file under the model created above.
fraud_model.save("fraud_batch_model/xgb_classifier.pkl")


# Deploy model 

Create the predictor script that the deployed model uses to serve predictions.

In [None]:
%%writefile predict_example_new.py

import os
import numpy as np
import hopsworks
import joblib
from xgboost import XGBClassifier
from datetime import datetime
import psycopg2


class Predict(object):
    """Predictor script for the fraud model deployment.

    For each request the predictor assembles a feature vector through the
    ``fraud_feature_view`` feature view (handing a PostgreSQL connection to
    the on-demand transformations as transformation context), runs the loaded
    classifier on the model columns and logs the features and predictions.

    The database connection settings can be overridden with the environment
    variables ``FRAUD_DB_NAME``, ``FRAUD_DB_USER``, ``FRAUD_DB_HOST``,
    ``FRAUD_DB_PASSWORD`` and ``FRAUD_DB_PORT``. When a variable is unset the
    original demo value is used, so existing deployments keep working.
    """

    # Columns handed to the model, in this order. The remaining columns of the
    # feature vector (e.g. the primary keys) are dropped for prediction but
    # kept in the vector that is logged.
    MODEL_FEATURES = (
        "category",
        "amount",
        "transaction_city",
        "transaction_country",
        "fg2_days_to_expiry",
        "fg2_age_at_transaction",
        "fg2_sex",
        "fg2_city",
        "fg2_country",
    )

    def __init__(self, project, model, async_logger):
        """Initialize the serving state and read the trained model.

        Args:
            project: Project handle used to reach the feature store.
            model: Model object, passed along when logging predictions.
            async_logger: Feature logger passed to ``init_serving``.
        """
        fs = project.get_feature_store()

        self.feature_view = fs.get_feature_view(name="fraud_feature_view", version=1)

        # Initialize feature logging for collecting transformed features.
        self.feature_view.init_serving(feature_logger=async_logger)

        self.hopsworks_model = model
        self.model = joblib.load(
            os.path.join(os.environ["MODEL_FILES_PATH"], "xgb_classifier.pkl")
        )

        print("Initialization Complete")

    def _connect(self):
        """Open a new PostgreSQL connection for the on-demand transformations."""
        # NOTE: the fallbacks are the demo values; set the FRAUD_DB_* variables
        # instead of relying on them outside of a demo environment.
        return psycopg2.connect(
            database=os.environ.get("FRAUD_DB_NAME", "test_db"),
            user=os.environ.get("FRAUD_DB_USER", "admin"),
            host=os.environ.get("FRAUD_DB_HOST", "10.2.1.149"),
            password=os.environ.get("FRAUD_DB_PASSWORD", "admin"),
            port=int(os.environ.get("FRAUD_DB_PORT", "5432")),
        )

    def predict(self, inputs):
        """Serve a prediction request using the trained model.

        Args:
            inputs: Nested sequence whose first row is ``[cc_num, tid]``.

        Returns:
            The model predictions as a plain Python list.
        """
        cc_num = inputs[0][0]
        tid = inputs[0][1]

        # TODO: the psycopg2 connection seems to get disconnected when kept
        # around, so one is opened per request; move this into __init__ once
        # a long-lived connection works properly.
        conn = self._connect()
        try:
            feature_vector = self.feature_view.get_feature_vector(
                {"cc_num": cc_num},  # Key to retrieve pre-computed features from the online feature store
                passed_features={"tid": tid},  # Features to use "as-is" in the feature vector
                request_parameters={  # Parameters to pass to the on-demand transformations
                    "cc_num": cc_num,
                    "current_datetime": datetime.now(),
                },
                transformation_context={"connection": conn},  # Additional context for the on-demand transformations
                return_type="pandas",
            )
        finally:
            # Always release the per-request connection, even when the feature
            # lookup fails; otherwise every request leaks a connection.
            conn.close()

        # Keep only the model columns for prediction. The full vector, which
        # still holds the primary keys, is what gets logged below.
        parsed_feature_vector = feature_vector[list(self.MODEL_FEATURES)]

        # Numpy arrays are not JSON serializable, hence the conversion to a list.
        predictions = self.model.predict(parsed_feature_vector).tolist()

        # Log the feature vector, the predictions and the model.
        # The same vector is passed as untransformed and transformed features
        # because there is no model-dependent transformation in this case.
        logged_rows = feature_vector.values.tolist()
        self.feature_view.log(
            untransformed_features=logged_rows,
            transformed_features=logged_rows,
            predictions=[predictions],
            model=self.hopsworks_model,
        )

        return predictions

In [None]:
import os

# Get the dataset API for the current project
dataset_api = project.get_dataset_api()

# Local path of the predictor script written by the %%writefile cell above
local_script_path = "predict_example_new.py"

# Upload the script to "Models", overwriting any existing copy
uploaded_file_path = dataset_api.upload(local_script_path, "Models", overwrite=True)

# Build the project-level path to the uploaded script; it is passed to the
# deployment as `script_file` in the next cell
predictor_script_path = os.path.join("/Projects", project.name, uploaded_file_path)

In [None]:
# Deploy the registered fraud model, using the uploaded predictor script
deployment = fraud_model.deploy(
    name="fraudeployment",  # Name of the deployment
    script_file=predictor_script_path,  # Path to the uploaded predictor script
)

In [None]:
# Start the deployment created in the previous cell
deployment.start()

In [None]:
# Send a test request to the deployment: one row of [cc_num, tid], which is
# the layout the predictor script reads from inputs[0].
deployment.predict(inputs=[[4148299918528368, '201209bc29f918f28956da351d95ba37']])