### 1. Spark Session and Context

In [None]:
!pip install pyspark



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import hour, minute
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder\
    .appName('Anti Money Laundering Detection') \
        .getOrCreate()

In [5]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [14]:
!ls '/content/drive/My Drive/Colab Notebooks/'

 AML_code_0.0.ipynb		   'home credit project.ipynb'			 Untitled
 AML_data.csv			   'Intel image classification practice.ipynb'	 Untitled0.ipynb
 cab_demand_prediction_v0.6.ipynb  'Kaggle RSNA competition.ipynb'		 Untitled1.ipynb
'Copy of optimizer_plots.ipynb'     Lab1_Linear_Regression_6381.ipynb
'Detecting credit fraud.ipynb'	   'Skin cancer image classification.ipynb'


In [15]:
file_path = '/content/drive/My Drive/Colab Notebooks/AML_data.csv'

df = spark.read.csv(file_path,header=True,
                      inferSchema = True)
df.show(5)

+-------------------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+
|               Time|      Date|Sender_account|Receiver_account|  Amount|Payment_currency|Received_currency|Sender_bank_location|Receiver_bank_location|Payment_type|Is_laundering|     Laundering_type|
+-------------------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+
|2024-11-20 10:35:19|2022-10-07|    8724731955|      2769355426| 1459.15|       UK pounds|        UK pounds|                  UK|                    UK|Cash Deposit|            0|Normal_Cash_Deposits|
|2024-11-20 10:35:20|2022-10-07|    1491989064|      8401255335| 6019.64|       UK pounds|           Dirham|                  UK|                   UAE|Cross-border|            0|      Normal_Fan_

In [16]:
df = df.withColumn("hour",hour(df['Time']))
df = df.withColumn("minute",minute(df['Time']))
df.show(5)

+-------------------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+----+------+
|               Time|      Date|Sender_account|Receiver_account|  Amount|Payment_currency|Received_currency|Sender_bank_location|Receiver_bank_location|Payment_type|Is_laundering|     Laundering_type|hour|minute|
+-------------------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+----+------+
|2024-11-20 10:35:19|2022-10-07|    8724731955|      2769355426| 1459.15|       UK pounds|        UK pounds|                  UK|                    UK|Cash Deposit|            0|Normal_Cash_Deposits|  10|    35|
|2024-11-20 10:35:20|2022-10-07|    1491989064|      8401255335| 6019.64|       UK pounds|           Dirham|                  UK|                   

In [17]:
df = df.na.drop()  # Drop rows with null values

In [18]:
## Converting categorical features to numerical values

currency_indexer = StringIndexer(inputCol="Payment_currency", outputCol="Payment_currency_index")
rcurrency_indexer = StringIndexer(inputCol="Received_currency", outputCol="Received_currency_index")
location_indexer = StringIndexer(inputCol="Sender_bank_location", outputCol="Sender_bank_location_index")
rlocation_indexer = StringIndexer(inputCol="Receiver_bank_location", outputCol="Receiver_bank_location_index")
Payment_type_indexer = StringIndexer(inputCol="Payment_type", outputCol="Payment_type_index")
Laundering_type_indexer = StringIndexer(inputCol="Laundering_type", outputCol="Laundering_type_index")

In [19]:
assembler = VectorAssembler(
    inputCols = ['Amount', 'hour','minute',
         'Payment_currency_index', 'Received_currency_index',
         'Sender_bank_location_index','Receiver_bank_location_index',
         'Payment_type_index','Laundering_type_index'],
         outputCol = "features"
)

In [20]:
classifier = DecisionTreeClassifier(labelCol= "Is_laundering",
                                    featuresCol = "features")

In [21]:
# Create a pipeline with all the stages
pipeline = Pipeline(stages=[currency_indexer, rcurrency_indexer, location_indexer, rlocation_indexer,
                            Payment_type_indexer, Laundering_type_indexer, assembler, classifier])


In [22]:
# train test split

train_df, test_df = df.randomSplit([0.8,0.2], seed =42)

In [23]:
# Fit the pipeline on the training data
model = pipeline.fit(train_df)

In [24]:
## Predictions on test set
predictions = model.transform(test_df)

In [25]:
## Model evaluation

evaluator = MulticlassClassificationEvaluator(
    labelCol = "Is_laundering", predictionCol = "prediction",
    metricName = "accuracy"
)

accuracy = evaluator.evaluate(predictions)

print(f'Test accuracy: ', accuracy)

predictions.show(10)

Test accuracy:  1.0
+-------------------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+---------------+-------------+--------------------+----+------+----------------------+-----------------------+--------------------------+----------------------------+------------------+---------------------+--------------------+---------------+-----------+----------+
|               Time|      Date|Sender_account|Receiver_account|  Amount|Payment_currency|Received_currency|Sender_bank_location|Receiver_bank_location|   Payment_type|Is_laundering|     Laundering_type|hour|minute|Payment_currency_index|Received_currency_index|Sender_bank_location_index|Receiver_bank_location_index|Payment_type_index|Laundering_type_index|            features|  rawPrediction|probability|prediction|
+-------------------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+-----------

In [27]:
# Save the trained pipeline
model.save("/content/drive/My Drive/saved_pipeline")
print("Pipeline saved successfully.")

Pipeline saved successfully.


> Done