# Feature Pipeline using Synthetic Data


**Note**: Installing `hopsworks` on Colab may report an error; it is safe to ignore.

## 🗒️ This notebook is divided into 2 sections:
1. Reading the synthetic credit card data and engineering features.
2. Writing the Pandas DataFrames to the feature groups in the feature store.


In [1]:
%%capture
!pip install  hopsworks
!pip install -U faker --quiet

In [17]:
%%capture
!pip install confluent_kafka

## After you run the cell above, restart the session and then move on to the next steps ❗

In [2]:
!git clone https://github.com/naziherrahel/Software-Development-Technologies.git

Cloning into 'Software-Development-Technologies'...
remote: Enumerating objects: 601, done.
remote: Counting objects: 100% (148/148), done.
remote: Compressing objects: 100% (94/94), done.
remote: Total 601 (delta 105), reused 83 (delta 53), pack-reused 453 (from 2)
Receiving objects: 100% (601/601), 14.44 MiB | 18.71 MiB/s, done.
Resolving deltas: 100% (302/302), done.


In [3]:
%cd /content/Software-Development-Technologies/src/02-module

/content/Software-Development-Technologies/src/02-module


In [4]:
!ls


1_backfill_cc_feature_groups.ipynb  2_cc_feature_pipeline.ipynb  scripts  sml  test_sml  titanic


In [5]:
import pandas as pd
import datetime
import hopsworks
from sml import synthetic_data
import random
pd.options.mode.chained_assignment = None

In [6]:
start_time = (datetime.datetime.now() - datetime.timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
print(start_time)

2025-03-21 12:22:55


In [7]:
#end_time = (datetime.datetime.now() - datetime.timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
end_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(end_time)

2025-03-22 12:22:56


In [8]:
synthetic_data.FRAUD_RATIO = random.uniform(0.001, 0.005)
synthetic_data.TOTAL_UNIQUE_USERS = 1000
synthetic_data.TOTAL_UNIQUE_TRANSACTIONS = 54000
synthetic_data.CASH_WITHRAWAL_CARDS_TOTAL = 2000
synthetic_data.TOTAL_UNIQUE_CASH_WITHDRAWALS = 200
synthetic_data.START_DATE=start_time
synthetic_data.END_DATE=end_time

credit_cards = synthetic_data.generate_list_credit_card_numbers()
credit_cards_df = synthetic_data.create_credit_cards_as_df(credit_cards)
profiles_df = synthetic_data.create_profiles_as_df(credit_cards)
trans_df = synthetic_data.create_transactions_as_df(credit_cards)

## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>

Fraudulent transactions can differ from regular ones in many ways. A typical red flag is an unusually large transaction volume or frequency within a few hours. Fraudsters may also target elderly people in particular. To help the model learn these patterns, you will create additional features of two types:
1. **Features that aggregate data from different data sources**. This could for instance be the age of a customer at the time of a transaction, which combines the `birthdate` feature from `profiles.csv` with the `datetime` feature from `transactions.csv`.
2. **Features that aggregate data from multiple time steps**. An example of this could be the transaction frequency of a credit card in the span of a few hours, which is computed using a window function.

Let's start with the first category.
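
As an illustration of the first category, the sketch below joins a profile table to a transaction table and derives the card owner's age at the time of each transaction. It is not the code behind `cc_features.card_owner_age`; the tiny DataFrames, the join on `cc_num`, the helper name `add_age_at_transaction`, and the output column `age_at_transaction` are all assumptions made for the example.

```python
import pandas as pd

# Hypothetical miniature stand-ins for profiles_df and trans_df.
profiles = pd.DataFrame({
    "cc_num": [1111, 2222],
    "birthdate": pd.to_datetime(["2000-01-01", "1950-06-15"]),
})
trans = pd.DataFrame({
    "tid": ["a", "b"],
    "cc_num": [1111, 2222],
    "datetime": pd.to_datetime(["2025-01-01 10:00:00", "2025-06-15 12:00:00"]),
})

def add_age_at_transaction(trans, profiles):
    """Attach the owner's age (in years) at the time of each transaction."""
    # Bring the birthdate next to every transaction of the same card.
    merged = trans.merge(profiles[["cc_num", "birthdate"]], on="cc_num", how="left")
    # Whole days between birth and transaction, expressed in (approximate) years.
    merged["age_at_transaction"] = (merged["datetime"] - merged["birthdate"]).dt.days / 365.25
    # The raw birthdate is no longer needed once the age is derived.
    return merged.drop(columns="birthdate")

aged = add_age_at_transaction(trans, profiles)
print(aged)
```

Dividing by 365.25 is a simple approximation that averages leap years; a calendar-exact age would need a different calculation.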

In [9]:
fraud_labels = trans_df.copy()[["tid", "cc_num", "datetime", "fraud_label"]]
fraud_labels

Unnamed: 0,tid,cc_num,datetime,fraud_label
0,b817a13bbecaefaa5652dc385fc59ed4,4494402666200582,2025-03-21 12:22:58,0
1,5354a0034a2ab021d6889582e34d8bb6,4911213295316334,2025-03-21 12:22:59,0
2,49bd175a59e4082105246402fcccdc7d,4363516603317171,2025-03-21 12:23:00,0
3,95d5587717fab0a06da38f6850ed3974,4788295948802745,2025-03-21 12:23:02,0
4,349dcb0421bc4febf1100754dd004ba5,4054203902042800,2025-03-21 12:23:05,0
...,...,...,...,...
60081,6329e7202dfe0df9cbcd1d1d1c117946,4894014437714472,2025-04-09 00:35:06,0
60082,bc35f3be61827db1e3381bd260b639cd,4894014437714472,2025-04-12 02:35:06,0
60083,e90d17f62f941ab564c7750ecfe36a43,4894014437714472,2025-04-15 04:35:06,0
60084,5b21e5069796c0308867638a65e9e621,4894014437714472,2025-04-18 06:35:06,0


In [10]:
from sml import cc_features

fraud_labels.datetime = fraud_labels.datetime.map(cc_features.date_to_timestamp)
fraud_labels

Unnamed: 0,tid,cc_num,datetime,fraud_label
0,b817a13bbecaefaa5652dc385fc59ed4,4494402666200582,1742559778000,0
1,5354a0034a2ab021d6889582e34d8bb6,4911213295316334,1742559779000,0
2,49bd175a59e4082105246402fcccdc7d,4363516603317171,1742559780000,0
3,95d5587717fab0a06da38f6850ed3974,4788295948802745,1742559782000,0
4,349dcb0421bc4febf1100754dd004ba5,4054203902042800,1742559785000,0
...,...,...,...,...
60081,6329e7202dfe0df9cbcd1d1d1c117946,4894014437714472,1744158906000,0
60082,bc35f3be61827db1e3381bd260b639cd,4894014437714472,1744425306000,0
60083,e90d17f62f941ab564c7750ecfe36a43,4894014437714472,1744691706000,0
60084,5b21e5069796c0308867638a65e9e621,4894014437714472,1744958106000,0
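
The output above shows the `datetime` column converted to integers. The values are consistent with Unix epoch time in milliseconds, with the original timestamps read as UTC. The sketch below reproduces that conversion using only the standard library; `to_epoch_ms` is a hypothetical helper and is not claimed to be how `cc_features.date_to_timestamp` is implemented.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_ms(ts: str) -> int:
    """Convert a 'YYYY-MM-DD HH:MM:SS' string (assumed to be UTC) to epoch milliseconds."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    # Integer division by a 1 ms timedelta avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)

# First row of the table above.
print(to_epoch_ms("2025-03-21 12:22:58"))
```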


In [11]:
trans_df

Unnamed: 0,tid,datetime,cc_num,category,amount,latitude,longitude,city,country,fraud_label
0,b817a13bbecaefaa5652dc385fc59ed4,2025-03-21 12:22:58,4494402666200582,Electronics,456.97,36.025060,-86.779170,Brentwood Estates,US,0
1,5354a0034a2ab021d6889582e34d8bb6,2025-03-21 12:22:59,4911213295316334,Grocery,71.93,33.410120,-91.061770,Greenville,US,0
2,49bd175a59e4082105246402fcccdc7d,2025-03-21 12:23:00,4363516603317171,Restaurant/Cafeteria,10.87,33.036990,-117.291980,Encinitas,US,0
3,95d5587717fab0a06da38f6850ed3974,2025-03-21 12:23:02,4788295948802745,Grocery,79.64,29.845760,-90.106740,Estelle,US,0
4,349dcb0421bc4febf1100754dd004ba5,2025-03-21 12:23:05,4054203902042800,Holliday/Travel,43.42,40.605380,-73.755130,Far Rockaway,US,0
...,...,...,...,...,...,...,...,...,...,...
60081,6329e7202dfe0df9cbcd1d1d1c117946,2025-04-09 00:35:06,4894014437714472,Cash Withdrawal,62.36,34.239010,-119.044274,Camarillo,US,0
60082,bc35f3be61827db1e3381bd260b639cd,2025-04-12 02:35:06,4894014437714472,Cash Withdrawal,79.94,34.248261,-119.041913,Camarillo,US,0
60083,e90d17f62f941ab564c7750ecfe36a43,2025-04-15 04:35:06,4894014437714472,Cash Withdrawal,2.69,34.243697,-119.033705,Camarillo,US,0
60084,5b21e5069796c0308867638a65e9e621,2025-04-18 06:35:06,4894014437714472,Cash Withdrawal,16.80,34.234965,-119.040798,Camarillo,US,0


In [12]:
# The labels now live in fraud_labels, so remove them from the feature DataFrame.
trans_df.drop(columns=["fraud_label"], inplace=True)

In [13]:
trans_df = cc_features.card_owner_age(trans_df, profiles_df)
trans_df = cc_features.expiry_days(trans_df, credit_cards_df)
trans_df = cc_features.activity_level(trans_df, 1)



In [14]:
window_len = 4
window_aggs_df = cc_features.aggregate_activity_by_hour(trans_df, window_len)

Next, you will create features that aggregate data from multiple time steps for each credit card.

You will start by computing the distance between consecutive transactions; let's call it `loc_delta`.
Here you will use the [Haversine distance](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html?highlight=haversine#sklearn.metrics.pairwise.haversine_distances) to quantify the distance between two longitude and latitude coordinates.
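
A minimal sketch of the idea, written with NumPy and pandas rather than the scikit-learn function linked above: for each card, every transaction is paired with the previous one and the Haversine formula gives the great-circle distance between them. The helper `haversine_km`, the choice of kilometres as the unit, the Earth radius constant, and the sample rows are assumptions for illustration; the `sml` module may compute `loc_delta` differently.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, assumed for this sketch

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between points given in degrees."""
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * np.arcsin(np.sqrt(a))

# Hypothetical transactions: card 1 moves a quarter of the way around the equator.
df = pd.DataFrame({
    "cc_num": [1, 2, 1],
    "datetime": pd.to_datetime(["2025-03-21 10:00", "2025-03-21 11:00", "2025-03-21 12:00"]),
    "latitude": [0.0, 10.0, 0.0],
    "longitude": [0.0, 10.0, 90.0],
})

df = df.sort_values(["cc_num", "datetime"]).reset_index(drop=True)
# Coordinates of the previous transaction of the same card (NaN for the first one).
prev = df.groupby("cc_num")[["latitude", "longitude"]].shift(1)
df["loc_delta"] = haversine_km(
    prev["latitude"], prev["longitude"], df["latitude"], df["longitude"]
).fillna(0.0)  # the first transaction of a card has no predecessor
print(df)
```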

Next, let's compute windowed aggregates. Here you will use 4-hour windows, but feel free to experiment with different window lengths by setting `window_len` in the cell above to a value of your choice.
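
To make the windowing concrete, here is a small sketch of a per-card, time-based rolling aggregate in pandas. It only illustrates the technique and is not the implementation of `cc_features.aggregate_activity_by_hour`; the helper `rolling_amount_stats`, the output column names `trans_count` and `trans_mean`, and the sample rows are assumptions.

```python
import pandas as pd

window_len = 4  # window length in hours

# Hypothetical transactions for two cards.
trans = pd.DataFrame({
    "cc_num": ["A", "A", "A", "B"],
    "datetime": pd.to_datetime([
        "2025-03-21 00:00:00", "2025-03-21 01:00:00",
        "2025-03-21 06:00:00", "2025-03-21 00:30:00",
    ]),
    "amount": [10.0, 20.0, 30.0, 5.0],
})

def rolling_amount_stats(trans, hours):
    """Count and mean of `amount` per card over a trailing window of `hours`."""
    # Time-based rolling windows need a sorted datetime index.
    indexed = trans.sort_values("datetime").set_index("datetime")
    rolled = indexed.groupby("cc_num")["amount"].rolling(pd.Timedelta(hours=hours))
    stats = pd.DataFrame({
        "trans_count": rolled.count(),
        "trans_mean": rolled.mean(),
    })
    return stats.reset_index()

window_stats = rolling_amount_stats(trans, window_len)
print(window_stats)
```

Each window covers the `window_len` hours up to and including the current transaction, so card `A`'s transaction at 06:00 no longer sees the ones at 00:00 and 01:00.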

In [15]:
project = hopsworks.login()
fs = project.get_feature_store()


Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1213684


To create a feature group you need to give it a name and specify a primary key. It is also good practice to provide a description of its contents and a version number; if you omit the version, it is set automatically by incrementing the latest existing version, starting at `1`.
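
If the feature group does not exist yet, a call along the following lines could create it. This is a hedged sketch: `get_or_create_feature_group` is part of the Hopsworks feature store API, but the wrapper `get_or_create_trans_fg`, the description text, and the choice of `cc_num` as primary key and `datetime` as event time are assumptions, not settings taken from this notebook.

```python
def get_or_create_trans_fg(fs, version=2):
    """Fetch the transactions feature group, creating its metadata if it is missing.

    `fs` is expected to be the object returned by project.get_feature_store().
    """
    return fs.get_or_create_feature_group(
        name="cc_trans_fraud",
        version=version,
        description="Credit card transaction features",  # assumed free-text description
        primary_key=["cc_num"],   # assumed key column
        event_time="datetime",    # assumed event-time column
    )
```

With a live connection this would be used as `trans_fg = get_or_create_trans_fg(fs)` followed by `trans_fg.insert(trans_df)`.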

In [16]:
trans_fg = fs.get_feature_group(name="cc_trans_fraud", version=2)
trans_fg.insert(trans_df, write_options={"wait_for_job" : False})

Uploading Dataframe: 100.00% |██████████| Rows 60086/60086 | Elapsed Time: 00:13 | Remaining Time: 00:00


Launching job: cc_trans_fraud_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1213684/jobs/named/cc_trans_fraud_2_offline_fg_materialization/executions


(Job('cc_trans_fraud_2_offline_fg_materialization', 'SPARK'), None)

In [17]:
window_aggs_fg = fs.get_feature_group(name=f"cc_trans_fraud_{window_len}h", version=2)
window_aggs_fg.insert(window_aggs_df, write_options={"wait_for_job" : False})

Uploading Dataframe: 100.00% |██████████| Rows 60086/60086 | Elapsed Time: 00:07 | Remaining Time: 00:00


Launching job: cc_trans_fraud_4h_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1213684/jobs/named/cc_trans_fraud_4h_2_offline_fg_materialization/executions


(Job('cc_trans_fraud_4h_2_offline_fg_materialization', 'SPARK'), None)

In [18]:

labels_fg = fs.get_feature_group(name="transactions_fraud_label", version=2)
labels_fg.insert(fraud_labels)

Uploading Dataframe: 100.00% |██████████| Rows 60086/60086 | Elapsed Time: 00:07 | Remaining Time: 00:00


Launching job: transactions_fraud_label_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1213684/jobs/named/transactions_fraud_label_2_offline_fg_materialization/executions


(Job('transactions_fraud_label_2_offline_fg_materialization', 'SPARK'), None)