***Connect to Hopsworks Feature Store***

In [None]:
import os

import hsfs

# Connection settings are read from the environment so that no credential is
# stored in the notebook. The API key previously committed here must be treated
# as leaked and revoked in Hopsworks.
#   HOPSWORKS_API_KEY  (required)  Feature store API key value
#   HOPSWORKS_HOST     (optional)  DNS of the Feature Store instance
#   HOPSWORKS_PROJECT  (optional)  Name of the Hopsworks Feature Store project
hopsworks_api_key = os.environ.get("HOPSWORKS_API_KEY")
if not hopsworks_api_key:
    raise RuntimeError(
        "Set the HOPSWORKS_API_KEY environment variable before running this notebook."
    )

conn = hsfs.connection(
    host=os.environ.get(
        "HOPSWORKS_HOST",
        "020d3ad0-7247-11ee-8227-d9815d55065e.cloud.hopsworks.ai",
    ),
    project=os.environ.get("HOPSWORKS_PROJECT", "myproject"),
    hostname_verification=False,  # Disabled for self-signed certificates
    api_key_value=hopsworks_api_key,
)
fs = conn.get_feature_store()  # Get the project's default feature store

***Imports***

In [None]:
import json
from math import radians
from datetime import datetime

import pandas as pd
import numpy as np

from hsfs.feature import Feature
import pyspark.sql.functions as F

***Data loading***

In [None]:
# Source CSV files of the card-fraud tutorial data set.
credit_cards_url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/credit_cards.csv"
profiles_url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/profiles.csv"
transactions_url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/transactions.csv"

# Date-like columns are parsed at load time so they can be subtracted later on.
credit_cards_df = pd.read_csv(credit_cards_url)
profiles_df = pd.read_csv(
    profiles_url,
    parse_dates=["birthdate"],
)
trans_df_raw = pd.read_csv(
    transactions_url,
    parse_dates=["datetime"],
)

***Data preparation***

In [None]:
# Age of the card holder at transaction time, in years.
# The divisor is spelled out as 365.2425 days (the length numpy uses for its "Y"
# unit) because recent pandas releases reject np.timedelta64(1, "Y") as an
# ambiguous duration.
# NOTE(review): the column assignment aligns on the row index, so it relies on
# the left merge returning exactly one row per transaction (unique cc_num in
# profiles_df) - confirm.
age_df = trans_df_raw.merge(profiles_df, on="cc_num", how="left")
trans_df_raw["age_at_transaction"] = (
    (age_df["datetime"] - age_df["birthdate"]) / pd.Timedelta(days=365.2425)
)

# Days left before the card expires at transaction time ("expires" is MM/YY).
card_expiry_df = trans_df_raw.merge(credit_cards_df, on="cc_num", how="left")
card_expiry_df["expires"] = pd.to_datetime(card_expiry_df["expires"], format="%m/%y")
trans_df_raw["days_until_card_expires"] = (
    (card_expiry_df["expires"] - card_expiry_df["datetime"]) / np.timedelta64(1, "D")
)

# Order transactions chronologically; the row labels are kept as they are.
trans_df_raw = trans_df_raw.sort_values("datetime")

# Convert the coordinates from degrees to radians for the Haversine computation.
# Re-running this cell without reloading the data would convert them a second time.
trans_df_raw[["longitude", "latitude"]] = np.radians(trans_df_raw[["longitude", "latitude"]])


def haversine(long, lat):
    """Haversine central angle between each coordinate and the one before it.

    Args:
        long: pandas Series of longitudes; the trigonometric functions are applied
            directly, so the values are expected to be in radians.
        lat: pandas Series of latitudes in radians, aligned with ``long``.

    Returns:
        A Series of central angles in radians (the result is not scaled by any
        sphere radius). The first element is NaN because it has no predecessor.
    """
    prev_long, prev_lat = long.shift(), lat.shift()

    half_lat_step = (prev_lat - lat) / 2.0
    half_long_step = (prev_long - long) / 2.0

    lat_term = np.sin(half_lat_step)**2
    long_term = np.cos(lat) * np.cos(prev_lat) * np.sin(half_long_step)**2

    return 2*np.arcsin(np.sqrt(lat_term + long_term))


# Distance between consecutive transactions of the same card. The first
# transaction of every card has no predecessor and gets 0.
per_card_loc_delta = trans_df_raw.groupby("cc_num").apply(
    lambda card_rows: haversine(card_rows["longitude"], card_rows["latitude"])
)
# Drop the cc_num index level so the values align with the original row labels.
per_card_loc_delta = per_card_loc_delta.reset_index(level=0, drop=True)
trans_df_raw["loc_delta"] = per_card_loc_delta.fillna(0)

window_len = "4h"


def _rolling_feature(rolled_df, feature_name):
    """Reshape one per-card rolling aggregate into a single-column frame.

    Args:
        rolled_df: result of aggregating a
            ``groupby("cc_num").rolling(window_len, on="datetime")`` object, i.e.
            one value column followed by "datetime", with "cc_num" as an index level.
        feature_name: name given to the value column.

    Returns:
        A frame holding only ``feature_name``, indexed and sorted by the original
        row label.
    """
    feature_df = pd.DataFrame(rolled_df)
    feature_df.columns = [feature_name, "datetime"]
    feature_df = feature_df.reset_index(level=["cc_num"])
    feature_df = feature_df.drop(columns=["cc_num", "datetime"])
    return feature_df.sort_index()


# Rolling windows over the transaction amount, per card.
cc_group = trans_df_raw[["cc_num", "amount", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime")

# Moving average, moving standard deviation and transaction count per window.
# The standard deviation and the count were previously both computed with
# .mean(), which made all three features identical.
df_4h_mavg = _rolling_feature(cc_group.mean(), "trans_volume_mavg")
# A window holding a single transaction has no standard deviation (NaN) -> 0.
df_4h_std = _rolling_feature(cc_group.std(), "trans_volume_mstd").fillna(0)
df_4h_count = _rolling_feature(cc_group.count(), "trans_freq")

# Rolling windows over the distance between consecutive transactions, per card.
loc_delta_group = trans_df_raw[["cc_num", "loc_delta", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime")
df_4h_loc_delta_mavg = _rolling_feature(loc_delta_group.mean(), "loc_delta_mavg")

# Combine all window features on the original row label, then attach the card
# number and event time of each row.
window_aggs_df_raw = df_4h_std.merge(df_4h_mavg, left_index=True, right_index=True)
window_aggs_df_raw = window_aggs_df_raw.merge(df_4h_count, left_index=True, right_index=True)
window_aggs_df_raw = window_aggs_df_raw.merge(df_4h_loc_delta_mavg, left_index=True, right_index=True)
window_aggs_df_raw = window_aggs_df_raw.merge(trans_df_raw[["cc_num", "datetime"]].sort_index(), left_index=True, right_index=True)

# Convert event times to integer milliseconds since the epoch.
# NOTE(review): the division by 10 ** 6 assumes nanosecond-resolution datetimes - confirm.
# Re-running this cell without rebuilding the frames would divide the values again.
trans_df_raw.datetime = trans_df_raw.datetime.values.astype(np.int64) // 10 ** 6
window_aggs_df_raw.datetime = window_aggs_df_raw.datetime.values.astype(np.int64) // 10 ** 6

***Feature group creation***

In [None]:
# Feature Groups
# Transactions feature group: keyed by card number, with the transaction time
# as event time.
trans_fg = fs.get_or_create_feature_group(
    name="transactions_fraud_batch_fg",
    version=1,
    description="Transaction data",
    primary_key=["cc_num"],
    event_time="datetime",
)

# insert() is expected to return a pair; neither element is used here.
_, _ = trans_fg.insert(trans_df_raw)

# Human-readable description of every feature in the group.
trans_feature_descriptions = [
    {"name": feature_name, "description": feature_description}
    for feature_name, feature_description in (
        ("tid", "Transaction id"),
        ("datetime", "Transaction time"),
        ("cc_num", "Number of the credit card performing the transaction"),
        ("category", "Expense category"),
        ("amount", "Dollar amount of the transaction"),
        ("latitude", "Transaction location latitude"),
        ("longitude", "Transaction location longitude"),
        ("city", "City in which the transaction was made"),
        ("country", "Country in which the transaction was made"),
        ("fraud_label", "Whether the transaction was fraudulent or not"),
        ("age_at_transaction", "Age of the card holder when the transaction was made"),
        ("days_until_card_expires", "Card validity days left when the transaction was made"),
        ("loc_delta", "Haversine distance between this transaction location and the previous transaction location from the same card"),
    )
]

for dictionary in trans_feature_descriptions:
    feature_name = dictionary["name"]
    feature_description = dictionary["description"]
    trans_fg.update_feature_description(feature_name, feature_description)

# Window-aggregates feature group: same key and event time as the transactions
# feature group, with the window length embedded in its name.
window_aggs_fg = fs.get_or_create_feature_group(
    name=f"transactions_{window_len}_aggs_fraud_batch_fg",
    version=1,
    description=f"Aggregate transaction data over {window_len} windows.",
    primary_key=["cc_num"],
    event_time="datetime",
)

# insert() is expected to return a pair; neither element is used here.
_, _ = window_aggs_fg.insert(window_aggs_df_raw)

# Human-readable description of every feature in the group.
window_aggs_feature_descriptions = [
    {"name": feature_name, "description": feature_description}
    for feature_name, feature_description in (
        ("datetime", "Transaction time"),
        ("cc_num", "Number of the credit card performing the transaction"),
        ("loc_delta_mavg", "Moving average of location difference between consecutive transactions from the same card"),
        ("trans_freq", "Moving average of transaction frequency from the same card"),
        ("trans_volume_mavg", "Moving average of transaction volume from the same card"),
        ("trans_volume_mstd", "Moving standard deviation of transaction volume from the same card"),
    )
]

for dictionary in window_aggs_feature_descriptions:
    feature_name = dictionary["name"]
    feature_description = dictionary["description"]
    window_aggs_fg.update_feature_description(feature_name, feature_description)

# Look the same feature group up again by name and version.
window_aggs_fg_fetched = fs.get_or_create_feature_group(
    name=f"transactions_{window_len}_aggs_fraud_batch_fg",
    version=1,
)

# Query
# Columns taken from the transactions feature group.
trans_cols_select = [
    "fraud_label",
    "category",
    "amount",
    "age_at_transaction",
    "days_until_card_expires",
    "loc_delta",
]
# Columns left out of the window-aggregates feature group.
window_aggs_cols_except = ["cc_num"]

# Join the selected transaction features with the remaining window aggregates.
selected_trans_features = trans_fg.select(trans_cols_select)
selected_window_aggs = window_aggs_fg.select_except(window_aggs_cols_except)
query = selected_trans_features.join(selected_window_aggs)

***Feature view creation***

In [None]:
# Transformation functions
# Serialized source of the two transformation functions, kept for reference;
# nothing in this cell compares against these strings.
min_max_scaler_code_expected = '{\n  "module_imports": "",\n  "transformer_code": "def min_max_scaler(value, min_value, max_value):\\n    if value is None:\\n        return None\\n    elif float(max_value - min_value) == float(0):\\n        return float(0)\\n    else:\\n        return float((value - min_value) / (max_value - min_value))\\n"\n}'
label_encoder_code_expected = '{"module_imports": "", "transformer_code": "# label encoder\\ndef label_encoder(value, value_to_index):\\n    # define a mapping of values to integers\\n    return value_to_index[value]"}'

# Fetch both transformation functions from the feature store by name.
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
label_encoder = fs.get_transformation_function(name="label_encoder")

min_max_scaler_dict = min_max_scaler.to_dict()
label_encoder_dict = label_encoder.to_dict()

# The categorical feature is label-encoded; every numeric feature is min-max scaled.
transformation_functions = {"category": label_encoder}
transformation_functions.update(
    {
        scaled_feature: min_max_scaler
        for scaled_feature in (
            "amount",
            "trans_volume_mavg",
            "trans_volume_mstd",
            "trans_freq",
            "loc_delta",
            "loc_delta_mavg",
            "age_at_transaction",
            "days_until_card_expires",
        )
    }
)

# Feature View
# Rebuild the query: selected transaction features joined with every
# window-aggregate column except the excluded ones.
selected_trans_features = trans_fg.select(trans_cols_select)
selected_window_aggs = window_aggs_fg.select_except(window_aggs_cols_except)
query = selected_trans_features.join(selected_window_aggs)

# "fraud_label" is the label column; the transformation functions are attached
# per feature name.
feature_view = fs.get_or_create_feature_view(
    name='fraud_batch_fv',
    version=1,
    query=query,
    labels=["fraud_label"],
    transformation_functions=transformation_functions,
)

***Create training data***

In [None]:
# Data splits
# Fractions of the data reserved for validation and test; the remainder is
# used for training.
VALIDATION_SIZE = 0.2
TEST_SIZE = 0.1

# Materialize the split as CSV and wait for the job to finish so the data can
# be read back immediately.
td_version, td_job = feature_view.create_train_validation_test_split(
    description = 'transactions fraud batch training dataset',
    data_format = 'csv',
    validation_size = VALIDATION_SIZE,
    test_size = TEST_SIZE,
    write_options = {'wait_for_job': True}
)

# Read back the version that was just created rather than a hardcoded 1, so the
# cell still returns the matching data when a training dataset already exists.
X_train, X_val, X_test, y_train, y_val, y_test = feature_view.get_train_validation_test_split(td_version)

# get_batch_data
date_format = "%Y-%m-%d %H:%M:%S"
# Retrieve part of the data using an event time filter.
# The bounds are converted to epoch milliseconds the same way the "datetime"
# columns were (naive timestamps read as UTC). datetime.timestamp() would apply
# the local timezone of the machine and shift the window on non-UTC hosts.
start_time = int(pd.to_datetime("2022-03-01 00:00:00", format=date_format).value // 10 ** 6)
end_time = int(pd.to_datetime("2022-03-31 23:59:59", format=date_format).value // 10 ** 6)

# Initialise batch scoring with the training dataset version created above
# instead of a hardcoded 1.
feature_view.init_batch_scoring(td_version)

batch_data = feature_view.get_batch_data(start_time=start_time, end_time=end_time)