In [None]:
from pyspark.sql import SparkSession

# Get the active SparkSession, or create one named 'FraudDetectionFeatures' if none exists.
spark = (
    SparkSession.builder
    .appName('FraudDetectionFeatures')
    .getOrCreate()
)

In [0]:
# Raw transaction extract. The first row holds column names, and Spark samples the data to infer types.
# NOTE(review): hard-coded path that looks like a Databricks Unity Catalog volume — consider
# moving it to a config cell so the notebook can point at other environments.
TRANSACTIONS_CSV_PATH = '/Volumes/banking/banking/banking_data/bank_transactions_data_2.csv'
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(TRANSACTIONS_CSV_PATH)
)

In [0]:
# Preview the raw transactions as loaded from CSV (`display` is supplied by the notebook environment).
display(df)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
# Parse TransactionDate into a timestamp, then derive epoch seconds.
# The range-based windows order and measure by TransactionDate_insecs.
# NOTE(review): a string -> timestamp cast is interpreted in the Spark session time zone
# (spark.sql.session.timeZone) — confirm that matches the source data.
df = (
    df
    .withColumn('TransactionDate', col('TransactionDate').cast('Timestamp'))
    .withColumn('TransactionDate_insecs', unix_timestamp(col('TransactionDate')))
)

In [0]:
# Rolling per-account activity over the preceding hour and day.
# Frames are measured in epoch seconds: rangeBetween(-3600, -1) spans rows 1..3600 s before the
# current row. The current row, and any other row in the same second, is therefore excluded.
# `count`/`sum` here are the pyspark.sql.functions versions (star import), not Python built-ins.
# Over an empty frame, count() is 0 but sum() is NULL.
by_account_time = Window.partitionBy('AccountID').orderBy('TransactionDate_insecs')
window_1h = by_account_time.rangeBetween(-3600, -1)
window_24h = by_account_time.rangeBetween(-86400, -1)

df = (
    df
    .withColumn('txn_count_1h', count('TransactionID').over(window_1h))
    .withColumn('txn_amount_sum_1h', sum('TransactionAmount').over(window_1h))
    .withColumn('txn_count_24h', count('TransactionID').over(window_24h))
    .withColumn('txn_amount_sum_24h', sum('TransactionAmount').over(window_24h))
)


In [0]:
# Inspect the rolling 1h / 24h count and amount-sum columns.
# The sums are NULL where no earlier transaction fell in the window.
display(df)

In [0]:
# Per-account mean TransactionAmount, plus each transaction's signed deviation from it,
# rounded to 2 decimals (`round` is pyspark.sql.functions.round via the star import).
# NOTE(review): this window has no orderBy/frame, so the mean covers ALL of the account's
# transactions, including later ones. If this feature feeds a model, that leaks future information.
# Consider an expanding window over prior rows only.
account_window = Window.partitionBy('AccountID')
df = df.withColumn('account_avg_amount', avg('TransactionAmount').over(account_window))
df = df.withColumn(
    'amount_deviation_from_avg',
    round(col('TransactionAmount') - col('account_avg_amount'), 2),
)

In [0]:
# Treat "no prior activity in the window" as 0 for the four rolling features.
# Windowed sums over an empty frame come back NULL.
rolling_feature_cols = ['txn_count_1h', 'txn_amount_sum_1h', 'txn_count_24h', 'txn_amount_sum_24h']
df = df.na.fill(0, subset=rolling_feature_cols)

In [0]:
# Inspect the frame after adding the account-average deviation and zero-filling the rolling features.
display(df)

In [0]:
history_window = Window.partitionBy('AccountID').orderBy('TransactionDate_insecs').rowsBetween(Window.unboundedPreceding, -1)
df = df.withColumn('historic_devices', collect_list('DeviceID').over(history_window))

In [0]:
df = df.withColumn('is_new_device', when(size(col('historic_devices')) == 0, lit(1)).otherwise(when(col('DeviceID').isin('historic_devices'), lit(1)).otherwise(lit(0))))

In [0]:
df = df.withColumn('historic_locations', collect_list('Location').over(history_window))
df = df.withColumn('is_new_location', when(size(col('historic_locations')) == 0, lit(1)).otherwise(when(col('Location').isin('historic_locations'), lit(1)).otherwise(lit(0))))

df = df.drop('historic_devices', 'historic_locations')

In [0]:
# Inspect the is_new_device / is_new_location flags.
display(df)

In [0]:
# Threshold at or above which a transaction's LoginAttempts is considered suspicious.
high_login_attempts = 3

# 1 when LoginAttempts >= threshold, else 0. A NULL LoginAttempts makes the comparison NULL,
# which falls through to 0.
too_many_logins = col('LoginAttempts') >= high_login_attempts
df = df.withColumn(
    'high_login_attempts_flag',
    when(too_many_logins, lit(1)).otherwise(lit(0)),
)

In [None]:
# Final feature table: identifiers plus the engineered fraud features, in a fixed column order.
feature_columns = [
    # identifiers
    "TransactionID", "AccountID", "TransactionDate",
    # rolling activity
    "txn_count_1h", "txn_amount_sum_1h", "txn_count_24h", "txn_amount_sum_24h",
    # amount behaviour
    "amount_deviation_from_avg",
    # novelty flags
    "is_new_device", "is_new_location",
    # authentication risk
    "high_login_attempts_flag",
]
final_features_df = df.select(*feature_columns)

In [None]:
# Show the final feature table.
# NOTE(review): this may contain account identifiers — review before sharing outputs.
display(final_features_df)