In [1]:
# Locate the local Spark installation (SPARK_HOME) and put pyspark on sys.path
# so the pyspark imports in the next cell resolve.
from findspark import init as init_findspark

init_findspark()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, dayofweek, when, count, concat, col, to_timestamp, lit

import os

# Path to the PostgreSQL JDBC driver jar. It is read from POSTGRES_JDBC_JAR so the
# notebook is not tied to one user's home directory; the default keeps the
# original location working unchanged.
POSTGRES_JDBC_JAR = os.environ.get("POSTGRES_JDBC_JAR", "/home/kimhor/postgresql-42.7.6.jar")

# NOTE(review): getOrCreate() returns an already-running session if one exists,
# in which case spark.jars may not be re-applied; restart the kernel after
# changing the jar path.
spark = (
    SparkSession.builder
    .appName("AML")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

print(f"Spark version: {spark.version}")

25/06/12 12:32:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark version: 4.0.0


# Step 1: Load Dataset from PostgreSQL

In [3]:
import getpass
import os

# Database connection parameters. Credentials are taken from the standard libpq
# environment variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD) so no
# password is stored in the notebook; if PGPASSWORD is unset you are prompted.
pg_host = os.environ.get("PGHOST", "localhost")
pg_port = os.environ.get("PGPORT", "5432")
pg_database = os.environ.get("PGDATABASE", "postgres")

url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_database}"
properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD") or getpass.getpass("PostgreSQL password: "),
    "driver": "org.postgresql.Driver"
}

table_name = "aml_transactions"

# Test connection and read data. On failure, report and re-raise: every later
# cell depends on `df`, so continuing would only produce confusing NameErrors.
try:
    df = spark.read.jdbc(url=url, table=table_name, properties=properties)
    print("Successfully connected to PostgreSQL!")
    print(f"Number of rows: {df.count()}")
    print("Schema:")
    df.printSchema()
except Exception as e:
    print(f"Error connecting to database: {e}")
    raise

Successfully connected to PostgreSQL!


[Stage 0:>                                                          (0 + 1) / 1]

Number of rows: 917077
Schema:
root
 |-- id: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sender_account: string (nullable = true)
 |-- receiver_account: string (nullable = true)
 |-- amount: decimal(15,2) (nullable = true)
 |-- payment_currency: string (nullable = true)
 |-- received_currency: string (nullable = true)
 |-- sender_bank_location: string (nullable = true)
 |-- receiver_bank_location: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- is_laundering: integer (nullable = true)
 |-- laundering_type: string (nullable = true)



                                                                                

# Step 2: Data Exploration

In [4]:
# Expose the JDBC-backed frame to Spark SQL under a view name, then re-bind
# `df` to the result of selecting everything from that view.
sql_query = "SELECT * FROM aml_transactions"
df.createOrReplaceTempView("aml_transactions")
df = spark.sql(sql_query)

In [5]:
df.show(n=5)  # peek at the first five rows

[Stage 3:>                                                          (0 + 1) / 1]

+---+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+
| id|    time|      date|sender_account|receiver_account|  amount|payment_currency|received_currency|sender_bank_location|receiver_bank_location|payment_type|is_laundering|     laundering_type|
+---+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+
|  1|10:35:19|2022-10-07|    8724731955|      2769355426| 1459.15|       UK pounds|        UK pounds|                  UK|                    UK|Cash Deposit|            0|Normal_Cash_Deposits|
|  2|10:35:20|2022-10-07|    1491989064|      8401255335| 6019.64|       UK pounds|           Dirham|                  UK|                   UAE|Cross-border|            0|      Normal_Fan_Out|
|  3|10:35:20|2022-10-07|     

                                                                                

In [6]:
# count / mean / stddev / min / max for every column
summary_stats = df.describe()
summary_stats.show()

25/06/11 22:02:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 4:>                                                          (0 + 1) / 1]

+-------+-----------------+--------+----------+--------------------+--------------------+------------------+----------------+-----------------+--------------------+----------------------+------------+--------------------+--------------------+
|summary|               id|    time|      date|      sender_account|    receiver_account|            amount|payment_currency|received_currency|sender_bank_location|receiver_bank_location|payment_type|       is_laundering|     laundering_type|
+-------+-----------------+--------+----------+--------------------+--------------------+------------------+----------------+-----------------+--------------------+----------------------+------------+--------------------+--------------------+
|  count|           917077|  917077|    917077|              917077|              917077|            917077|          917077|           917077|              917077|                917077|      917077|              917077|              917077|
|   mean|         458539.0| 

                                                                                

In [7]:
# Count NULLs per column: when() yields a non-null value only for NULL rows,
# and count() skips the NULLs produced for every other row.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

[Stage 7:>                                                          (0 + 1) / 1]

+---+----+----+--------------+----------------+------+----------------+-----------------+--------------------+----------------------+------------+-------------+---------------+
| id|time|date|sender_account|receiver_account|amount|payment_currency|received_currency|sender_bank_location|receiver_bank_location|payment_type|is_laundering|laundering_type|
+---+----+----+--------------+----------------+------+----------------+-----------------+--------------------+----------------------+------------+-------------+---------------+
|  0|   0|   0|             0|               0|     0|               0|                0|                   0|                     0|           0|            0|              0|
+---+----+----+--------------+----------------+------+----------------+-----------------+--------------------+----------------------+------------+-------------+---------------+



                                                                                

In [8]:
# Check for duplicate rows. `id` looks like a per-row identifier (1, 2, 3, ...
# in the show() output), so it is excluded from the comparison: with it
# included, any two rows with different ids count as distinct and repeated
# transactions would never be detected.
total_rows = df.count()
distinct_rows = df.drop("id").distinct().count()
duplicate_count = total_rows - distinct_rows

print(f"Total rows: {total_rows}")
print(f"Distinct rows (ignoring id): {distinct_rows}")
print(f"Duplicate rows: {duplicate_count}")

[Stage 13:>                                                         (0 + 1) / 1]

Total rows: 917077
Distinct rows: 917077
Duplicate rows: 0


                                                                                

# Step 3: Feature Engineering

In [9]:
# Join the string `date` and `time` columns with a space and parse the result
# into a single timestamp column named `datetime`.
datetime_text = concat(col("date"), lit(" "), col("time"))
df = df.withColumn("datetime", to_timestamp(datetime_text, "yyyy-MM-dd HH:mm:ss"))

In [10]:
df.show(n=3)  # verify the new datetime column

[Stage 19:>                                                         (0 + 1) / 1]

+---+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+-------------------+
| id|    time|      date|sender_account|receiver_account|  amount|payment_currency|received_currency|sender_bank_location|receiver_bank_location|payment_type|is_laundering|     laundering_type|           datetime|
+---+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+-------------------+
|  1|10:35:19|2022-10-07|    8724731955|      2769355426| 1459.15|       UK pounds|        UK pounds|                  UK|                    UK|Cash Deposit|            0|Normal_Cash_Deposits|2022-10-07 10:35:19|
|  2|10:35:20|2022-10-07|    1491989064|      8401255335| 6019.64|       UK pounds|           Dirham|                  UK|                   UAE

                                                                                

In [11]:
# Confirm the new `datetime` column was parsed as a timestamp rather than a string.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sender_account: string (nullable = true)
 |-- receiver_account: string (nullable = true)
 |-- amount: decimal(15,2) (nullable = true)
 |-- payment_currency: string (nullable = true)
 |-- received_currency: string (nullable = true)
 |-- sender_bank_location: string (nullable = true)
 |-- receiver_bank_location: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- is_laundering: integer (nullable = true)
 |-- laundering_type: string (nullable = true)
 |-- datetime: timestamp (nullable = true)



In [12]:
# Derive calendar features from `datetime`:
#   hour      -> hour of day
#   dayofweek -> Spark convention, Sunday = 1 ... Saturday = 7
df = (
    df.withColumn("hour", hour("datetime"))
      .withColumn("dayofweek", dayofweek("datetime"))
)

In [13]:
df.show(n=3)  # verify hour / dayofweek

[Stage 20:>                                                         (0 + 1) / 1]

+---+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+-------------------+----+---------+
| id|    time|      date|sender_account|receiver_account|  amount|payment_currency|received_currency|sender_bank_location|receiver_bank_location|payment_type|is_laundering|     laundering_type|           datetime|hour|dayofweek|
+---+--------+----------+--------------+----------------+--------+----------------+-----------------+--------------------+----------------------+------------+-------------+--------------------+-------------------+----+---------+
|  1|10:35:19|2022-10-07|    8724731955|      2769355426| 1459.15|       UK pounds|        UK pounds|                  UK|                    UK|Cash Deposit|            0|Normal_Cash_Deposits|2022-10-07 10:35:19|  10|        6|
|  2|10:35:20|2022-10-07|    1491989064|      8401255335| 6019.64|       UK pounds| 

                                                                                

In [14]:
# Remove the row id, account identifiers and the raw date/time inputs that the
# hour/dayofweek features were derived from.
columns_to_drop = ["id", "date", "time", "sender_account", "receiver_account", "datetime"]
df = df.drop(*columns_to_drop)

In [4]:
# Collect the reduced Spark frame to the driver; from here on `df` is pandas.
spark_features = df
df = spark_features.toPandas()

                                                                                

In [6]:
df.payment_type.unique()  # distinct payment types, in order of first appearance

array(['Cash Deposit', 'Cross-border', 'Cheque', 'ACH', 'Credit card',
       'Debit card', 'Cash Withdrawal'], dtype=object)

In [16]:
from sklearn.preprocessing import RobustScaler

# `amount` comes from Spark as decimal(15,2); cast it to float, then rescale it
# with RobustScaler (median-centred, IQR-scaled).
# NOTE(review): the scaler is fit on the whole dataset. If the exported CSV is
# later split into train/test, fit on the training split only to avoid leakage.
scaler = RobustScaler()
amounts = df[['amount']].astype(float)
df['amount'] = scaler.fit_transform(amounts)

In [17]:
df.amount.head(n=5)  # spot-check the scaled values

0   -0.561957
1   -0.009364
2    0.997411
3    0.702551
4   -0.724797
Name: amount, dtype: float64

In [18]:
import pandas as pd

# Cross-tabulate laundering_type against the is_laundering label. In the output,
# each laundering_type occurs under only one label value, so laundering_type
# would leak the target if it were kept as a feature.
ct = pd.crosstab(
    index=df['laundering_type'],
    columns=df['is_laundering'],
    dropna=False,
)
print(ct)

is_laundering                0    1
laundering_type                    
Behavioural_Change_1         0   55
Behavioural_Change_2         0   25
Bipartite                    0   17
Cash_Withdrawal              0  101
Cycle                        0   16
Deposit-Send                 0   85
Fan_In                       0   37
Fan_Out                      0   14
Gather-Scatter               0   25
Layered_Fan_In               0   29
Layered_Fan_Out              0   66
Normal_Cash_Deposits     21002    0
Normal_Cash_Withdrawal   29954    0
Normal_Fan_In           203801    0
Normal_Fan_Out          225391    0
Normal_Foward             4105    0
Normal_Group             51956    0
Normal_Mutual            11256    0
Normal_Periodical        19913    0
Normal_Plus_Mutual        5100    0
Normal_Small_Fan_Out    341727    0
Normal_single_large       2042    0
Over-Invoicing               0    4
Scatter-Gather               0   11
Single_large                 0   28
Smurfing                    

In [20]:
df.keys()  # for a DataFrame this is the column Index

Index(['amount', 'payment_currency', 'received_currency',
       'sender_bank_location', 'receiver_bank_location', 'payment_type',
       'is_laundering', 'laundering_type', 'hour', 'dayofweek'],
      dtype='object')

In [21]:
# Binary flags: 1 when the payment and received currencies differ, and 1 when
# the sender's and receiver's bank locations differ.
df['change_currency'] = (df['payment_currency'] != df['received_currency']).astype(int)
# Compare sender against *receiver* location; comparing a column with itself
# (as before) made this flag 0 for every row.
df['change_location'] = (df['sender_bank_location'] != df['receiver_bank_location']).astype(int)

In [23]:
from sklearn.preprocessing import OneHotEncoder


# One-hot encode the categorical feature column(s) into dense columns that are
# aligned with df's index.
categorical_cols = ['payment_type']

encoder = OneHotEncoder(sparse_output=False)
encoded_df = pd.DataFrame(
    encoder.fit_transform(df[categorical_cols]),
    columns=encoder.get_feature_names_out(categorical_cols),
    index=df.index,
)
print(f"Original df shape: {df.shape}")

# Drop the raw categorical columns (replaced by their encodings), the
# laundering_type column (it maps onto the is_laundering label in the
# crosstab), and the raw currency/location columns (the derived
# change_currency / change_location flags are kept instead).
columns_to_remove = categorical_cols + [
    'laundering_type',
    'payment_currency',
    'received_currency',
    'sender_bank_location',
    'receiver_bank_location',
]
df = pd.concat([df.drop(columns=columns_to_remove), encoded_df], axis=1)

# Check the shape and columns
print(f"Encoded df shape: {encoded_df.shape}")
print(f"Final df shape: {df.shape}")


Original df shape: (917077, 12)
Encoded df shape: (917077, 7)
Final df shape: (917077, 13)


In [25]:
# Persist the engineered feature table without the pandas RangeIndex.
output_path = 'cleaned_aml.csv'
df.to_csv(output_path, index=False)

In [70]:
# The data was collected to pandas (toPandas) and written to CSV above, so the
# Spark session is no longer needed; stop it to release its resources.
spark.stop()