In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
22,application_1609605496842_0004,pyspark,idle,Link,Link


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f2c9e97f410>

## Import modules

In [2]:
import hashlib
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType
import hsfs
from hops import hdfs

## Define utility functions

In [3]:
def action_2_code(input_str):
    x = input_str.split("-")[0]
    if (x == "CASH_IN"):
        node_type = 0
    elif (x == "CASH_OUT"):
        node_type = 1
    elif (x == "DEBIT"):
        node_type = 2
    elif (x == "PAYMENT"):
        node_type = 3
    elif (x == "TRANSFER"):
        node_type = 4
    elif (x == "DEPOSIT"):
        node_type = 4        
    else:
        node_type = 99
    return node_type

def timestamp_2_time(x):
    dt_obj = datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S')
    return dt_obj.strftime("%b-%d") 

action_2_code_udf = F.udf(action_2_code)
timestamp_2_time_udf = F.udf(timestamp_2_time)

## Create a connection to Hopsworks feature store (hsfs)

In [4]:
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

## Load transactions datasets as spark dataframe and perform feature engineering

In [5]:
transactions_df = spark.read\
             .option("inferSchema", "true")\
             .option("header", "true")\
             .format("csv")\
             .load("hdfs:///Projects/{}/Resources/transactions.csv".format(hdfs.project_name()))

In [6]:
transactions_df.show()

+-------+----------------+--------+-------------------+------+--------+--------+--------+
|tran_id|         tx_type|base_amt|     tran_timestamp|is_sar|alert_id|     src|     dst|
+-------+----------------+--------+-------------------+------+--------+--------+--------+
|    496| TRANSFER-FanOut|  858.77|2020-01-01 00:00:00| false|      -1|3aa9646b|1e46e726|
|   1342| TRANSFER-Mutual|  386.86|2020-01-01 00:00:00| false|      -1|49203bc3|a74d1101|
|   1580| TRANSFER-FanOut|  616.43|2020-01-02 00:00:00| false|      -1|616d4505|99af2455|
|   2866| TRANSFER-FanOut|  146.44|2020-01-02 00:00:00| false|      -1|39be1ea2|e7ec7bdb|
|   3997| TRANSFER-Mutual|  439.09|2020-01-03 00:00:00| false|      -1|e2e0d938|afc399a9|
|   5518| TRANSFER-FanOut|   361.0|2020-01-04 00:00:00| false|      -1|75c9a805|d7a317f6|
|   7340| TRANSFER-Mutual|  768.98|2020-01-06 00:00:00| false|      -1|c14f4989|733a496b|
|   9376| TRANSFER-FanOut|   943.4|2020-01-07 00:00:00| false|      -1|576eb672|aa49b0eb|
|  10362|T

In [7]:
transactions_df = transactions_df.withColumn('tx_type', action_2_code_udf(F.col('tx_type')))\
                                 .withColumn('tran_timestamp', timestamp_2_time_udf(F.col('tran_timestamp')).cast(StringType()))\
                                 .withColumnRenamed("orig_acct","source")\
                                 .withColumnRenamed("bene_acct","target")\
                                 .select("src","dst","tran_timestamp","tran_id","tx_type","base_amt")
transactions_df.show()

+--------+--------+--------------+-------+-------+--------+
|     src|     dst|tran_timestamp|tran_id|tx_type|base_amt|
+--------+--------+--------------+-------+-------+--------+
|3aa9646b|1e46e726|        Jan-01|    496|      4|  858.77|
|49203bc3|a74d1101|        Jan-01|   1342|      4|  386.86|
|616d4505|99af2455|        Jan-02|   1580|      4|  616.43|
|39be1ea2|e7ec7bdb|        Jan-02|   2866|      4|  146.44|
|e2e0d938|afc399a9|        Jan-03|   3997|      4|  439.09|
|75c9a805|d7a317f6|        Jan-04|   5518|      4|   361.0|
|c14f4989|733a496b|        Jan-06|   7340|      4|  768.98|
|576eb672|aa49b0eb|        Jan-07|   9376|      4|   943.4|
|847a9cf6|b070a6bb|        Jan-08|  10362|      4|   668.3|
|12a388ff|586377aa|        Jan-08|  10817|      4|  139.84|
|b36f9c84|1b467848|        Jan-08|  11317|      4|  499.47|
|362e42e0|385afb8b|        Jan-09|  11748|      4|  357.96|
|572014da|acd60eca|        Jan-10|  13285|      4|   630.9|
|5ff2d9a7|31976e38|        Jan-11|  1483

## Create transactions feature group metadata and save it in to hsfs

In [None]:
transactions_fg = fs.create_feature_group(name="transactions_fg",
                                       version=1,
                                       primary_key=["tran_id"],
#                                       partition_key=["tran_timestamp"],   
                                       description="transactions features",
                                       time_travel_format=None,                                        
                                       statistics_config=False)
transactions_fg.save(transactions_df)

## Load alert transactions datasets as spark dataframe and perform feature engineering

In [None]:
alert_transactions = spark.read\
             .option("inferSchema", "true")\
             .option("header", "true")\
             .format("csv")\
             .load("hdfs:///Projects/{}/Resources/alert_transactions.csv".format(hdfs.project_name()))
alert_transactions.show()

In [None]:
alert_transactions = alert_transactions.select("alert_id","alert_type","is_sar","tran_id").orderBy("tran_id")
alert_transactions.show()

## Create alert transactions feature group metadata and save it in to hsfs 

In [None]:
alert_transactions_fg = fs.create_feature_group(name="alert_transactions_fg",
                                       version=1,
                                       primary_key=["tran_id"],
#                                       partition_key=["alert_type"],         
                                       description="alert transactions",
                                       time_travel_format=None,                                        
                                       statistics_config=False)
alert_transactions_fg.save(alert_transactions_fg)