In [None]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/15 21:02:08 WARN Utils: Your hostname, NolanPC, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/15 21:02:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/15 21:02:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
df = spark.read.parquet("/home/nolan/fraud/data/transactions")
df.show(5)
df.describe().show()

+--------------------+---------+-------------+------+--------------------+--------+
|      transaction_id|  user_id|     merchant|amount|           timestamp|is_fraud|
+--------------------+---------+-------------+------+--------------------+--------+
|74256537-0937-434...|user_5703|     clothing| 12.97|2025-08-15 19:22:...|       0|
|ac3815a5-dd38-430...| user_501|  electronics|726.74|2025-08-15 19:22:...|       0|
|d7341b09-75f6-40a...|user_4684|subscriptions|  25.4|2025-08-15 19:22:...|       0|
|b1d6f587-18ae-49b...|user_6629|     clothing|253.08|2025-08-15 19:22:...|       0|
|95e01b16-8562-4e6...|user_1580|  electronics|212.71|2025-08-15 19:22:...|       0|
+--------------------+---------+-------------+------+--------------------+--------+
only showing top 5 rows




+-------+--------------------+---------+--------+-----------------+--------------------+
|summary|      transaction_id|  user_id|merchant|           amount|            is_fraud|
+-------+--------------------+---------+--------+-----------------+--------------------+
|  count|             3000000|  3000000| 3000000|          3000000|             3000000|
|   mean|                NULL|     NULL|    NULL|317.9378587246765|0.009986666666666666|
| stddev|                NULL|     NULL|    NULL|459.1957851287596| 0.09943307523757396|
|    min|0000024d-e931-4c1...|   user_1|clothing|              1.0|                   0|
|    max|fffffd3c-ae82-476...|user_9999|  travel|5863.141985133051|                   1|
+-------+--------------------+---------+--------+-----------------+--------------------+



                                                                                

In [9]:
from pyspark.sql.functions import log1p, col, lag, unix_timestamp, coalesce, lit, row_number, sum as spark_sum, avg, stddev
from pyspark.sql.window import Window

df = df.withColumn("log_amount", log1p(col("amount")))

window_user = Window.partitionBy("user_id").orderBy("timestamp")

df = df.withColumn("txn_index", row_number().over(window_user))

df = df.withColumn("user_cum_sum", spark_sum("amount").over(window_user))
df = df.withColumn("user_cum_avg", avg("amount").over(window_user))
df = df.withColumn("user_cum_std", stddev("amount").over(window_user))

df = df.withColumn("amount_to_avg_ratio", col("amount") / (col("user_cum_avg") + 1e-6))

df = df.withColumn("amount_x_txn_index", col("amount") * col("txn_index"))

df = df.withColumn("prev_ts", lag("timestamp").over(window_user))
df = df.withColumn("time_diff_sec", unix_timestamp(col("timestamp")) - unix_timestamp(col("prev_ts")))

df = df.withColumn("time_diff_sec", coalesce(col("time_diff_sec"), lit(0)))
df = df.withColumn("user_cum_std", coalesce(col("user_cum_std"), lit(0)))

df = df.withColumn("log_amount_x_time_diff", col("log_amount") * col("time_diff_sec"))

In [10]:
df.show()



+--------------------+---------+-------------+------------------+--------------------+--------+------------------+---------+------------------+------------------+------------------+-------------------+------------------+--------------------+-------------+----------------------+
|      transaction_id|  user_id|     merchant|            amount|           timestamp|is_fraud|        log_amount|txn_index|      user_cum_sum|      user_cum_avg|      user_cum_std|amount_to_avg_ratio|amount_x_txn_index|             prev_ts|time_diff_sec|log_amount_x_time_diff|
+--------------------+---------+-------------+------------------+--------------------+--------+------------------+---------+------------------+------------------+------------------+-------------------+------------------+--------------------+-------------+----------------------+
|39fd49af-768a-4e5...|user_1009|       travel|            733.24|2025-08-15 18:14:...|       0|6.5988359506465235|        1|            733.24|            733.24| 

                                                                                

In [11]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

indexer = StringIndexer(inputCol="merchant", outputCol="merchant_index")

encoder = OneHotEncoder(inputCol="merchant_index", outputCol="merchant_ohe")

pipeline = Pipeline(stages=[indexer, encoder])

model = pipeline.fit(df)

df_encoded = model.transform(df)

df_encoded.show(5)



+--------------------+---------+--------+-------+--------------------+--------+------------------+---------+------------+-----------------+------------------+-------------------+------------------+--------------------+-------------+----------------------+--------------+-------------+
|      transaction_id|  user_id|merchant| amount|           timestamp|is_fraud|        log_amount|txn_index|user_cum_sum|     user_cum_avg|      user_cum_std|amount_to_avg_ratio|amount_x_txn_index|             prev_ts|time_diff_sec|log_amount_x_time_diff|merchant_index| merchant_ohe|
+--------------------+---------+--------+-------+--------------------+--------+------------------+---------+------------+-----------------+------------------+-------------------+------------------+--------------------+-------------+----------------------+--------------+-------------+
|39fd49af-768a-4e5...|user_1009|  travel| 733.24|2025-08-15 18:14:...|       0|6.5988359506465235|        1|      733.24|           733.24|      

                                                                                

In [13]:
df_encoded.write.mode("overwrite") \
    .partitionBy("user_id") \
    .parquet("/home/nolan/fraud/data/processed")

25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/08/15 21:40:09 WARN MemoryManager: Total allocation exceeds 95.