## **Installing the needed libraries**

In [2]:
!pip install pyspark
from pyspark.sql import SparkSession

    tinycss2 (>=1.1.0<1.2) ; extra == 'css'
             ~~~~~~~~^[0m[33m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## **Loading the libraries**

In [5]:
import os
import json
import zipfile
from pyspark.sql import SparkSession
import random
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum, when, count, avg, to_timestamp, hour, dayofweek
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.sql import Window
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.ml.stat import Correlation
import numpy as np
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

## **Loading the dataset**

According to the dataset owner, the data is divided in 6 datasets into two groups of three:

* Group HI has a relatively higher illicit ratio (more laundering).
* Group LI has a relatively lower illicit ratio (less laundering).

Both HI and LI internally have three sets of data: small, medium, and large. Also, provides two files for each of the six datasets:

* A list of transactions in CSV format
* A text file list of laundering transactions.

So, we have a larga dataset, but for our project, we will use the following files:

* HI-Medium_Trans.csv
* HI-Medium_Patterns.txt
* LI-Medium_Trans.csv
* LI-Medium_Patterns.txt

Transaction files and Pattern files, each they will be merged in a single DataFrame.

### **Transactions Dataset**

In [6]:
import os

os.environ["JAVA_HOME"] = "/usr/local/opt/openjdk@11"
os.environ["PATH"] = f'{os.environ["JAVA_HOME"]}/bin:' + os.environ["PATH"]


In [7]:
# # Creating a SparkSession to manipulate the datasets
# spark = SparkSession.builder.appName("AML_CSVReader").getOrCreate()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("AMLDataProcessing") \
    .config("spark.jars", "lib/snowflake-jdbc-3.13.33.jar,lib/spark-snowflake_2.12-2.11.0-spark_3.3.jar") \
    .getOrCreate()


/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/pyspark/bin/spark-class: line 71: /usr/local/opt/openjdk@11/bin/java: No such file or directory
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/pyspark/bin/spark-class: line 97: CMD: bad array subscript
head: illegal line count -- -1


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [5]:
# ************** Define schema for the transactions. Not applied, for now. *******
'''
schema = StructType([
    StructField("Timestamp", StringType(), True),
    StructField("From_Bank", StringType(), True),
    StructField("From_Account", StringType(), True),
    StructField("To_Bank", StringType(), True),
    StructField("To_Account", StringType(), True),
    StructField("Amount_Received", FloatType(), True),
    StructField("Receiving_Currency", StringType(), True),
    StructField("Amount_Paid", FloatType(), True),
    StructField("Payment_Currency", StringType(), True),
    StructField("Payment_Format", StringType(), True)
])
'''

'\nschema = StructType([\n    StructField("Timestamp", StringType(), True),\n    StructField("From_Bank", StringType(), True),\n    StructField("From_Account", StringType(), True),\n    StructField("To_Bank", StringType(), True),\n    StructField("To_Account", StringType(), True),\n    StructField("Amount_Received", FloatType(), True),\n    StructField("Receiving_Currency", StringType(), True),\n    StructField("Amount_Paid", FloatType(), True),\n    StructField("Payment_Currency", StringType(), True),\n    StructField("Payment_Format", StringType(), True)\n])\n'

### High Ilicit - Medium

In [None]:
# hi_medium_df = spark.read.csv("data/LI-Medium_Trans.csv", header=True, inferSchema=True)

# hi_medium_df.show()

                                                                                

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:17|       20|800104D70|     20|800104D70|        6794.63|         US Dollar|    6794.63|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:02|     3196|800107150|   3196|800107150|        7739.29|         US Dollar|    7739.29|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:17|     1208|80010E430|   1208|80010E430|        1880.23|         US Dollar|    1880.23|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:03|     1208|80010E650|     20|80010E6F0|    7.3966883E7|         US Dollar|7.3966883

25/03/14 01:09:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Timestamp, From Bank, Account, To Bank, Account, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
 Schema: Timestamp, From Bank, Account2, To Bank, Account4, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
Expected: Account2 but found: Account
CSV file: file:///kaggle/input/ibm-transactions-for-anti-money-laundering-aml/HI-Medium_Trans.csv


### 
Low Ilicit - Medium

In [None]:
# Read the CSV file into a Spark DataFrame
hi_small_df = spark.read.csv("/Users/shashidharbabu/Documents/01. SJSU/Semester 02/DATA 228 - Big Data Technologies /Anti-Money-Laundering-System/data/HI-Small_Trans.csv", header=True, inferSchema=True)

# Display the first few rows of the DataFrame
hi_small_df.show()

                                                                                

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:15|       20|800104D70|     20|800104D70|        8095.07|         US Dollar|    8095.07|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:18|     3196|800107150|   3196|800107150|        7739.29|         US Dollar|    7739.29|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:23|     1208|80010E430|   1208|80010E430|        2654.22|         US Dollar|    2654.22|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:19|     3203|80010EA80|   3203|80010EA80|       13284.41|         US Dollar|   13284.

25/03/14 01:11:29 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Timestamp, From Bank, Account, To Bank, Account, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
 Schema: Timestamp, From Bank, Account2, To Bank, Account4, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
Expected: Account2 but found: Account
CSV file: file:///kaggle/input/ibm-transactions-for-anti-money-laundering-aml/LI-Medium_Trans.csv


In [None]:
# Combine both datasets
trans_df = hi_small_df

### **Patterns Dataset**

In [9]:
# Initialize Spark session (if not already initialized)
spark = SparkSession.builder.appName("AML_TXTReader").getOrCreate()

25/03/14 01:12:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### High Ilicit - Medium

In [None]:
# Read the file into a Spark DataFrame
hi_patterns_df = spark.read.text("/Users/shashidharbabu/Documents/01. SJSU/Semester 02/DATA 228 - Big Data Technologies /Anti-Money-Laundering-System/data/HI-Small_Patterns.txt")

# Display the first few rows of the DataFrame
hi_patterns_df.show()

+--------------------+
|               value|
+--------------------+
|BEGIN LAUNDERING ...|
|2022/09/01 05:14,...|
|2022/09/03 13:09,...|
|2022/09/01 07:40,...|
|2022/09/01 14:19,...|
|2022/09/02 12:40,...|
|2022/09/03 06:34,...|
|END LAUNDERING AT...|
|                    |
|BEGIN LAUNDERING ...|
|2022/09/01 00:19,...|
|2022/09/01 19:35,...|
|2022/09/02 02:58,...|
|2022/09/02 18:02,...|
|2022/09/03 07:16,...|
|2022/09/03 11:39,...|
|2022/09/03 12:04,...|
|2022/09/04 07:27,...|
|2022/09/04 08:38,...|
|2022/09/05 13:23,...|
+--------------------+
only showing top 20 rows



### Low Ilicit - Medium

In [11]:
# Read the file into a Spark DataFrame
li_patterns_df = spark.read.text("../input/ibm-transactions-for-anti-money-laundering-aml/LI-Medium_Patterns.txt")

# Display the first few rows of the DataFrame
li_patterns_df.show()

+--------------------+
|               value|
+--------------------+
|BEGIN LAUNDERING ...|
|2022/09/01 00:29,...|
|2022/09/04 12:49,...|
|2022/09/01 12:28,...|
|2022/09/04 13:39,...|
|2022/09/01 14:26,...|
|2022/09/04 15:34,...|
|2022/09/02 15:52,...|
|2022/09/04 16:27,...|
|2022/09/02 17:41,...|
|2022/09/05 08:36,...|
|2022/09/03 08:04,...|
|2022/09/05 13:36,...|
|2022/09/03 15:18,...|
|2022/09/05 15:45,...|
|2022/09/03 16:50,...|
|2022/09/05 16:00,...|
|2022/09/03 22:43,...|
|2022/09/05 17:50,...|
|2022/09/03 23:22,...|
+--------------------+
only showing top 20 rows



In [12]:
# Combine both datasets
patterns_df = hi_patterns_df.union(li_patterns_df)

## **Exploring the dataset**

### Identify Laundering Patterns:
Each laundering attempt begins with BEGIN LAUNDERING ATTEMPT - [PATTERN] and ends with END LAUNDERING ATTEMPT.

Used regex to extract pattern types and transaction details.

In [13]:
# Step 1: Extract Pattern_Type where there is "BEGIN LAUNDERING ATTEMPT"
patterns_df = patterns_df.withColumn(
    "Pattern_Type",
    F.when(F.col("value").rlike("BEGIN LAUNDERING ATTEMPT - (.+)"),
           F.regexp_extract(F.col("value"), "BEGIN LAUNDERING ATTEMPT - (.+)", 1))
     .otherwise(None)
)

# Step 2: Forward fill the Pattern_Type to propagate it down until "END LAUNDERING ATTEMPT"
window_spec = Window.orderBy(F.monotonically_increasing_id()).rowsBetween(Window.unboundedPreceding, 0)
patterns_df = patterns_df.withColumn(
    "Pattern_Type",
    F.last("Pattern_Type", True).over(window_spec)
)

# Step 3: Filter out rows with "END LAUNDERING ATTEMPT" as they only mark the end of an attempt
patterns_df = patterns_df.filter(~F.col("value").contains("END LAUNDERING ATTEMPT"))

In [14]:
# Show the DataFrame without truncating long strings
patterns_df.show(truncate=False)

25/03/14 01:15:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:15:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:15:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:15:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:15:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------------------------------------------------------------------------------------+-------------------+
|value                                                                                           |Pattern_Type       |
+------------------------------------------------------------------------------------------------+-------------------+
|BEGIN LAUNDERING ATTEMPT - STACK                                                                |STACK              |
|2022/09/01 05:14,00952,8139F54E0,0111632,8062C56E0,5331.44,US Dollar,5331.44,US Dollar,ACH,1    |STACK              |
|2022/09/03 13:09,0111632,8062C56E0,008456,81363F620,5602.59,US Dollar,5602.59,US Dollar,ACH,1   |STACK              |
|2022/09/01 07:40,0118693,823D5EB90,013729,801CF2E60,1400.54,US Dollar,1400.54,US Dollar,ACH,1   |STACK              |
|2022/09/01 14:19,013729,801CF2E60,0123621,81A7090F0,1467.94,US Dollar,1467.94,US Dollar,ACH,1   |STACK              |
|2022/09/02 12:40,0024750,81363F410,0213834,8087

In [15]:
from pyspark.sql import functions as F

# Remove any text after the colon in Pattern_Type if it exists
patterns_df = patterns_df.withColumn(
    "Pattern_Type",
    F.regexp_replace(F.col("Pattern_Type"), ":.*", "")
)

# Display the DataFrame (use .show() in local PySpark)
patterns_df.show(truncate=False)

25/03/14 01:16:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------------------------------------------------------------------------------------+------------+
|value                                                                                           |Pattern_Type|
+------------------------------------------------------------------------------------------------+------------+
|BEGIN LAUNDERING ATTEMPT - STACK                                                                |STACK       |
|2022/09/01 05:14,00952,8139F54E0,0111632,8062C56E0,5331.44,US Dollar,5331.44,US Dollar,ACH,1    |STACK       |
|2022/09/03 13:09,0111632,8062C56E0,008456,81363F620,5602.59,US Dollar,5602.59,US Dollar,ACH,1   |STACK       |
|2022/09/01 07:40,0118693,823D5EB90,013729,801CF2E60,1400.54,US Dollar,1400.54,US Dollar,ACH,1   |STACK       |
|2022/09/01 14:19,013729,801CF2E60,0123621,81A7090F0,1467.94,US Dollar,1467.94,US Dollar,ACH,1   |STACK       |
|2022/09/02 12:40,0024750,81363F410,0213834,808757B00,16898.29,US Dollar,16898.29,US Dollar,ACH,1|STACK 

In [16]:
# Filter to get only transaction lines and ignore start/end laundering attempt lines
laundering_transactions = patterns_df.filter(patterns_df.value.rlike(r'\d{4}/\d{2}/\d{2}'))

# Unpersist patterns_df to free memory
patterns_df.unpersist()

# Cache laundering_transactions for reuse
laundering_transactions.cache()

# Display the filtered DataFrame (use .show() instead of display())
laundering_transactions.show(truncate=False)

25/03/14 01:16:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/14 01:16:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 16:>                                                         (0 + 1) / 1]

+------------------------------------------------------------------------------------------------+------------+
|value                                                                                           |Pattern_Type|
+------------------------------------------------------------------------------------------------+------------+
|2022/09/01 05:14,00952,8139F54E0,0111632,8062C56E0,5331.44,US Dollar,5331.44,US Dollar,ACH,1    |STACK       |
|2022/09/03 13:09,0111632,8062C56E0,008456,81363F620,5602.59,US Dollar,5602.59,US Dollar,ACH,1   |STACK       |
|2022/09/01 07:40,0118693,823D5EB90,013729,801CF2E60,1400.54,US Dollar,1400.54,US Dollar,ACH,1   |STACK       |
|2022/09/01 14:19,013729,801CF2E60,0123621,81A7090F0,1467.94,US Dollar,1467.94,US Dollar,ACH,1   |STACK       |
|2022/09/02 12:40,0024750,81363F410,0213834,808757B00,16898.29,US Dollar,16898.29,US Dollar,ACH,1|STACK       |
|2022/09/03 06:34,0213834,808757B00,000,800073EF0,17607.19,US Dollar,17607.19,US Dollar,ACH,1    |STACK 

                                                                                

In [17]:
# Updated Code for Better Readability
columns = [
    "Timestamp", "From_Bank", "From_Account", "To_Bank", "To_Account",
    "Amount_Received", "Receiving_currency", "Amount_paid",
    "Payment_currency", "Payment_Format", "isLaundering"
]

for idx, col_name in enumerate(columns):
    laundering_transactions = laundering_transactions.withColumn(col_name, F.split(F.col("value"), ",").getItem(idx))

In [18]:
# Display the results
laundering_transactions.show(truncate=False)

+------------------------------------------------------------------------------------------------+------------+----------------+---------+------------+-------+----------+---------------+------------------+-----------+----------------+--------------+------------+
|value                                                                                           |Pattern_Type|Timestamp       |From_Bank|From_Account|To_Bank|To_Account|Amount_Received|Receiving_currency|Amount_paid|Payment_currency|Payment_Format|isLaundering|
+------------------------------------------------------------------------------------------------+------------+----------------+---------+------------+-------+----------+---------------+------------------+-----------+----------------+--------------+------------+
|2022/09/01 05:14,00952,8139F54E0,0111632,8062C56E0,5331.44,US Dollar,5331.44,US Dollar,ACH,1    |STACK       |2022/09/01 05:14|00952    |8139F54E0   |0111632|8062C56E0 |5331.44        |US Dollar         |5331.4

In [19]:
laundering_transactions = laundering_transactions.select("Timestamp", "From_Bank", "Pattern_Type", "isLaundering")

#  Display the results
laundering_transactions.show(truncate=False)

+----------------+---------+------------+------------+
|Timestamp       |From_Bank|Pattern_Type|isLaundering|
+----------------+---------+------------+------------+
|2022/09/01 05:14|00952    |STACK       |1           |
|2022/09/03 13:09|0111632  |STACK       |1           |
|2022/09/01 07:40|0118693  |STACK       |1           |
|2022/09/01 14:19|013729   |STACK       |1           |
|2022/09/02 12:40|0024750  |STACK       |1           |
|2022/09/03 06:34|0213834  |STACK       |1           |
|2022/09/01 00:19|0134266  |CYCLE       |1           |
|2022/09/01 19:35|0036925  |CYCLE       |1           |
|2022/09/02 02:58|0119211  |CYCLE       |1           |
|2022/09/02 18:02|0132965  |CYCLE       |1           |
|2022/09/03 07:16|0137089  |CYCLE       |1           |
|2022/09/03 11:39|0216618  |CYCLE       |1           |
|2022/09/03 12:04|0024083  |CYCLE       |1           |
|2022/09/04 07:27|0038110  |CYCLE       |1           |
|2022/09/04 08:38|0225015  |CYCLE       |1           |
|2022/09/0

In [20]:
# Count empty strings in each column
empty_string_counts = laundering_transactions.select(
    [sum(when(col(c) == "", 1).otherwise(0)).alias(c) for c in laundering_transactions.columns])

# Show the result
empty_string_counts.show(truncate=False)

+---------+---------+------------+------------+
|Timestamp|From_Bank|Pattern_Type|isLaundering|
+---------+---------+------------+------------+
|0        |0        |0           |0           |
+---------+---------+------------+------------+



In [21]:
laundering_transactions.createOrReplaceTempView("combined")

In [22]:
# Query using spark.sql()
result_df = spark.sql("""
    SELECT Pattern_Type, COUNT(Pattern_Type) AS count
    FROM combined
    GROUP BY Pattern_Type
""")

# Show the result
result_df.show(truncate=False)

+--------------+-----+
|Pattern_Type  |count|
+--------------+-----+
|STACK         |4601 |
|CYCLE         |2518 |
|FAN-IN        |2644 |
|GATHER-SCATTER|4830 |
|BIPARTITE     |2623 |
|FAN-OUT       |2617 |
|SCATTER-GATHER|4874 |
|RANDOM        |1945 |
+--------------+-----+



In [23]:
laundering_transactions.cache().groupBy("isLaundering").count().show()

+------------+-----+
|isLaundering|count|
+------------+-----+
|           1|26652|
+------------+-----+

