Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions integrations/mage_ai/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import numpy as np

# Define a function to compute Haversine distance between consecutive coordinates
def haversine(long, lat):
"""Compute Haversine distance between each consecutive coordinate in (long, lat)."""

# Shift the longitude and latitude columns to get consecutive values
long_shifted = long.shift()
lat_shifted = lat.shift()

# Calculate the differences in longitude and latitude
long_diff = long_shifted - long
lat_diff = lat_shifted - lat

# Haversine formula to compute distance
a = np.sin(lat_diff/2.0)**2
b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
c = 2*np.arcsin(np.sqrt(a + b))

return c
14 changes: 14 additions & 0 deletions integrations/mage_ai/mage_tutorial/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.DS_Store
.file_versions
.gitkeep
.log
.logs/
.mage_temp_profiles
.preferences.yaml
.variables/
__pycache__/
docker-compose.override.yml
logs/
mage-ai.db
mage_data/
secrets/
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import hopsworks
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'custom' not in globals():
from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test


@custom
def transform_custom(*args, **kwargs):
"""
args: The output from any upstream parent blocks (if applicable)

Returns:
Anything (e.g. data frame, dictionary, array, int, str, etc.)
"""
TEST_SIZE = 0.2
# Specify the window length as "4h"
window_len = "4h"

# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

fs = project.get_feature_store()

trans_fg = fs.get_feature_group(
name="transactions",
version=1,
)

window_aggs_fg = fs.get_feature_group(
name=f"transactions_{window_len}_aggs",
version=1,
)

# Select features for training data.
query = trans_fg.select(["fraud_label", "category", "amount", "age_at_transaction", "days_until_card_expires", "loc_delta"])\
.join(window_aggs_fg.select_except(["cc_num"]))

# Load transformation functions.
label_encoder = fs.get_transformation_function(name="label_encoder")

# Map features to transformations.
transformation_functions = {
"category": label_encoder,
}

# Get or create the 'transactions_view' feature view
feature_view = fs.get_or_create_feature_view(
name='transactions_view',
version=1,
query=query,
labels=["fraud_label"],
transformation_functions=transformation_functions,
)

return print('✅ Done')

@test
def test_output(output, *args) -> None:
"""
Template code for testing the output of the block.
"""
assert output is not None, 'The output is undefined'
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import hopsworks
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def inference(data, *args, **kwargs):
"""
Deployment inference.

Args:
data: The output from the upstream parent block
args: The output from any additional upstream blocks (if applicable)
"""
# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

# get Hopsworks Model Serving
ms = project.get_model_serving()

# get deployment object
deployment = ms.get_deployment("fraud")

# Start the deployment and wait for it to be running, with a maximum waiting time of 480 seconds
deployment.start(await_running=480)

# Make predictions using the deployed model
predictions = deployment.predict(
inputs=[4700702588013561],
)
print(f'⛳️ Prediction: {predictions}')

deployment.stop()

print('🔮 Deployment is stopped!')
151 changes: 151 additions & 0 deletions integrations/mage_ai/mage_tutorial/data_exporters/model_building.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import hopsworks
import xgboost as xgb
import pandas as pd
import os
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from matplotlib import pyplot
import seaborn as sns
import joblib
from hsml.schema import Schema
from hsml.model_schema import ModelSchema
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


def prepare_training_data(X_train, X_test, y_train, y_test):
# Sort the training features DataFrame 'X_train' based on the 'datetime' column
X_train = X_train.sort_values("datetime")

# Reindex the target variable 'y_train' to match the sorted order of 'X_train' index
y_train = y_train.reindex(X_train.index)

# Sort the test features DataFrame 'X_test' based on the 'datetime' column
X_test = X_test.sort_values("datetime")

# Reindex the target variable 'y_test' to match the sorted order of 'X_test' index
y_test = y_test.reindex(X_test.index)

# Drop the 'datetime' column from the training features DataFrame 'X_train'
X_train.drop(["datetime"], axis=1, inplace=True)

# Drop the 'datetime' column from the test features DataFrame 'X_test'
X_test.drop(["datetime"], axis=1, inplace=True)

return X_train, X_test, y_train, y_test


@data_exporter
def train_model(data, *args, **kwargs):
"""
Train an XGBoost classifier for fraud detection and save it in the Hopsworks Model Registry.

Args:
data: The output from the upstream parent block
args: The output from any additional upstream blocks (if applicable)
"""
TEST_SIZE = 0.2

# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

fs = project.get_feature_store()

# Get the 'transactions_view' feature view
feature_view = fs.get_feature_view(
name='transactions_view',
version=1,
)

X_train, X_test, y_train, y_test = feature_view.train_test_split(
description='transactions fraud training dataset',
test_size=TEST_SIZE,
)

X_train, X_test, y_train, y_test = prepare_training_data(
X_train,
X_test,
y_train,
y_test,
)
X_train.to_csv(f'X_train.csv')

# Create an XGBoost classifier
model = xgb.XGBClassifier()

# Fit XGBoost classifier to the training data
model.fit(X_train, y_train)

# Predict the training data using the trained classifier
y_pred_train = model.predict(X_train)

# Predict the test data using the trained classifier
y_pred_test = model.predict(X_test)

# Compute f1 score
metrics = {
"f1_score": f1_score(y_test, y_pred_test, average='macro')
}

# Calculate and print the confusion matrix for the test predictions
results = confusion_matrix(y_test, y_pred_test)
print(results)

# Create a DataFrame for the confusion matrix results
df_cm = pd.DataFrame(
results,
['True Normal', 'True Fraud'],
['Pred Normal', 'Pred Fraud'],
)

# Create a heatmap using seaborn with annotations
cm = sns.heatmap(df_cm, annot=True)

# Get the figure and display it
fig = cm.get_figure()

# Create a Schema for the input features using the values of X_train
input_schema = Schema(X_train.values)

# Create a Schema for the output using y_train
output_schema = Schema(y_train)

# Create a ModelSchema using the defined input and output schemas
model_schema = ModelSchema(
input_schema=input_schema,
output_schema=output_schema,
)

# Convert the model schema to a dictionary for inspection
model_schema.to_dict()

# Specify the directory name for saving the model and related artifacts
model_dir = "quickstart_fraud_model"

# Check if the directory already exists; if not, create it
if not os.path.isdir(model_dir):
os.mkdir(model_dir)

# Save the trained XGBoost classifier to a joblib file in the specified directory
joblib.dump(model, model_dir + '/xgboost_model.pkl')

# Save the confusion matrix heatmap figure to an image file in the specified directory
fig.savefig(model_dir + "/confusion_matrix.png")

# Get the model registry
mr = project.get_model_registry()

# Create a Python model named "fraud" in the model registry
fraud_model = mr.python.create_model(
name="fraud",
metrics=metrics, # Specify the metrics used to evaluate the model
model_schema=model_schema, # Use the previously defined model schema
input_example=[4700702588013561], # Provide an input example for testing deployments
description="Quickstart Fraud Predictor", # Add a description for the model
)

# Save the model to the specified directory
fraud_model.save(model_dir)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import hopsworks
import os
import time
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def deploy_model(data, *args, **kwargs):
"""
Deploys the trained XGBoost classifier.

Args:
data: The output from the upstream parent block
args: The output from any additional upstream blocks (if applicable)
"""
# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

fs = project.get_feature_store()
# Get the model registry
mr = project.get_model_registry()

# Get model object
fraud_model = mr.get_model(
name="fraud",
version=1,
)
print('Model is here!')

# Get the dataset API from the project
dataset_api = project.get_dataset_api()

# Specify the file to upload ("predict_example.py") to the "Models" directory, and allow overwriting
uploaded_file_path = dataset_api.upload(
"predictor_script.py",
"Models",
overwrite=True,
)

# Construct the full path to the uploaded predictor script
predictor_script_path = os.path.join(
"/Projects",
project.name,
uploaded_file_path,
)

# Deploy the fraud model
deployment = fraud_model.deploy(
name="fraud", # Specify the deployment name
script_file=predictor_script_path, # Provide the path to the predictor script
)

print("Deployment is warming up...")
time.sleep(45)
Loading