<img src="https://github.com/rjpost20/Anomalous-Bank-Transactions-Detection-Project/blob/main/data/AdobeStock_319163865.jpeg?raw=true">
Image by <a href="https://stock.adobe.com/contributor/200768506/andsus?load_type=author&prev_url=detail" >AndSus</a> on Adobe Stock

# Phase 5 Project: *Detecting Anomalous Financial Transactions*

## Notebook 1: Intro, EDA and Preprocessing

### By Ryan Posternak

Flatiron School, Full-Time Live NYC<br>
Project Presentation Date: August 25th, 2022<br>
Instructor: Joseph Mata

## Goal: 

*This is a project for learning purposes. The *** is not involved with this project in any way.*

<br>

# Overview and Business Understanding

<br>

# Data Understanding

<br>

# Imports, Reading in Data, and Exploratory Data Analysis

### Google colab compatibility downloads

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz 
!tar xf spark-3.3.0-bin-hadoop3.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"
!pip install pyspark==3.3.0
!pip install -q findspark
import findspark
findspark.init()

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.1[0m[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.1[0m[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.91.39)][0m                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
[33m0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait[0m                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
[33m0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait[0m                                                                               Get:4 https://developer.downlo

In [43]:
from google.colab import drive, files
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Import libraries, packages and modules

In [3]:
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
from itertools import chain
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

import matplotlib.pyplot as plt
plt.style.use('seaborn')
import seaborn as sns
# import matplotlib_inline.backend_inline
# matplotlib_inline.backend_inline.set_matplotlib_formats('retina')
from IPython.display import HTML, display
%matplotlib inline

In [4]:
helper_functions = files.upload()
from helper_functions import spark_resample

Saving helper_functions.py to helper_functions.py


In [None]:
# Check colab GPU info

gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
    print('Not connected to a GPU')
else:
    print(gpu_info)

In [None]:
# Set text to wrap in Google colab notebook

def set_css():
    display(HTML('''
    <style>
      pre {
          white-space: pre-wrap;
      }
    </style>
    '''))
get_ipython().events.register('pre_run_cell', set_css)

In [5]:
# Initialize Spark Session

# spark = SparkSession.builder.master('local[*]').getOrCreate()
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark.version

'3.3.0'

### Description of Features

**Dataset 1 – Transactions:**

`MessageId` - Globally unique identifier within this dataset for individual transactions<br>
`UETR` - The Unique End-to-end Transaction Reference—a 36-character string enabling traceability of all individual transactions associated with a single end-to-end transaction<br>
`TransactionReference` - Unique identifier for an individual transaction<br>
`Timestamp` - Time at which the individual transaction was initiated<br>
`Sender` - Institution (bank) initiating/sending the individual transaction<br>
`Receiver` - Institution (bank) receiving the individual transaction<br>
`OrderingAccount` - Account identifier for the originating ordering entity (individual or organization) for end-to-end transaction<br>
`OrderingName` - Name for the originating ordering entity<br>
`OrderingStreet` - Street address for the originating ordering entity<br>
`OrderingCountryCityZip` - Remaining address details for the originating ordering entity<br>
`BeneficiaryAccount` - Account identifier for the final beneficiary entity (individual or organization) for end-to-end transaction<br>
`BeneficiaryName` - Name for the final beneficiary entity<br>
`BeneficiaryStreet` - Street address for the final beneficiary entity<br>
`BeneficiaryCountryCityZip` - Remaining address details for the final beneficiary entity<br>
`SettlementDate` - Date the individual transaction was settled<br>
`SettlementCurrency` - Currency used for transaction<br>
`SettlementAmount` - Value of the transaction net of fees/transfer charges/forex<br>
`InstructedCurrency` - Currency of the individual transaction as instructed to be paid by the Sender<br>
`InstructedAmount` - Value of the individual transaction as instructed to be paid by the Sender<br>
`Label` - Boolean indicator of whether the transaction is anomalous or not. This is the target variable for the prediction task.<br>
<br>
**Dataset 2 – Banks:**

`Bank` - Identifier for the bank<br>
`Account` - Identifier for the account<br>
`Name` - Name of the account<br>
`Street` - Street address associated with the account<br>
`CountryCityZip` - Remaining address details associated with the account<br>
`Flags` - Enumerated data type indicating potential issues or special features that have been associated with an account. Flag definitions are below:<br>
00 - No flags<br>
01 - Account closed<br>
03 - Account recently opened<br>
04 - Name mismatch<br>
05 - Account under monitoring<br>
06 - Account suspended<br>
07 - Account frozen<br>
08 - Non-transaction account<br>
09 - Beneficiary deceased<br>
10 - Invalid company ID<br>
11 - Invalid individual ID<br>

### Read in Data

In [6]:
# Read in transactions training and testing data csv files to Spark DataFrames - Colab
train_df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/transaction_train_dataset.csv', header=True, inferSchema=True)
test_df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/transaction_test_dataset.csv', header=True, inferSchema=True)

# Read in transactions training and testing data csv files to Spark DataFrames - Jupyter
# train_df = spark.read.csv('data/transaction_train_dataset.csv', header=True, inferSchema=True)
# test_df = spark.read.csv('data/transaction_test_dataset.csv', header=True, inferSchema=True)

# Read in banks data csv file to a Spark DataFrame
# banks_df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/bank_dataset.csv', header=True, inferSchema=True)

# Persist in memory
# train_df = train_df.cache()
# test_df = test_df.cache()
# banks_df = banks_df.cache()

### Initial EDA

In [None]:
# Print shape of dataframes
print(f"train_df:  {train_df.count()} Rows, {len(train_df.columns)} Columns")
print(f"test_df:  {test_df.count()} Rows, {len(test_df.columns)} Columns")
# print(f"banks_df:  {banks_df.count():,} Rows, {len(banks_df.columns)} Columns")

In [None]:
# Print schema of dataframe
train_df.printSchema()

In [None]:
# banks_df.printSchema()

In [None]:
# Display first row of train_df
train_df.show(n=1, vertical=True, truncate=False)

In [None]:
# Display first 5 rows of banks dataframe
# banks_df.show(n=5, truncate=False)

In [None]:
# Print number of null/missing values in each column of train_df
train_df_null = train_df.select([F.count(F.when(F.col(c).isNull() | F.isnan(c), c))\
                                 .alias(c) for c in train_df.columns if c != 'Timestamp'])

print('Number of null/missing values per column:\n')
train_df_null.show(vertical=True, truncate=False)

In [None]:
# # Print number of null/missing values in each column of banks_df
# banks_df_null = banks_df.select([F.count(F.when(F.col(c).isNull() | F.isnan(c), c))\
#                                  .alias(c) for c in banks_df.columns])

# print('Number of null/missing values per column:\n')
# banks_df_null.show(vertical=True, truncate=False)

In [None]:
# Print number of unique values in each column of train_df; sample 1% of df for efficiency
train_df_unique = train_df.sample(False, 0.01).agg(*(F.countDistinct(F.col(c)) for c in train_df.columns))

print(f"Number of unique values per column (in sample of 1% of dataframe):\n")
train_df_unique.show(vertical=True, truncate=False)

In [None]:
# Print number of unique values in each column in banks_df; sample 10% of df for efficiency
# banks_df_unique = banks_df.sample(False, 0.1).agg(*(F.countDistinct(F.col(c)) for c in banks_df.columns))

# print(f"Number of unique values per column (in sample of 10% of dataframe):\n")
# banks_df_unique.show(vertical=True, truncate=False)

In [None]:
# Display Pandas style summary statistics table of numeric columns in train_df
num_cols = [item[0] for item in train_df.dtypes if item[1] == 'int' or item[1] == 'double']

train_df.select(num_cols).summary().show(truncate=False)


In [None]:
# Display value counts for 'Label' column (classification target) in train_df
class_counts = train_df.groupBy('Label').count().withColumn('percent', F.col('count')/train_df.count())

class_counts.show(truncate=10)

**Remarks:**
- It looks like this is an extremely imbalanced dataset - only about 0.1% of the data is in the positive class. We will need to address this class imbalance as part of the modeling process.

<br>

## Detailed EDA

In [None]:
# Sample 2% of train_df for visualizations (approximately 94k observations)
viz_df = train_df.sample(withReplacement=False, fraction=0.02, seed=42).toPandas()

### Visualize target class distributions of sender and receiver banks used in transactions

In [None]:
# Display unique senders in training dataset
print(f"train_df, {train_df.select('Sender').distinct().count()} unique senders:")
train_df.select('Sender').distinct().show(5)

In [None]:
# Define figure and axes
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(22, 14))

# Plot countplot of non-anomalous transactions
sns.countplot(y='Sender', data=viz_df[viz_df.Label == 0], ax=ax1, palette='muted', 
              order=viz_df[viz_df.Label == 0]['Sender'].value_counts().index) # Order descending

# Set color palette to match values in y-axis above
ax2_palette = {'DPSUFRPP': '#4878D0', 'WVOLDEMM': '#EE854A', 'ZOUOGB22': '#6ACC64', 'ABVVUS6S': '#956CB4'}

# Plot countplot of anomalous transactions
sns.countplot(y='Sender', data=viz_df[viz_df.Label == 1], ax=ax2, palette=ax2_palette, 
              order=viz_df[viz_df.Label == 1]['Sender'].value_counts().index) # Order descending

ax1.set_title('Sender Banks of Non-Anomalous Transactions (Label 0)', fontsize=16)
ax1.set_xlabel('Count', fontsize=14)
ax1.set_ylabel('Institution (Bank)', fontsize=14)
ax2.set_title('Sender Banks of Anomalous Transactions (Label 1)', fontsize=16)
ax2.set_xlabel('Count', fontsize=14)
ax2.set_ylabel('Institution (Bank)', fontsize=14);

In [None]:
# Display unique receivers in training dataset
print(f"train_df, {train_df.select('Receiver').distinct().count()} unique receivers:")
train_df.select('Receiver').distinct().show(5)

In [None]:
# Define figure and axes
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(22, 14))

# Set consistent colors across y-axis values
palette = sns.color_palette('muted', as_cmap=True)*2
palette_map = {val: color for val, color in zip(viz_df[viz_df.Label == 0]['Receiver'].value_counts().index, palette)}

# Plot countplot of non-anomalous transactions
ax1_plot = sns.countplot(y='Receiver', data=viz_df[viz_df.Label == 0], ax=ax1, palette=palette_map, 
              order=viz_df[viz_df.Label == 0]['Receiver'].value_counts().index)  # Order descending

# Update palette_map with values not found above
for val, color in zip(viz_df[viz_df.Label == 1]['Receiver'].value_counts().index, palette):
    if val not in palette_map:
        palette_map[val] = 'silver'  # Assign values not found above to silver


# Plot countplot of anomalous transactions
ax2_plot = sns.countplot(y='Receiver', data=viz_df[viz_df.Label == 1], ax=ax2, palette=palette_map, 
              order=viz_df[viz_df.Label == 1]['Receiver'].value_counts().index)  # Order descending

ax1.set_title('Receiver Banks of Non-Anomalous Transactions (Label 0)', fontsize=16)
ax1.set_xlabel('Count', fontsize=14)
ax1.set_ylabel('Institution (Bank)', fontsize=14)
ax2.set_title('Receiver Banks of Anomalous Transactions (Label 1)', fontsize=16)
ax2.set_xlabel('Count', fontsize=14)
ax2.set_ylabel('Institution (Bank)', fontsize=14);

**Remarks:**
- It looks like the choice of sender bank is very informative in terms of determining whether a transaction is anomalous or not, while the choice of receiver bank is not nearly as valuable.
- Only 4 out of 16 sender banks tend to be utilized in anomalous transactions, while nearly all are utilized in non-anomalous transactions.
- Looking at receiver banks, 12 out of 16 tend to be utilized for both anomalous and non-anomalous transactions, and in roughly equal distributions.
- There is no need to choose between sender and receiver banks when selecting our features; we can engineer features in sender-receiver bank combinations.

## Visualize target class distributions of instructed and settlement currencies used in transactions

In [None]:
# Display unique instructed currencies used in transactions
print(f"train_df, {train_df.select('InstructedCurrency').distinct().count()} unique instructed currencies:")
train_df.select('InstructedCurrency').distinct().show()

In [None]:
# Define figure and axes
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(22, 7))

# Set consistent colors across x-axis values
ax1_palette = {'USD': 'dodgerblue', 'EUR': '#003399', 'GBP':'#C8102E', 'JPY': 'tan'}
ax2_palette = {'USD': 'dodgerblue', 'EUR': '#003399', 'GBP':'#C8102E', 'JPY': 'tan', \
               'AUD': 'lightseagreen', 'CAD': 'gray', 'NZD': 'plum', 'INR': '#FF9933'}

# Plot countplot of non-anomalous transactions
sns.countplot(x='InstructedCurrency', data=viz_df[viz_df.Label == 0], ax=ax1, 
              order=viz_df[viz_df.Label == 0]['InstructedCurrency'].value_counts().index,  # Order descending
              palette=ax1_palette)

# Plot countplot of anomalous transactions
sns.countplot(x='InstructedCurrency', data=viz_df[viz_df.Label == 1], ax=ax2, 
              order=viz_df[viz_df.Label == 1]['InstructedCurrency'].value_counts().index,  # Order descending
              palette=ax2_palette)

# Print percentages on top of bars (ax1)
for p in ax1.patches:
    txt = str(round(p.get_height() / viz_df[viz_df.Label == 0].shape[0]*100, 1)) + '%'
    txt_x = p.get_x()+0.31
    txt_y = p.get_height()+400
    ax1.text(txt_x,txt_y,txt)

# Print percentages on top of bars (ax2)
for p in ax2.patches:
    txt = str(round(p.get_height() / viz_df[viz_df.Label == 1].shape[0]*100, 1)) + '%'
    txt_x = p.get_x()+0.25
    txt_y = p.get_height()+0.5
    ax2.text(txt_x,txt_y,txt)

ax1.set_title('Instructed Currencies of Non-Anomalous Transactions (Label 0)', fontsize=16)
ax1.set_xlabel('Instructed Currency', fontsize=14)
ax1.set_ylabel('Count', fontsize=14)
ax2.set_title('Instructed Currencies of Anomalous Transactions (Label 1)', fontsize=16)
ax2.set_xlabel('Instructed Currency', fontsize=14)
ax2.set_ylabel('Count', fontsize=14);

In [None]:
# Display unique settlement currencies used in transactions
print(f"train_df, {train_df.select('SettlementCurrency').distinct().count()} unique settlement currencies:")
train_df.select('SettlementCurrency').distinct().show()

In [None]:
# Define figure and axes
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(22, 7))

# Set consistent color palettes
ax1_palette = {'USD': 'dodgerblue', 'EUR': '#003399', 'GBP':'#C8102E', 'JPY': 'tan'}
ax2_palette = {'USD': 'dodgerblue', 'EUR': '#003399', 'GBP':'#C8102E', 'JPY': 'tan', \
               'AUD': 'lightseagreen', 'CAD': 'gray', 'NZD': 'plum', 'INR': '#FF9933'}

# Plot countplot of non-anomalous transactions
sns.countplot(x='SettlementCurrency', data=viz_df[viz_df.Label == 0], ax=ax1, 
              order=viz_df[viz_df.Label == 0]['SettlementCurrency'].value_counts().index, # Order descending
              palette=ax1_palette)

# Plot countplot of anomalous transactions
sns.countplot(x='SettlementCurrency', data=viz_df[viz_df.Label == 1], ax=ax2, 
              order=viz_df[viz_df.Label == 1]['SettlementCurrency'].value_counts().index, # Order descending
              palette=ax2_palette)

# Print percentages on top of bars (ax1)
for p in ax1.patches:
    txt = str(round(p.get_height() / viz_df[viz_df.Label == 0].shape[0]*100, 1)) + '%'
    txt_x = p.get_x()+0.31
    txt_y = p.get_height()+400
    ax1.text(txt_x,txt_y,txt)

# Print percentages on top of bars (ax2)
for p in ax2.patches:
    txt = str(round(p.get_height() / viz_df[viz_df.Label == 1].shape[0]*100, 1)) + '%'
    txt_x = p.get_x()+0.31
    txt_y = p.get_height()+0.5
    ax2.text(txt_x,txt_y,txt)

ax1.set_title('Settlement Currencies of Non-Anomalous Transactions (Label 0)', fontsize=16)
ax1.set_xlabel('Settlement Currency', fontsize=14)
ax1.set_ylabel('Count', fontsize=14)
ax2.set_title('Settlement Currencies of Anomalous Transactions (Label 1)', fontsize=16)
ax2.set_xlabel('Settlement Currency', fontsize=14)
ax2.set_ylabel('Count', fontsize=14);

**Remarks:**
- Instructed currencies seems to be more informative in terms of being correlated with whether or not a transaction is anomalous.
- Among instructed currencies, we see the opposite trend as we saw with chosen banks; anomalous transactions tend to use a broader selection of instructed currencies, rather than a more narrow selection as we saw with chosen sender banks.
- Among settlement currencies, we see the same four currencies being utilized among both target classes, but in slightly different frequencies.
- We will keep the instructed currencies (and one hot encode them) as a feature in the final dataset and drop the settlement currencies.

<br>

# Preprocessing & Feature Engineering

Steps:
1. Drop duplicate transaction rows
2. Train/test split
3. Create `SenderHourFreq` feature
4. Create `SenderCurrencyFreq` and `SenderCurrencyAmtAvg` features
5. Create `SenderReceiverFreq` feature

In [None]:
# # Create temporary tables for join
# train_df.createOrReplaceTempView('train_df_sql')
# banks_df.createOrReplaceTempView('banks_df_sql')


# # SQL to join dataframes
# join_sql =  """SELECT train_df_sql.*, banks_df_sql.Flags AS OrderingAccFlags
#             FROM train_df_sql 
#             LEFT JOIN banks_df_sql 
#             ON train_df_sql.OrderingAccount = banks_df_sql.Account
#             """
# # Perform SQL join
# joined_df = spark.sql(join_sql)

In [None]:
# Print shape of joined dataframe
# print(f"{joined_df.count():,} Rows, {len(joined_df.columns)} Columns")

### Drop intermediary transactions (only keep one row per end-to-end transaction)

In [None]:
# Print count of unique transactions (as identified by UETR codes)
print('train_df:')
train_df.select(F.countDistinct('UETR')).show()
print('test_df:')
test_df.select(F.countDistinct('UETR')).show()

In [None]:
# Print total number of combined rows with duplicate UETR values (meaning sender routed transaction through one or more intermediary banks)
print('Total number of rows with intermediary transactions in train_df:')
train_df.select('UETR').groupBy('UETR')\
    .count()\
    .where(F.col('count') > 1)\
    .select(F.sum('count'))\
    .show()

In [7]:
# Drop rows with duplicate UETR codes, keeping the first occurence (sorted by Timestamp)
train_df = train_df.orderBy('Timestamp').coalesce(1).dropDuplicates(subset = ['UETR'])
test_df = test_df.orderBy('Timestamp').coalesce(1).dropDuplicates(subset = ['UETR'])

# Ensure no duplicates
assert train_df.groupBy(train_df.UETR).count().where(F.col('count') > 1).count() == 0
assert test_df.groupBy(test_df.UETR).count().where(F.col('count') > 1).count() == 0

print(f"train_df: {train_df.count()} rows")
print(f"test_df: {test_df.count()} rows")

In [None]:
# Show value counts for 'Label' column (classification target) in new train and test dataframes
class_counts_train = train_df.groupBy('Label').count().withColumn('percent', F.col('count')/train_df.count())
class_counts_test = test_df.groupBy('Label').count().withColumn('percent', F.col('count')/test_df.count())

print('train_df:')
class_counts_train.show(truncate=10)
print('test_df:')
class_counts_test.show(truncate=10)

## Feature Engineering

### Create `SenderHourFreq` feature: transaction hour frequency for each sender

This feature will tell us the frequency with which each sender initiated transactions for each hour of the day. This should capture the signal of the correlation between the sender and target class as well as the correlation between transaction hour and target class.

In [8]:
# Define UDF to extract hour from timestamp
hour = F.udf(lambda x: x.hour, IntegerType())

# Create new column of transaction hours
train_df = train_df.withColumn('Hour', hour(train_df.Timestamp))
test_df = test_df.withColumn('Hour', hour(test_df.Timestamp))

# Create list of unique senders
senders = train_df.select('Sender').toPandas()['Sender'].unique()

# Create column of senders concatenated with hours
train_df = train_df.withColumn('SenderHour', F.concat(F.col('Sender'), F.col('Hour').cast(StringType())))
test_df = test_df.withColumn('SenderHour', F.concat(F.col('Sender'), F.col('Hour').cast(StringType())))

pd_df = train_df.select('Sender', 'Hour').toPandas()

# Create dictionary of sender hour frequency values to map from sender hour values
sender_hour_frequency = {}
for sender in senders:
    sender_rows = pd_df[pd_df['Sender'] == sender]
    for hour in range(24):
        sender_hour_frequency[sender + str(hour)] = len(sender_rows[sender_rows['Hour'] == hour])

# Create new column in train and test dataframes with sender_hour_frequency dictionary
mapping_expr = F.create_map([F.lit(x) for x in chain(*sender_hour_frequency.items())])

train_df = train_df.withColumn('SenderHourFreq', mapping_expr[F.col('SenderHour')])
test_df = test_df.withColumn('SenderHourFreq', mapping_expr[F.col('SenderHour')])

### Create `SenderCurrencyFreq` and `SenderCurrencyAmtAvg` features: transaction currency frequency and average transaction amount per currency for each sender

These features will tell us the frequency with which each sender initiated transactions for each currency, in the case of the first feature. For the second feature, it will tell us the average amount with which each sender sent each currency. These features may also be correlated with anomalous transactions.

In [9]:
# Create column of senders concatenated with instructed currencies
train_df = train_df.withColumn('SenderCurrency', F.concat(F.col('Sender'), F.col('InstructedCurrency')))
test_df = test_df.withColumn('SenderCurrency', F.concat(F.col('Sender'), F.col('InstructedCurrency')))

pd_train_df = train_df.select('SenderCurrency', 'InstructedAmount').toPandas()
pd_test_df = test_df.select('SenderCurrency', 'InstructedAmount').toPandas()

# Create dictionary of sender currency frequency values to map from sender currency values
sender_currency_freq = {}
# Create dictionary of average sender currency values to map from sender currency values
sender_currency_avg = {}

for sc in set(
    list(pd_train_df['SenderCurrency'].unique()) + list(pd_test_df['SenderCurrency'].unique())
):
    sender_currency_freq[sc] = len(pd_train_df[pd_train_df['SenderCurrency'] == sc])
    sender_currency_avg[sc] = pd_train_df[pd_train_df['SenderCurrency'] == sc][
        "InstructedAmount"
    ].mean()

# Create new column in train and test dataframes with sender_currency_freq dictionary
mapping_expr = F.create_map([F.lit(x) for x in chain(*sender_currency_freq.items())])

train_df = train_df.withColumn('SenderCurrencyFreq', mapping_expr[F.col('SenderCurrency')])
test_df = test_df.withColumn('SenderCurrencyFreq', mapping_expr[F.col('SenderCurrency')])

# Create new column in train and test dataframes with sender_currency_avg dictionary
mapping_expr = F.create_map([F.lit(x) for x in chain(*sender_currency_avg.items())])

train_df = train_df.withColumn('SenderCurrencyAmtAvg', mapping_expr[F.col('SenderCurrency')])
test_df = test_df.withColumn('SenderCurrencyAmtAvg', mapping_expr[F.col('SenderCurrency')])

### Create `SenderReceiverFreq` feature: sender-receiver combination frequency for each sender and receiver

In [10]:
# Create column of senders concatenated with receivers
train_df = train_df.withColumn('SenderReceiver', F.concat(F.col('Sender'), F.col('Receiver')))
test_df = test_df.withColumn('SenderReceiver', F.concat(F.col('Sender'), F.col('Receiver')))

# Create dictionary of sender receiver frequency values to map from sender receiver values
sender_receiver_freq = {}

pd_train_df = train_df.select('SenderReceiver').toPandas()
pd_test_df = test_df.select('SenderReceiver').toPandas()

for sr in set(
    list(pd_train_df['SenderReceiver'].unique()) + list(pd_test_df['SenderReceiver'].unique())
):
    sender_receiver_freq[sr] = len(pd_train_df[pd_train_df['SenderReceiver'] == sr])

# Create new column in train and test dataframes with sender_receiver_freq dictionary
mapping_expr = F.create_map([F.lit(x) for x in chain(*sender_receiver_freq.items())])

train_df = train_df.withColumn('SenderReceiverFreq', mapping_expr[F.col('SenderReceiver')])
test_df = test_df.withColumn('SenderReceiverFreq', mapping_expr[F.col('SenderReceiver')])

### Drop unused categorical columns

We're going to drop all categorical columns here, save for the one we are one hot encoding which is `InstructedCurrency`

In [11]:
cols_to_drop = [
    'Timestamp',
    'UETR',
    'Sender',
    'Receiver',
    'TransactionReference',
    'OrderingAccount',
    'OrderingName',
    'OrderingStreet',
    'OrderingCountryCityZip',
    'BeneficiaryAccount',
    'BeneficiaryName',
    'BeneficiaryStreet',
    'BeneficiaryCountryCityZip',
    'SettlementDate',
    'SettlementCurrency',
    'SenderHour',
    'SenderCurrency',
    'SenderReceiver'
]

train_df = train_df.drop(*cols_to_drop)
test_df = test_df.drop(*cols_to_drop)

<br>

# Resample Training Dataset

As we saw above, the training dataset is extremely imbalanced in regards to target class distribution. In order to improve modeling performance, we'll rebalance the dataset through a combination of undersampling the majority class (non-amomalous transactions) and oversampling the minority class (anomalous transactions). We will take a 10% sample of the non-anomalous transactions, without replacement, and a 1,000% sample of anomalous transactions, with replacement. This means that we should have approximately 450k observations after resampling, and the class imbalance will increase to about 90%/10% non-anomalous to anomalous transactions.

In [12]:
resampled_df = spark_resample(train_df, undersample_fraction=0.1, oversample_fraction=10.0, 
                              class_field='Label', pos_class=1, shuffle=True, random_state=42)

In [13]:
# Print shape of resampled dataframe
print(f"resampled_df:  {resampled_df.count()} Rows, {len(resampled_df.columns)} Columns")

resampled_df:  454052 Rows, 10 Columns


In [None]:
# Preview resampled dataframe
resampled_df.show(3, vertical=True)

In [None]:
# Display value counts for 'Label' column (classification target) of resampled dataframe
resampled_class_counts = resampled_df.groupBy('Label').count().withColumn('percent', F.col('count')/resampled_df.count())

resampled_class_counts.show(truncate=10)

### Save resampled training dataframe and preprocessed test dataframe as CSV files

In [None]:
# resampled_df.coalesce(1).write.csv('/content/drive/MyDrive/Colab Notebooks/resampled_df.csv', header=True)
# test_df.coalesce(1).write.csv('/content/drive/MyDrive/Colab Notebooks/test_df_preprocessed.csv', header=True)