In [1]:
#spark_session
from pyspark.sql import SparkSession
spark_session =  SparkSession.builder\
    .master('local[*]')\
    .config("spark.driver.memory", "16g")\
    .config("spark.executor.memory", "16g") \
    .config("spark.memory.fraction", "0.8") \
    .appName('AML_project')\
    .getOrCreate()

In [2]:
df = spark_session.read.csv("../Data/SAML-D.csv",header=True)
df.show()

+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+
|    Time|      Date|Sender_account|Receiver_account|  Amount|Payment_currency|Received_currency|Sender_bank_location|Receiver_bank_location|Payment_type|Is_laundering|     Laundering_type|
+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+
|10:35:19|2022-10-07|    8724731955|      2769355426| 1459.15|       UK pounds|        UK pounds|                  UK|                    UK|Cash Deposit|            0|Normal_Cash_Deposits|
|10:35:20|2022-10-07|    1491989064|      8401255335| 6019.64|       UK pounds|           Dirham|                  UK|                   UAE|Cross-border|            0|      Normal_Fan_Out|
|10:35:20|2022-10-07|     287305149|      44047670

In [3]:
from pyspark.sql.functions import year, month, dayofmonth, to_date

df = df.withColumn("Date", to_date(df["Date"], "yyyy-MM-dd"))

df = df.withColumn("year", year(df["Date"])) \
       .withColumn("month", month(df["Date"])) \
       .withColumn("day", dayofmonth(df["Date"]))


In [4]:
from pyspark.sql.functions import to_timestamp,hour,minute,second 

df = df.withColumn("Time", to_timestamp(df["Time"], "HH:mm:ss"))

df = df.withColumn("hour", hour(df["Time"])) \
       .withColumn("minute", minute(df["Time"])) \
       .withColumn("second", second(df["Time"]))


In [5]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print(f"Training Set: {train_df.count()} rows")
print(f"Testing Set: {test_df.count()} rows")

Training Set: 7603866 rows
Testing Set: 1900986 rows


In [6]:
from pyspark.sql.functions import col
numeric_cols = ['Is_laundering']

for col_name in numeric_cols:
    train_df = train_df.withColumn(col_name, col(col_name).cast("double"))
    test_df = test_df.withColumn(col_name, col(col_name).cast("double"))

In [7]:
train_df = train_df.limit(603866)

In [8]:
drop_cols = ['Date', 'Time']
train_df = train_df.drop(*drop_cols)
test_df = test_df.drop(*drop_cols)

In [9]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from xgboost.spark import SparkXGBClassifier
from pyspark.ml import Pipeline
# Categorical and numerical columns
categorical_cols = ['Payment_currency', 'Received_currency', 'Sender_bank_location',
                    'Receiver_bank_location', 'Payment_type', 'Laundering_type']
numeric_cols = ['Sender_account', 'Receiver_account', 'Amount', 'year', 'month', 'day', 'hour', 'minute', 'second','Is_laundering']
scaled_cols = ['Amount', 'year', 'month', 'day', 'hour', 'minute', 'second']

# String Indexing for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep") for col in categorical_cols]

# Casting numeric columns to double
train_df = train_df.select([col(c).cast("double").alias(c) for c in numeric_cols] + categorical_cols)
test_df = test_df.select([col(c).cast("double").alias(c) for c in numeric_cols] + categorical_cols)

# Handling class imbalance using oversampling
fraud_cases = train_df.filter(col("Is_laundering") == 1)
non_fraud_cases = train_df.filter(col("Is_laundering") == 0)
fraud_count = fraud_cases.count()
non_fraud_count = non_fraud_cases.count()

if fraud_count < non_fraud_count:
    fraud_cases = fraud_cases.sample(withReplacement=True, fraction=non_fraud_count / fraud_count, seed=42)
elif non_fraud_count < fraud_count:
    non_fraud_cases = non_fraud_cases.sample(withReplacement=True, fraction=fraud_count / non_fraud_count, seed=42)
# Ensuring class balance
train_df = fraud_cases.union(non_fraud_cases)
# Assembling features excluding 'Is_laundering'
assembler = VectorAssembler(inputCols=[c for c in numeric_cols if c != 'Is_laundering'] + [c + "_index" for c in categorical_cols], outputCol="features")
# Scaling numeric columns
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# Defining Random Forest model
xgb = SparkXGBClassifier(features_col="scaled_features", label_col="Is_laundering",
                         num_workers=2)
# Creating pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, xgb])

In [10]:
SparkXGBClassifier()

SparkXGBClassifier_ad98ea9f4f20

In [11]:
model = pipeline.fit(train_df)
# Predictions
predictions = model.transform(test_df)
# Show results
predictions.select("Is_laundering", "prediction", "probability").show()

2025-02-27 21:09:06,947 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException: [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of RDD chain within a barrier stage:
1. Ancestor RDDs that have different number of partitions from the resulting RDD (e.g. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
2. An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)).
	at org.apache.spark.errors.SparkCoreErrors$.barrierStageWithRDDChainPatternError(SparkCoreErrors.scala:225)
	at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithRDDChainPattern(DAGScheduler.scala:491)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:640)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1284)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3003)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
