
 Initialize Spark

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spk = SparkSession.builder \
    .appName("BigBlackMoneyAnalysis") \
    .getOrCreate()

# Load the dataset
ab = spk.read.csv("Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
ab

DataFrame[Transaction ID: string, Country: string, Amount (USD): double, Transaction Type: string, Date of Transaction: string, Person Involved: string, Industry: string, Destination Country: string, Reported by Authority: boolean, Source of Money: string, Money Laundering Risk Score: int, Shell Companies Involved: int, Financial Institution: string, Tax Haven Country: string]

DAG Visualizations Code

In [16]:
!pip install pyspark findspark pyngrok
from pyngrok import ngrok, conf
import getpass
import findspark
findspark.init()
# Set up ngrok
print("Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken")
auth_token = getpass.getpass()
conf.get_default().auth_token = auth_token
ui_port = 4040

try:
    # Connect ngrok to Spark UI
    public_url = ngrok.connect(ui_port).public_url
    print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")
except Exception as e:
    print("Error setting up ngrok tunnel:", e)

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken
··········




 * ngrok tunnel "https://fc89-34-75-137-85.ngrok-free.app" -> "http://127.0.0.1:4040"


In [42]:
spk.stop()

In [37]:
!curl http:http://127.0.0.1:4040

curl: (3) URL using bad/illegal format or missing URL


In [38]:
from pyngrok import ngrok

# Get a list of all active tunnels
tunnels = ngrok.get_tunnels()

# Disconnect each tunnel
for tunnel in tunnels:
    ngrok.disconnect(tunnel.public_url)

# Kill the ngrok process to ensure all tunnels are closed
ngrok.kill()



Distributed datacleaning  or preprocessing

1. converting date column to timestamp

In [2]:
from pyspark.sql.functions import col

# Convert 'Date of Transaction' to timestamp
ab = ab.withColumn("Date of Transaction", col("Date of Transaction").cast("timestamp"))

2. Handling missing Values

In [3]:
from pyspark.sql.functions import mean

# Impute missing values in 'Amount (USD)' with the mean amount
m_amount = ab.select(mean("Amount (USD)")).collect()[0][0]
ab = ab.na.fill({"Amount (USD)": m_amount})

3. Removing Duplicates

In [4]:
# Remove duplicates based on 'Transaction ID'
ab = ab.dropDuplicates(["Transaction ID"])

4.Detecting and removing Outliers

In [5]:
quant = ab.approxQuantile("Amount (USD)", [0.25, 0.75], 0.05)
iqr = quant[1] - quant[0]
lr_bound = quant[0] - 1.5 * iqr
up_bound = quant[1] + 1.5 * iqr

# Filter out outliers
ab = ab.filter((col("Amount (USD)") >= lr_bound) & (col("Amount (USD)") <= up_bound))

5. Feature Engineering is by creating risk category

In [6]:
from pyspark.sql.functions import when

# Create a new column 'Risk Category' based on 'Money Laundering Risk Score'
ab = ab.withColumn("Risk Category", when(col("Money Laundering Risk Score") >= 7, "High")
                                      .when(col("Money Laundering Risk Score") >= 4, "Medium")
                                      .otherwise("Low"))

6. One-Hot encoding for categorical variables

In [7]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Convert Boolean column to String
ab = ab.withColumn("Reported by Authority", F.col("Reported by Authority").cast("string"))

# List of categorical columns
categorical_columns = [
    'Country',
    'Transaction Type',
    'Person Involved',
    'Industry',
    'Destination Country',
    'Reported by Authority',  # Now converted to string
    'Source of Money',
    'Shell Companies Involved',
    'Financial Institution',
    'Tax Haven Country'
]

# Create stages for the pipeline
stages = []

for column in categorical_columns:
    # Create a StringIndexer for each categorical column
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}Index")

    # Create a OneHotEncoder for each indexed column
    encoder = OneHotEncoder(inputCols=[f"{column}Index"], outputCols=[f"{column}Vec"])

    # Add both stages to the list
    stages.extend([indexer, encoder])

# Create and fit the pipeline
pipeline = Pipeline(stages=stages)
ab = pipeline.fit(ab).transform(ab)

# Show some transformed columns
ab.select([f"{column}Vec" for column in categorical_columns]).show()


+-------------+-------------------+-------------------+-------------+----------------------+------------------------+------------------+---------------------------+------------------------+--------------------+
|   CountryVec|Transaction TypeVec| Person InvolvedVec|  IndustryVec|Destination CountryVec|Reported by AuthorityVec|Source of MoneyVec|Shell Companies InvolvedVec|Financial InstitutionVec|Tax Haven CountryVec|
+-------------+-------------------+-------------------+-------------+----------------------+------------------------+------------------+---------------------------+------------------------+--------------------+
|(9,[3],[1.0])|      (4,[1],[1.0])|(6285,[5863],[1.0])|(6,[4],[1.0])|         (9,[7],[1.0])|           (1,[0],[1.0])|     (5,[0],[1.0])|              (9,[4],[1.0])|       (498,[282],[1.0])|       (5,[1],[1.0])|
|(9,[7],[1.0])|      (4,[0],[1.0])|(6285,[2616],[1.0])|    (6,[],[])|             (9,[],[])|           (1,[0],[1.0])|     (5,[1],[1.0])|              (9,[5]

7. Normalizing the Amount (USD)

In [18]:
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Ensure the column is numeric
ab = ab.withColumn("Amount (USD)", ab["Amount (USD)"].cast(DoubleType()))

# Handle nulls if necessary
ab = ab.na.drop(subset=["Amount (USD)"])

# Assemble the vector
ass = VectorAssembler(inputCols=["Amount (USD)"], outputCol="AmountVec")
ab = ass.transform(ab)

# Check the output of the assembler
ab.select("AmountVec").show(truncate=False)

# Apply MinMaxScaler
sclr = MinMaxScaler(inputCol="AmountVec", outputCol="NormalizedAmount")
ab = sclr.fit(ab).transform(ab)

# Check the final output
ab.select("NormalizedAmount").show(truncate=False)


+-------------+
|AmountVec    |
+-------------+
|[3267530.479]|
|[4965766.725]|
|[94167.50048]|
|[386420.1412]|
|[643378.4264]|
|[4921056.454]|
|[3262817.984]|
|[4687204.61] |
|[903905.9486]|
|[4174994.676]|
|[2675518.773]|
|[3370300.35] |
|[729587.7725]|
|[2973516.598]|
|[3669888.192]|
|[3740050.645]|
|[3262817.984]|
|[3865495.571]|
|[2168678.373]|
|[1486877.067]|
+-------------+
only showing top 20 rows

+---------------------+
|NormalizedAmount     |
+---------------------+
|[0.6535306150923231] |
|[0.9931906077238586] |
|[0.01883420672153462]|
|[0.07728692790641858]|
|[0.12868051314122814]|
|[0.9842482180214933] |
|[0.6525880807301304] |
|[0.9374760944156055] |
|[0.18078797255931206]|
|[0.835030264032452]  |
|[0.5351238314829343] |
|[0.6740853604693712] |
|[0.1459230292693967] |
|[0.5947256326352304] |
|[0.7340051769530299] |
|[0.7480381940465719] |
|[0.6525880807301304] |
|[0.7731281205754533] |
|[0.4337519481923427] |
|[0.2973865708088409] |
+---------------------+
only showing t

8. Aggeration Transactions per Persons

In [8]:
from pyspark.sql.functions import count, sum

# Aggregate number of transactions and total amount per person
prsn_agg_ab = ab.groupBy("Person Involved").agg(
    count("*").alias("Num_Transactions"),
    sum("Amount (USD)").alias("Total_Amount")
)

9. Windowing Function for Rolling Average Amount

In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

winspec = Window.partitionBy("Country").orderBy("Date of Transaction").rowsBetween(-3, 0)

# Calculate rolling average amount for each country over last 4 transactions
ab= ab.withColumn("Rolling_Avg_Amount", avg("Amount (USD)").over(winspec))

10. Splitting the dataset into train and test

In [10]:
train_data, test_data = ab.randomSplit([0.8, 0.2], seed=42)

##**Machine Learning Algorithms with PySpark**




In [11]:
import time
execution_times = []
precisions = []
accuracies = []
f1_scores = []
recalls = []

Logistic Regression

In [17]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder.appName("MoneyLaunderingModel").getOrCreate()
data = spark.read.csv("/content/Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
data = data.drop("features").dropna()
data = data.withColumn("Reported by Authority", col("Reported by Authority").cast("int"))

categorical_columns = ['Transaction Type', 'Person Involved', 'Industry',
                       'Destination Country', 'Source of Money', 'Financial Institution', 'Tax Haven Country']
numerical_columns = ['Amount (USD)', 'Shell Companies Involved']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in categorical_columns]
assembler_input = numerical_columns + [col + "_onehot" for col in categorical_columns]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')
log_reg = LogisticRegression(featuresCol='features', labelCol='Reported by Authority')
pipeline = Pipeline(stages=indexers + encoders + [assembler, log_reg])

execution_times = []
start_time = time.time()

try:
    model = pipeline.fit(data)
except Exception as e:
    print("Error during pipeline fitting:", e)

predictions_log_reg = model.transform(data)
end_time = time.time()
execution_time = end_time - start_time

confusion_matrix = predictions_log_reg.groupBy('Reported by Authority', 'prediction').count().orderBy('Reported by Authority', 'prediction')
print("Confusion Matrix:")
confusion_matrix.show()

evaluator = MulticlassClassificationEvaluator(labelCol='Reported by Authority', predictionCol='prediction')

accuracy = evaluator.evaluate(predictions_log_reg)
print(f"Accuracy: {accuracy:.4f}")

precision = evaluator.evaluate(predictions_log_reg, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions_log_reg, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions_log_reg, {evaluator.metricName: "f1"})

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")

execution_times.append(execution_time)
print(f"Execution Time: {execution_time} seconds")


Confusion Matrix:
+---------------------+----------+-----+
|Reported by Authority|prediction|count|
+---------------------+----------+-----+
|                    0|       0.0| 7648|
|                    0|       1.0|  226|
|                    1|       0.0|  270|
|                    1|       1.0| 1701|
+---------------------+----------+-----+

Accuracy: 0.9494
Precision: 0.9492
Recall: 0.9496
F1 Score: 0.9494
Execution Time: 21.83720302581787 seconds


Naive Bayes

In [15]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import time
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("MoneyLaunderingModel").getOrCreate()
data = spark.read.csv("/content/Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
data = data.drop("features").dropna()
categorical_columns = ['Transaction Type', 'Person Involved', 'Industry',
                       'Destination Country', 'Source of Money', 'Financial Institution', 'Tax Haven Country']
numerical_columns = ['Amount (USD)', 'Money Laundering Risk Score', 'Shell Companies Involved']
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in categorical_columns]
assembler_input = numerical_columns + [col + "_onehot" for col in categorical_columns]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')
nb = NaiveBayes(featuresCol='features', labelCol='Money Laundering Risk Score', modelType="multinomial")
pipeline = Pipeline(stages=indexers + encoders + [assembler, nb])
execution_times = []
start_time = time.time()

try:
    model = pipeline.fit(data)
except Exception as e:
    print("Error during pipeline fitting:", e)

predictions_nb = model.transform(data)
end_time = time.time()
execution_time = end_time - start_time

confusion_matrix = predictions_nb.groupBy('Money Laundering Risk Score', 'prediction').count().orderBy('Money Laundering Risk Score', 'prediction')
print("Confusion Matrix:")
confusion_matrix.show()

evaluator = BinaryClassificationEvaluator(labelCol='Money Laundering Risk Score', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(predictions_nb)
print(f"Accuracy: {accuracy:.4f}")

execution_times.append(execution_time)
print(f"Execution Time: {execution_time} seconds")

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Money Laundering Risk Score",
    predictionCol="prediction",
    metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(predictions_nb)
print(f"Precision: {precision:.2f}")

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Money Laundering Risk Score",
    predictionCol="prediction",
    metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(predictions_nb)
print(f"Recall: {recall:.2f}")

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Money Laundering Risk Score",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator_f1.evaluate(predictions_nb)
print(f"F1 Score: {f1_score:.2f}")


root
 |-- Transaction ID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Amount (USD): double (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Date of Transaction: string (nullable = true)
 |-- Person Involved: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Destination Country: string (nullable = true)
 |-- Reported by Authority: boolean (nullable = true)
 |-- Source of Money: string (nullable = true)
 |-- Money Laundering Risk Score: integer (nullable = true)
 |-- Shell Companies Involved: integer (nullable = true)
 |-- Financial Institution: string (nullable = true)
 |-- Tax Haven Country: string (nullable = true)

Confusion Matrix:
+---------------------------+----------+-----+
|Money Laundering Risk Score|prediction|count|
+---------------------------+----------+-----+
|                          0|       1.0|    6|
|                          0|       4.0|    1|
|                          1|       1.0|  862|
|       

3. Support Vector Machine

In [18]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder.appName("MoneyLaunderingModel").getOrCreate()
data = spark.read.csv("/content/Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
data = data.drop("features").dropna()
data = data.withColumn("Reported by Authority", col("Reported by Authority").cast("int"))

categorical_columns = ['Transaction Type', 'Person Involved', 'Industry',
                       'Destination Country', 'Source of Money', 'Financial Institution', 'Tax Haven Country']
numerical_columns = ['Amount (USD)', 'Shell Companies Involved']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in categorical_columns]
assembler_input = numerical_columns + [col + "_onehot" for col in categorical_columns]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')
svm = LinearSVC(featuresCol='features', labelCol='Reported by Authority')
pipeline = Pipeline(stages=indexers + encoders + [assembler, svm])

execution_times = []
start_time = time.time()

try:
    model = pipeline.fit(data)
except Exception as e:
    print("Error during pipeline fitting:", e)

predictions_svm = model.transform(data)
end_time = time.time()
execution_time = end_time - start_time

confusion_matrix = predictions_svm.groupBy('Reported by Authority', 'prediction').count().orderBy('Reported by Authority', 'prediction')
print("Confusion Matrix:")
confusion_matrix.show()

evaluator = BinaryClassificationEvaluator(labelCol='Reported by Authority', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(predictions_svm)
print(f"Accuracy: {accuracy:.4f}")

execution_times.append(execution_time)
print(f"Execution Time: {execution_time} seconds")

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(predictions_svm)
print(f"Precision: {precision:.4f}")

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(predictions_svm)
print(f"Recall: {recall:.4f}")

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator_f1.evaluate(predictions_svm)
print(f"F1 Score: {f1_score:.4f}")


Confusion Matrix:
+---------------------+----------+-----+
|Reported by Authority|prediction|count|
+---------------------+----------+-----+
|                    0|       0.0| 7722|
|                    0|       1.0|  152|
|                    1|       0.0|  274|
|                    1|       1.0| 1697|
+---------------------+----------+-----+

Accuracy: 0.9208
Execution Time: 30.536436319351196 seconds
Precision: 0.9561
Recall: 0.9567
F1 Score: 0.9562


4.KNN

In [19]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder.appName("MoneyLaunderingModel").getOrCreate()
data = spark.read.csv("/content/Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
data = data.drop("features").dropna()
data = data.withColumn("Reported by Authority", col("Reported by Authority").cast("int"))

categorical_columns = ['Transaction Type', 'Person Involved', 'Industry',
                       'Destination Country', 'Source of Money', 'Financial Institution', 'Tax Haven Country']
numerical_columns = ['Amount (USD)', 'Shell Companies Involved']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in categorical_columns]
assembler_input = numerical_columns + [col + "_onehot" for col in categorical_columns]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')
svm = LinearSVC(featuresCol='features', labelCol='Reported by Authority')
pipeline = Pipeline(stages=indexers + encoders + [assembler, svm])

execution_times = []
start_time = time.time()

try:
    model = pipeline.fit(data)
except Exception as e:
    print("Error during pipeline fitting:", e)

predictions_svm = model.transform(data)
end_time = time.time()
execution_time = end_time - start_time

confusion_matrix = predictions_svm.groupBy('Reported by Authority', 'prediction').count().orderBy('Reported by Authority', 'prediction')
print("Confusion Matrix:")
confusion_matrix.show()

evaluator = BinaryClassificationEvaluator(labelCol='Reported by Authority', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(predictions_svm)
print(f"Accuracy: {accuracy:.4f}")

execution_times.append(execution_time)
print(f"Execution Time: {execution_time} seconds")

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(predictions_svm)
print(f"Precision: {precision:.4f}")

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(predictions_svm)
print(f"Recall: {recall:.4f}")

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator_f1.evaluate(predictions_svm)
print(f"F1 Score: {f1_score:.4f}")


Confusion Matrix:
+---------------------+----------+-----+
|Reported by Authority|prediction|count|
+---------------------+----------+-----+
|                    0|       0.0| 7722|
|                    0|       1.0|  152|
|                    1|       0.0|  274|
|                    1|       1.0| 1697|
+---------------------+----------+-----+

Accuracy: 0.9208
Execution Time: 21.688189268112183 seconds
Precision: 0.9561
Recall: 0.9567
F1 Score: 0.9562


5.SGD

In [20]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder.appName("MoneyLaunderingModel").getOrCreate()
data = spark.read.csv("/content/Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
data = data.drop("features").dropna()
data = data.withColumn("Reported by Authority", col("Reported by Authority").cast("int"))

categorical_columns = ['Transaction Type', 'Person Involved', 'Industry',
                       'Destination Country', 'Source of Money', 'Financial Institution', 'Tax Haven Country']
numerical_columns = ['Amount (USD)', 'Shell Companies Involved']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in categorical_columns]
assembler_input = numerical_columns + [col + "_onehot" for col in categorical_columns]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')
svm = LinearSVC(featuresCol='features', labelCol='Reported by Authority')
pipeline = Pipeline(stages=indexers + encoders + [assembler, svm])

execution_times = []
start_time = time.time()

try:
    model = pipeline.fit(data)
except Exception as e:
    print("Error during pipeline fitting:", e)

predictions_svm = model.transform(data)
end_time = time.time()
execution_time = end_time - start_time

confusion_matrix = predictions_svm.groupBy('Reported by Authority', 'prediction').count().orderBy('Reported by Authority', 'prediction')
print("Confusion Matrix:")
confusion_matrix.show()

evaluator = BinaryClassificationEvaluator(labelCol='Reported by Authority', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(predictions_svm)
print(f"Accuracy: {accuracy:.4f}")

execution_times.append(execution_time)
print(f"Execution Time: {execution_time} seconds")

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(predictions_svm)
print(f"Precision: {precision:.4f}")

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(predictions_svm)
print(f"Recall: {recall:.4f}")

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Reported by Authority",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator_f1.evaluate(predictions_svm)
print(f"F1 Score: {f1_score:.4f}")


Confusion Matrix:
+---------------------+----------+-----+
|Reported by Authority|prediction|count|
+---------------------+----------+-----+
|                    0|       0.0| 7722|
|                    0|       1.0|  152|
|                    1|       0.0|  274|
|                    1|       1.0| 1697|
+---------------------+----------+-----+

Accuracy: 0.9208
Execution Time: 24.58067297935486 seconds
Precision: 0.9561
Recall: 0.9567
F1 Score: 0.9562


6.MLP

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import numpy as np
import pandas as pd

spark = SparkSession.builder.appName("MLP_MoneyLaundering").getOrCreate()
data = spark.read.csv("/content/Big_Black_Money_Dataset.csv", header=True, inferSchema=True)
data = data.drop("features").dropna()
data = data.withColumn("Reported by Authority", col("Reported by Authority").cast("int"))

categorical_columns = ['Transaction Type', 'Person Involved', 'Industry',
                       'Destination Country', 'Source of Money', 'Financial Institution', 'Tax Haven Country']
numerical_columns = ['Amount (USD)', 'Shell Companies Involved']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in categorical_columns]
assembler_input = numerical_columns + [col + "_onehot" for col in categorical_columns]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])
data_transformed = pipeline.fit(data).transform(data)

data_pandas = data_transformed.select("features", "Reported by Authority").toPandas()
X = np.array([row.features.toArray() for row in data_pandas.itertuples()])
y = np.array(data_pandas['Reported by Authority'])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

mlp = MLPClassifier(hidden_layer_sizes=(64, 32), max_iter=500, random_state=42, solver='adam', alpha=0.001)
mlp.fit(X_train, y_train)
y_pred = mlp.predict(X_test)

print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))
print(f"\nAccuracy: {accuracy_score(y_test, y_pred):.4f}")
print(f"Precision: {precision_score(y_test, y_pred):.4f}")
print(f"Recall: {recall_score(y_test, y_pred):.4f}")
print(f"F1 Score: {f1_score(y_test, y_pred):.4f}")


Confusion Matrix:
[[1564    1]
 [ 404    0]]

Accuracy: 0.7943
Precision: 0.0000
Recall: 0.0000
F1 Score: 0.0000
