# Data Preprocessing Stage 1
This notebook will focus on merging the main dataset and handling missing values in the given data.

In [1]:
# Initialise a spark session
import pandas as pd
from collections import Counter
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


spark = (
    SparkSession.builder.appName("Data Processing")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "8g")  # Increase driver memory
    .config("spark.executor.memory", "8g")  # Increase executor memory
    .config("spark.executor.instances", "4")  # Increase the number of executor instances
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
)

24/09/15 17:32:35 WARN Utils: Your hostname, Melissas-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.3 instead (on interface en0)
24/09/15 17:32:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/15 17:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Reas Data Files
cons_fraud_prob = spark.read.csv('../data/tables/consumer_fraud_probability.csv', header=True, inferSchema=True)
cons_user_details = spark.read.parquet('../data/tables/consumer_user_details.parquet')

merch_fraud_prob = spark.read.csv('../data/tables/merchant_fraud_probability.csv', header=True, inferSchema=True)

tbl_cons = spark.read.csv('../data/tables/tbl_consumer.csv', sep='|', header=True, inferSchema=True)
tbl_merch = spark.read.parquet('../data/tables/tbl_merchants.parquet')



## Merging Merchant Data and Consumer Data

In [3]:
# Perform the join on the 'merchant_abn' column
# Using left join to retain all rows

# Merging Merchant Data and Merchant Fraud Probability
merchant_merged = tbl_merch.join(merch_fraud_prob, tbl_merch.merchant_abn == merch_fraud_prob.merchant_abn, "left")

merchant_merged = merchant_merged.drop(merch_fraud_prob.merchant_abn)

# Show the merged DataFrame
merchant_merged.show()

+--------------------+--------------------+------------+--------------+-----------------+
|                name|                tags|merchant_abn|order_datetime|fraud_probability|
+--------------------+--------------------+------------+--------------+-----------------+
|       Felis Limited|((furniture, home...| 10023283211|          NULL|             NULL|
|Arcu Ac Orci Corp...|([cable, satellit...| 10142254217|          NULL|             NULL|
|    Nunc Sed Company|([jewelry, watch,...| 10165489824|          NULL|             NULL|
|Ultricies Digniss...|([wAtch, clock, a...| 10187291046|          NULL|             NULL|
| Enim Condimentum PC|([music shops - m...| 10192359162|          NULL|             NULL|
|       Fusce Company|[(gift, card, nov...| 10206519221|          NULL|             NULL|
|Aliquam Enim Inco...|[(computers, comP...| 10255988167|          NULL|             NULL|
|    Ipsum Primis Ltd|[[watch, clock, a...| 10264435225|          NULL|             NULL|
|Pede Ultr

In [4]:
# Show null value count at this stage
null_merchant_merged = merchant_merged.agg(
    *(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in merchant_merged.columns)
)

null_merchant_merged.show()

+----+----+------------+--------------+-----------------+
|name|tags|merchant_abn|order_datetime|fraud_probability|
+----+----+------------+--------------+-----------------+
|   0|   0|           0|          3978|             3978|
+----+----+------------+--------------+-----------------+



In [5]:
# Merging Consumer Information and Consumer Fraud Probability
# Using left join to retain all rows

consumer_merged = cons_user_details.join(cons_fraud_prob, cons_user_details.user_id == cons_fraud_prob.user_id, "left")
consumer_merged = consumer_merged.drop(cons_fraud_prob.user_id)

consumer_merged = consumer_merged.join(tbl_cons, consumer_merged.consumer_id == tbl_cons.consumer_id, "left")
consumer_merged = consumer_merged.drop(tbl_cons.consumer_id)

consumer_merged.show(5)

+-------+-----------+--------------+------------------+----------------+--------------------+-----+--------+------+
|user_id|consumer_id|order_datetime| fraud_probability|            name|             address|state|postcode|gender|
+-------+-----------+--------------+------------------+----------------+--------------------+-----+--------+------+
|      4|     154128|    2021-10-09| 9.633302411090419| Lindsay Jimenez|00653 Davenport C...|  NSW|    2780|Female|
|      2|     179208|    2021-08-30| 9.599513915425788|      Mary Smith|     3764 Amber Oval|  NSW|    2782|Female|
|      2|     179208|    2021-09-25|10.069850934775245|      Mary Smith|     3764 Amber Oval|  NSW|    2782|Female|
|      3|    1194530|    2021-11-03| 8.300636455314633|   Jill Jones MD|  40693 Henry Greens|   NT|     862|Female|
|      1|    1195503|    2022-02-20| 9.805431136520959|Yolanda Williams|413 Haney Gardens...|   WA|    6935|Female|
+-------+-----------+--------------+------------------+----------------+

In [6]:
consumer_merged.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- consumer_id: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- gender: string (nullable = true)



In [7]:
# Show null count at this stage
null_consumer_merged = consumer_merged.agg(
    *(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in consumer_merged.columns)
)

# Show the result
null_consumer_merged.show()

+-------+-----------+--------------+-----------------+----+-------+-----+--------+------+
|user_id|consumer_id|order_datetime|fraud_probability|name|address|state|postcode|gender|
+-------+-----------+--------------+-----------------+----+-------+-----+--------+------+
|      0|          0|        479871|           479871|   0|      0|    0|       0|     0|
+-------+-----------+--------------+-----------------+----+-------+-----+--------+------+



## Reading Transaction Data

In [8]:
# Import necessary libraries
import os

# Base directory where transaction folders are stored
base_directories = [
    "../data/tables/transactions_20210228_20210827_snapshot",
    "../data/tables/transactions_20210828_20220227_snapshot",
    "../data/tables/transactions_20220228_20220828_snapshot"
]

# Initialize an empty DataFrame to hold all transactions
all_transactions = None

# Loop through each base directory and read the partitioned Parquet files
for base_dir in base_directories:
    # Read the entire directory as partitioned data
    df = spark.read.option("mergeSchema", "true").parquet(base_dir)
    
    # Union the data with the master DataFrame
    if all_transactions is None:
        all_transactions = df
    else:
        all_transactions = all_transactions.union(df)

# Show the schema or a few rows of the resulting DataFrame to verify
all_transactions.printSchema()
all_transactions.show(5)


root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)

+-------+------------+------------------+--------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|order_datetime|
+-------+------------+------------------+--------------------+--------------+
|  18478| 62191208634|63.255848959735246|949a63c8-29f7-4ab...|    2021-08-20|
|      2| 15549624934| 130.3505283105634|6a84c3cf-612a-457...|    2021-08-20|
|  18479| 64403598239|120.15860593212783|b10dcc33-e53f-425...|    2021-08-20|
|      3| 60956456424| 136.6785200286976|0f09c5a5-784e-447...|    2021-08-20|
|  18479| 94493496784| 72.96316578355305|f6c78c1a-4600-4c5...|    2021-08-20|
+-------+------------+------------------+--------------------+--------------+
only showing top 5 rows



In [9]:
# Show Null Count
null_transaction = all_transactions.agg(
    *(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in all_transactions.columns)
)

# Show the result
null_transaction.show()



+-------+------------+------------+--------+--------------+
|user_id|merchant_abn|dollar_value|order_id|order_datetime|
+-------+------------+------------+--------+--------------+
|      0|           0|           0|       0|             0|
+-------+------------+------------+--------+--------------+



                                                                                

## Merging Consumer, Merchant, and Transaction Data

In [10]:
merged_all = all_transactions.join(
    merchant_merged, 
    (all_transactions["merchant_abn"] == merchant_merged["merchant_abn"]) & 
    (all_transactions["order_datetime"] == merchant_merged["order_datetime"]),
    "left"
)

merged_all = merged_all.drop(merchant_merged.order_datetime)
merged_all = merged_all.drop(merchant_merged.merchant_abn)


# Rename duplicated columns from merchant_merged
merged_all = merged_all.withColumnRenamed("fraud_probability", "fraud_probability_merchant") \
                       .withColumnRenamed("name", "name_merchant")

merged_all = merged_all.join(
    consumer_merged, 
    (merged_all["order_datetime"] == consumer_merged["order_datetime"]) & 
    (merged_all["user_id"] == consumer_merged["user_id"]),
    "left"
)

merged_all = merged_all.drop(consumer_merged.order_datetime)
merged_all = merged_all.drop(consumer_merged.user_id)

# Rename duplicated columns from consumer_merged
merged_all = merged_all.withColumnRenamed("fraud_probability", "fraud_probability_consumer") \
                       .withColumnRenamed("name", "name_consumer") \
                       .withColumnRenamed("address", "address_consumer") \
                       .withColumnRenamed("state", "state_consumer") \
                       .withColumnRenamed("postcode", "postcode_consumer") \
                       .withColumnRenamed("gender", "gender_consumer")

merged_all.show(5)

+-------+------------+------------------+--------------------+--------------+-------------+----+--------------------------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+
|user_id|merchant_abn|      dollar_value|            order_id|order_datetime|name_merchant|tags|fraud_probability_merchant|consumer_id|fraud_probability_consumer|name_consumer|address_consumer|state_consumer|postcode_consumer|gender_consumer|
+-------+------------+------------------+--------------------+--------------+-------------+----+--------------------------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+
|      2| 15549624934| 130.3505283105634|6a84c3cf-612a-457...|    2021-08-20|         NULL|NULL|                      NULL|       NULL|                      NULL|         NULL|            NULL|          NULL|             NULL|           NULL|
|      3| 76819856970|  448.

### Completing Merchant and Consumer Information based on Given Data

In [11]:
from pyspark.sql.functions import coalesce

# Rename columns in merchant_merged to avoid ambiguity
merchant_renamed = merchant_merged.withColumnRenamed("name", "merchant_name") \
                                  .withColumnRenamed("tags", "merchant_tags") \
                                  .withColumnRenamed("fraud_probability", "merchant_fraud_probability")

# Rrename columns in merged_all
merged_all = merged_all.withColumnRenamed("name_merchant", "existing_name_merchant") \
                       .withColumnRenamed("tags", "existing_tags") \
                       .withColumnRenamed("fraud_probability_merchant", "existing_fraud_probability_merchant")

# Left join merged_all with renamed merchant_merged on merchant_abn
merged_all = merged_all.join(
    merchant_renamed, 
    on="merchant_abn", 
    how="left"
)

# Drop the duplicate order_datetime column from the joined merchant_renamed
merged_all = merged_all.drop(merchant_renamed["order_datetime"])

# Fill the null values in the columns from merged_all with values from renamed merchant_merged
merged_all = merged_all.withColumn(
    "name_merchant", coalesce(merged_all["existing_name_merchant"], merged_all["merchant_name"])
).withColumn(
    "tags", coalesce(merged_all["existing_tags"], merged_all["merchant_tags"])
).withColumn(
    "fraud_probability_merchant", coalesce(merged_all["existing_fraud_probability_merchant"], merged_all["merchant_fraud_probability"])
)

# Drop the extra columns from the join if needed
merged_all = merged_all.drop("merchant_name", "merchant_tags", "merchant_fraud_probability", "existing_name_merchant", "existing_tags", "existing_fraud_probability_merchant")

column_to_drop = merged_all.columns[-4]
merged_all = merged_all.drop(column_to_drop)

merged_all.show(5)


+------------+-------+------------------+--------------------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+
|merchant_abn|user_id|      dollar_value|            order_id|consumer_id|fraud_probability_consumer|name_consumer|address_consumer|state_consumer|postcode_consumer|gender_consumer|       name_merchant|                tags|fraud_probability_merchant|
+------------+-------+------------------+--------------------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+
| 15549624934|      2| 130.3505283105634|6a84c3cf-612a-457...|       NULL|                      NULL|         NULL|            NULL|          NULL|             NULL|           NULL|  Commodo Associates|[(opticians, optI...|                      NU

In [12]:
# Alias the consumer_merged DataFrame to avoid column name conflicts
consumer_renamed = consumer_merged.withColumnRenamed("fraud_probability", "consumer_fraud_probability") \
                                  .withColumnRenamed("consumer_id", "consumer_id_renamed") \
                                  .withColumnRenamed("name", "name_consumer_renamed") \
                                  .withColumnRenamed("address", "address_consumer_renamed") \
                                  .withColumnRenamed("state", "state_consumer_renamed") \
                                  .withColumnRenamed("postcode", "postcode_consumer_renamed") \
                                  .withColumnRenamed("gender", "gender_consumer_renamed")

# Left join merged_all with renamed consumer_merged on user_id
merged_all = merged_all.join(
    consumer_renamed, 
    on="user_id", 
    how="left"
)

# Fill the null values in the columns from merged_all with values from consumer_renamed
merged_all = merged_all.withColumn(
    "fraud_probability_consumer", coalesce(merged_all["fraud_probability_consumer"], merged_all["consumer_fraud_probability"])
).withColumn(
    "consumer_id", coalesce(merged_all["consumer_id"], merged_all["consumer_id_renamed"])
).withColumn(
    "name_consumer", coalesce(merged_all["name_consumer"], merged_all["name_consumer_renamed"])
).withColumn(
    "address_consumer", coalesce(merged_all["address_consumer"], merged_all["address_consumer_renamed"])
).withColumn(
    "state_consumer", coalesce(merged_all["state_consumer"], merged_all["state_consumer_renamed"])
).withColumn(
    "postcode_consumer", coalesce(merged_all["postcode_consumer"], merged_all["postcode_consumer_renamed"])
).withColumn(
    "gender_consumer", coalesce(merged_all["gender_consumer"], merged_all["gender_consumer_renamed"])
)

# Drop the extra columns from the join if needed
merged_all = merged_all.drop(
    "consumer_fraud_probability", "consumer_id_renamed", 
    "name_consumer_renamed", "address_consumer_renamed", 
    "state_consumer_renamed", "postcode_consumer_renamed", 
    "gender_consumer_renamed"
)

merged_all.show(5)


+-------+------------+------------------+--------------------+-----------+--------------------------+--------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|consumer_id|fraud_probability_consumer| name_consumer|    address_consumer|state_consumer|postcode_consumer|gender_consumer|       name_merchant|                tags|fraud_probability_merchant|order_datetime|
+-------+------------+------------------+--------------------+-----------+--------------------------+--------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+--------------+
|  18478| 62191208634|63.255848959735246|949a63c8-29f7-4ab...|     651338|        20.213514250568863|   James Smith|27393 Wiley Lane ...|           TAS|             7001|           Male|C

In [13]:
# Check for duplicates by grouping by order_id and counting
duplicate_orders = merged_all.groupBy("order_id").count().filter(F.col("count") > 1)

# Remove duplicates by keeping the first occurrence
merged_all = merged_all.dropDuplicates(["order_id"])

## Checking Null/Invalid Values

In [14]:
null_merged_all = merged_all.agg(
    *(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in merged_all.columns)
)

null_merged_all.show()

24/09/15 17:32:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/09/15 17:32:50 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

+-------+------------+------------+--------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+-------------+------+--------------------------+--------------+
|user_id|merchant_abn|dollar_value|order_id|consumer_id|fraud_probability_consumer|name_consumer|address_consumer|state_consumer|postcode_consumer|gender_consumer|name_merchant|  tags|fraud_probability_merchant|order_datetime|
+-------+------------+------------+--------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+-------------+------+--------------------------+--------------+
|      0|           0|           0|       0|          0|                   2328724|            0|               0|             0|                0|              0|       580830|580830|                  13611959|       2328724|
+-------+------------+------------+--------+-----------+--------------------------+---------

                                                                                

There seems to be a problem in merging, as the name_merchant and tags are not supposed to be null. Reperform join especially on rows with null values.

In [15]:
null_merchant_rows = merged_all.filter(
    (merged_all["name_merchant"].isNull()) & (merged_all["tags"].isNull()))

null_merchant_rows.show(5)



+-------+------------+------------------+--------------------+-----------+--------------------------+------------------+--------------------+--------------+-----------------+---------------+-------------+----+--------------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|consumer_id|fraud_probability_consumer|     name_consumer|    address_consumer|state_consumer|postcode_consumer|gender_consumer|name_merchant|tags|fraud_probability_merchant|order_datetime|
+-------+------------+------------------+--------------------+-----------+--------------------------+------------------+--------------------+--------------+-----------------+---------------+-------------+----+--------------------------+--------------+
|   2438| 11240426404|161.65965550330247|000334fe-7ab8-4e7...|     237010|         9.124030652870667|       Cindy Ortiz|957 Richards Terrace|            WA|             6014|         Female|         NULL|NULL|                      NULL|    2021

                                                                                

In [16]:
# Test to see if merchant abn is present in existing data
merchant_merged.filter(merchant_merged["merchant_abn"] == '32234779638').show()

+----+----+------------+--------------+-----------------+
|name|tags|merchant_abn|order_datetime|fraud_probability|
+----+----+------------+--------------+-----------------+
+----+----+------------+--------------+-----------------+



It seems that these missing values are due to unseen merchant abn data. Since the merchant name and tag is unknown, default unique values will be assigned.

In [17]:
from pyspark.sql.functions import col, row_number, lit, when, concat
from pyspark.sql.window import Window

window_spec = Window.orderBy("merchant_abn")

# Generate unique default names like "Unknown Merchant 1", "Unknown Merchant 2", etc.
null_merchant_rows = null_merchant_rows.withColumn(
    "name_merchant_default", concat(lit("Unknown Merchant "), row_number().over(window_spec))
).withColumn(
    "tags_default", concat(lit("Unknown Tag "), row_number().over(window_spec))
)

In [18]:
null_merchant_rows = null_merchant_rows.drop("tags", "name_merchant") \
    .withColumnRenamed("tags_default", "tags") \
    .withColumnRenamed("name_merchant_default", "name_merchant")

In [19]:
no_null_merch_row =  merged_all.filter(
    (merged_all["name_merchant"].isNotNull()) & (merged_all["tags"].isNotNull()))

merged_all = no_null_merch_row.unionByName(null_merchant_rows)

In [20]:
# Recheck nulls
null_merged_all = merged_all.agg(
    *(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in merged_all.columns)
)

null_merged_all.show()

24/09/15 17:33:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:33:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:33:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:33:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:33:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:33:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

+-------+------------+------------+--------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+-------------+----+--------------------------+--------------+
|user_id|merchant_abn|dollar_value|order_id|consumer_id|fraud_probability_consumer|name_consumer|address_consumer|state_consumer|postcode_consumer|gender_consumer|name_merchant|tags|fraud_probability_merchant|order_datetime|
+-------+------------+------------+--------+-----------+--------------------------+-------------+----------------+--------------+-----------------+---------------+-------------+----+--------------------------+--------------+
|      0|           0|           0|       0|          0|                   2328724|            0|               0|             0|                0|              0|            0|   0|                  13611959|       2328724|
+-------+------------+------------+--------+-----------+--------------------------+-------------+---

                                                                                

## ML Imputation for Missing Values

In [21]:
# Filter for only complete rows to train ML model for imputation
merged_all_cleaned = merged_all.dropna()

In [22]:
merged_all_cleaned.show(5)

24/09/15 17:34:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:34:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:34:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:34:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:34:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:34:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

+-------+------------+------------------+--------------------+-----------+--------------------------+----------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|consumer_id|fraud_probability_consumer|   name_consumer|    address_consumer|state_consumer|postcode_consumer|gender_consumer|       name_merchant|                tags|fraud_probability_merchant|order_datetime|
+-------+------------+------------------+--------------------+-----------+--------------------------+----------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+--------------+
|  20501| 94493496784|257.30994156137365|0002e45b-60ca-4a3...|     587288|         8.536066314972981|       Donna Lee| 37294 Vaughan Plaza|           NSW|             2546|         

                                                                                

In [23]:
from pyspark.ml.feature import StringIndexer

categorical_cols = ["state_consumer", "gender_consumer"]

indexers = []

# Loop through each categorical column and create a StringIndexer
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")  # 'keep' allows unseen labels
    indexers.append(indexer)

# Apply the indexers to the dataset using a Pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers)

# Fit and transform the data
merged_all_encoded = pipeline.fit(merged_all_cleaned).transform(merged_all_cleaned)

# Show the encoded columns
merged_all_encoded.select([f"{col}_index" for col in categorical_cols]).show(5)


24/09/15 17:35:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:35:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:35:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:35:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:35:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:35:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

+--------------------+---------------------+
|state_consumer_index|gender_consumer_index|
+--------------------+---------------------+
|                 0.0|                  1.0|
|                 0.0|                  1.0|
|                 1.0|                  0.0|
|                 4.0|                  0.0|
|                 2.0|                  0.0|
+--------------------+---------------------+
only showing top 5 rows



                                                                                

In [24]:
merged_all_encoded.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- consumer_id: long (nullable = true)
 |-- fraud_probability_consumer: double (nullable = true)
 |-- name_consumer: string (nullable = true)
 |-- address_consumer: string (nullable = true)
 |-- state_consumer: string (nullable = true)
 |-- postcode_consumer: integer (nullable = true)
 |-- gender_consumer: string (nullable = true)
 |-- name_merchant: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- fraud_probability_merchant: double (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- state_consumer_index: double (nullable = false)
 |-- gender_consumer_index: double (nullable = false)



In [25]:
from pyspark.ml.feature import VectorAssembler

# List of input columns to include in the feature vector
input_cols = [
    "user_id", "merchant_abn", "dollar_value",
    "state_consumer_index", "gender_consumer_index"
]
    

# Assemble the feature columns into a single vector
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# Transform the data to create the feature vector
merged_all_encoded = assembler.transform(merged_all_encoded)

# Show the resulting features
merged_all_encoded.select("features").show(5, truncate=False)


24/09/15 17:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:37:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:37:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

+----------------------------------------------------+
|features                                            |
+----------------------------------------------------+
|[20501.0,9.4493496784E10,257.30994156137365,0.0,1.0]|
|[7626.0,5.0315283629E10,366.14616684135876,0.0,1.0] |
|[1921.0,1.8158387243E10,549.2874391432783,1.0,0.0]  |
|[6149.0,5.0315283629E10,295.547844260679,4.0,0.0]   |
|[17487.0,7.676726614E10,191.36695168740195,2.0,0.0] |
+----------------------------------------------------+
only showing top 5 rows



                                                                                

### Training Consumer Fraud Probability Model
Imputed by Random Forest Regressor Model.

In [26]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

# Set up the RandomForestRegressor for fraud_probability_consumer
rf_consumer_fraud = RandomForestRegressor(labelCol="fraud_probability_consumer", featuresCol="features", maxBins=50)

train_data_fraud_consumer, test_data_fraud_consumer = merged_all_encoded.randomSplit([0.8, 0.2], seed=42)

pipeline_fraud_consumer = Pipeline(stages=[rf_consumer_fraud])

model_fraud_consumer = pipeline_fraud_consumer.fit(train_data_fraud_consumer)

24/09/15 17:38:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:38:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:38:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:38:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:38:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:38:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

In [27]:
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions on the test set
predictions_fraud_consumer = model_fraud_consumer.transform(test_data_fraud_consumer)

# Initialize the regression evaluator
evaluator = RegressionEvaluator(labelCol="fraud_probability_consumer", predictionCol="prediction")

r2 = evaluator.evaluate(predictions_fraud_consumer, {evaluator.metricName: "r2"})
print(f"R-squared (R²): {r2}")

24/09/15 17:39:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:39:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:39:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:39:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:39:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:39:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

R-squared (R²): 0.005317415863837716


                                                                                

### Training Merchant Fraud Probability Model
Using random forest regressor model.

In [28]:
rf_merchant_fraud = RandomForestRegressor(labelCol="fraud_probability_merchant", featuresCol="features", maxBins=50)

train_data_fraud_merchant, test_data_fraud_merchant = merged_all_encoded.randomSplit([0.8, 0.2], seed=42)

pipeline_fraud_merchant = Pipeline(stages=[rf_merchant_fraud])

model_fraud_merchant = pipeline_fraud_merchant.fit(train_data_fraud_merchant)

24/09/15 17:40:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:40:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:40:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:40:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:40:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:40:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

In [29]:
predictions_fraud_merchant = model_fraud_merchant.transform(test_data_fraud_merchant)

# Initialize the regression evaluator
evaluator = RegressionEvaluator(labelCol="fraud_probability_merchant", predictionCol="prediction")

r2 = evaluator.evaluate(predictions_fraud_merchant, {evaluator.metricName: "r2"})
print(f"R-squared (R²): {r2}")

24/09/15 17:41:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:41:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:41:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:41:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:41:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:41:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

R-squared (R²): 0.48268573454611075


                                                                                

### Imputing Fraud Probability Values

In [30]:
# Imputing Consumer Fraud Probability
merged_all = pipeline.fit(merged_all).transform(merged_all)

merged_all = assembler.transform(merged_all)

                                                                                

In [31]:
missing_data_fraud_consumer = merged_all.filter(merged_all["fraud_probability_consumer"].isNull())
predicted_data_fraud_consumer = model_fraud_consumer.transform(missing_data_fraud_consumer)
predicted_data_fraud_consumer = predicted_data_fraud_consumer.drop("fraud_probability_consumer")
predicted_data_fraud_consumer = predicted_data_fraud_consumer.withColumnRenamed("prediction", "fraud_probability_consumer")
predicted_data_fraud_consumer.show(5)

24/09/15 17:43:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:43:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:43:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:43:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:43:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:43:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

+-------+------------+------------------+--------------------+-----------+----------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+--------------+--------------------+---------------------+--------------------+--------------------------+
|user_id|merchant_abn|      dollar_value|            order_id|consumer_id|   name_consumer|    address_consumer|state_consumer|postcode_consumer|gender_consumer|       name_merchant|                tags|fraud_probability_merchant|order_datetime|state_consumer_index|gender_consumer_index|            features|fraud_probability_consumer|
+-------+------------+------------------+--------------------+-----------+----------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------------------+--------------+--------------------+---------------------+--------------------+-----------------

                                                                                

In [32]:
no_null_cons_fraud = merged_all.filter(merged_all["fraud_probability_consumer"].isNotNull())
merged_all = no_null_cons_fraud.unionByName(predicted_data_fraud_consumer)

In [33]:
# Imputing Merchant Fraud Probability Values
missing_data_fraud_merchant = merged_all.filter(merged_all["fraud_probability_merchant"].isNull())
predicted_data_fraud_merchant = model_fraud_merchant.transform(missing_data_fraud_merchant)
predicted_data_fraud_merchant = predicted_data_fraud_merchant.drop("fraud_probability_merchant")
predicted_data_fraud_merchant = predicted_data_fraud_merchant.withColumnRenamed("prediction", "fraud_probability_merchant")
predicted_data_fraud_merchant.show(5)


24/09/15 17:44:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:44:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:44:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:44:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:44:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:44:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1

+-------+------------+------------------+--------------------+-----------+--------------------------+--------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------+--------------------+---------------------+--------------------+--------------------------+
|user_id|merchant_abn|      dollar_value|            order_id|consumer_id|fraud_probability_consumer| name_consumer|    address_consumer|state_consumer|postcode_consumer|gender_consumer|       name_merchant|                tags|order_datetime|state_consumer_index|gender_consumer_index|            features|fraud_probability_merchant|
+-------+------------+------------------+--------------------+-----------+--------------------------+--------------+--------------------+--------------+-----------------+---------------+--------------------+--------------------+--------------+--------------------+---------------------+--------------------+-----------------------

                                                                                

In [34]:
no_null_merch_fraud = merged_all.filter(merged_all["fraud_probability_merchant"].isNotNull())
merged_all = no_null_merch_fraud.unionByName(predicted_data_fraud_merchant)

### Imputing Order Date time
Combining forward fill, backward fill, and median imputation for the remaining null values.

In [35]:
from pyspark.sql import Window
from pyspark.sql.functions import last, first, coalesce, lit, expr, col

# Forward Fill
window_spec = Window.partitionBy("user_id").orderBy("order_id")

# Forward fill for order_datetime
filled_df = merged_all.withColumn(
    "order_datetime_ffill", last("order_datetime", ignorenulls=True).over(window_spec)
)

# Backward Fill
filled_df = filled_df.withColumn(
    "order_datetime_bfill", first("order_datetime_ffill", ignorenulls=True).over(window_spec.rowsBetween(0, Window.unboundedFollowing))
)

# Combine Forward and Backward Fill using coalesce
filled_df = filled_df.withColumn(
    "final_order_datetime", coalesce("order_datetime_ffill", "order_datetime_bfill")
)

# Drop intermediate columns for clarity
filled_df = filled_df.drop("order_datetime_ffill", "order_datetime_bfill")

# Check for any remaining nulls (entire groups may still have nulls)
remaining_nulls = filled_df.filter(col("final_order_datetime").isNull())


# Handle remaining nulls with median imputation
# Calculate the median datetime for order_datetime
median_datetime = filled_df.agg(expr("percentile_approx(order_datetime, 0.5)").alias("median_datetime")).first()["median_datetime"]

# Apply median imputation to remaining null values
filled_df = filled_df.withColumn(
    "final_order_datetime", coalesce("final_order_datetime", lit(median_datetime))
)

# Verify that no nulls remain
final_null_check = filled_df.filter(col("final_order_datetime").isNull())

# Show the final DataFrame with all missing datetimes handled
print("Final DataFrame:")
filled_df.select("user_id", "final_order_datetime").show(5)


                                                                                

Final DataFrame:


[Stage 1167:>                                                       (0 + 1) / 1]

+-------+--------------------+
|user_id|final_order_datetime|
+-------+--------------------+
|      7|          2021-11-20|
|      7|          2021-11-20|
|      7|          2021-11-20|
|      7|          2021-11-20|
|      7|          2021-11-20|
+-------+--------------------+
only showing top 5 rows



                                                                                

In [36]:
filled_df = filled_df.drop("order_datetime", "state_consumer_index", "gender_consumer_index", "features")

filled_df = filled_df.withColumnRenamed("final_order_datetime", "order_datetime")

filled_df.printSchema()


root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- consumer_id: long (nullable = true)
 |-- fraud_probability_consumer: double (nullable = true)
 |-- name_consumer: string (nullable = true)
 |-- address_consumer: string (nullable = true)
 |-- state_consumer: string (nullable = true)
 |-- postcode_consumer: integer (nullable = true)
 |-- gender_consumer: string (nullable = true)
 |-- name_merchant: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- fraud_probability_merchant: double (nullable = true)
 |-- order_datetime: date (nullable = false)



In [39]:
# Final Check for Null Values
null_order_datetime_count = filled_df.filter(col("order_datetime").isNull()).count()
print(f"Number of null values in the 'order_datetime' column: {null_order_datetime_count}")

Number of null values in the 'order_datetime' column: 0


### Export Final Dataframe to Parquet Format

In [37]:
filled_df.write.parquet("../data/curated/complete_data")

24/09/15 17:46:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:46:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:46:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:46:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:46:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 17:46:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/15 1