In [1]:
import os
os.sys.path.append("../")
from scripts.consumer_transaction_model import *

In [2]:
os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3.11"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/local/bin/python3.11"

In [3]:
# Create a Spark Session
spark = (
    SparkSession.builder.appName("consumer transaction model")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    .config("spark.execturo.memory", "2g")
    .getOrCreate()
)

24/09/30 22:52:42 WARN Utils: Your hostname, DESKTOP-H6V94HM resolves to a loopback address: 127.0.1.1; using 192.168.0.236 instead (on interface wifi0)
24/09/30 22:52:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/30 22:52:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Loading the necessary data
consumer_info = spark.read.parquet('../data/curated/consumer_info.parquet')

transaction_records = spark.read.parquet('../data/curated/transactions.parquet')
transaction_records = transaction_records.drop("name") # drop name so it doesn't conflict when merge with consumer

fraudulent_consumer_rate = spark.read.parquet('../data/curated/consumer_fp.parquet')

personal_fraud = spark.read.csv('../data/curated/personal_fraud.csv', header=True, inferSchema=True)
postcode_info = spark.read.csv('../data/curated/postcode_info.csv', header=True, inferSchema=True)

personal_fraud = personal_fraud.drop(personal_fraud.columns[0])
postcode_info = postcode_info.drop(postcode_info.columns[0])

                                                                                

# Consumer Fraud Probability Prediction Model

As there are a lot of missing fraud probability for consumers in the transactions dataset, we would like to use machine learning algorthm to predict the missing values. This version of the model predict fraud probability at a transactional level. In other words, for any transactions that the customer made, the fraud probability is compute for that transaction only and all transactions made by a customer may have different fraud probability

## Feature Engineering

We will do some feature engineering and data aggregation as we believe that the current data we have on each transactions isn't enough for us to accurately predict the fraud probability.

In [5]:
# Add consumer info to transaction records
transaction_fraudulent_consumer_with_info = transaction_records.join(consumer_info, on="consumer_id", how="inner")
transaction_fraudulent_consumer_with_info = transaction_fraudulent_consumer_with_info.drop(
    "merchant_abn", "merchant_fp", "category", "revenue_level", "take_rate"
)
transaction_fraudulent_consumer_with_info.limit(5)

                                                                                

consumer_id,order_datetime,dollar_value,order_id,consumer_fp,name,gender,state,postcode
285333,2021-08-19,244.11185528431415,ab5d50f5-cf77-47f...,,Jose Rodriguez,Undisclosed,WA,6901
255477,2021-08-19,63.60772275481862,e7da0886-4c01-4f1...,,Dawn Rush,Female,ACT,2911
458016,2021-08-15,278.8957491120757,97329434-96eb-40b...,,Heather Martinez,Female,NT,837
471660,2021-08-16,10.081895520137127,5065b7d1-b838-4d7...,,Jennifer Guzman,Female,WA,6068
948149,2021-08-27,40.93297316366031,6887bae1-06d6-4d7...,,Stephen Nguyen,Male,SA,5139


Each consumer's area of living will have their associated fraud probability. The state and postcode average fraud probability will be useful features.

In [6]:
# Average fraud probability in each postcode or state
consumer_info_with_fp = consumer_info.join(fraudulent_consumer_rate, on = 'consumer_id', how = 'inner')

fraudulent_consumer_group_by_postcode = consumer_info_with_fp.groupBy(["postcode"]).agg(F.avg("fraud_probability").alias("average_fraud_prob_of_postcode"))

fraudulent_consumer_group_by_state = consumer_info_with_fp.groupBy(["state"]).agg(F.avg("fraud_probability").alias("average_fraud_prob_of_state"))

We believe that consumer with fluctutating buying behaviour, i.e high standard deviation accross all order, are potentially commiting fraud.

In [7]:
# analysis order value, consider the variance of order value and purchase frequency
consumer_transaction_value_analysis =  transaction_fraudulent_consumer_with_info.groupBy("consumer_id", "state", "postcode") \
                                        .agg(
                                            F.avg("dollar_value").alias("average_dollar_value"),
                                            F.min("dollar_value").alias("min_dollar_value"),
                                            F.max("dollar_value").alias("max_dollar_value"),
                                            F.count("dollar_value").alias("transaction_count"),
                                            F.stddev("dollar_value").alias("stddev_dollar_value")
                                        )

# consumer_transaction_value_analysis.limit(10)

Consumer with high standard deviation in the dollar value of their transactions may be suspicious as that mean their shopping habid varies a lot.

In [8]:
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_with_info \
    .join(consumer_transaction_value_analysis, on=["consumer_id", "state","postcode"], how="left") \
    .join(fraudulent_consumer_group_by_postcode, on="postcode", how="inner") \
    .join(fraudulent_consumer_group_by_state, on="state", how="inner")

Since we have data on personal fraud rate and income from each postcode, we can use it to help predicting consumer fraud probability. We will also create a feature that calculate the proportion of the mean/median income of the consumer's respective location that is used for making transactions. We think that it is unreasonable for a person to spend more than 70% of their annual salary on purchasing items as that would mean they wouldn't have enough money for other neccessity.

In [9]:
# Get infomation on personal fraud and income from external dataset
postcode_info = postcode_info.drop("state", "long", "lat", "lgacode")
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.join(personal_fraud, on="state", how="inner")
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.join(postcode_info, on="postcode", how="inner")

In [10]:
# Get proportion of the money used to purchase item with respect to income (one and a half year)
# average income
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("Proportion_between_max_order_value_mean_income", F.col("max_dollar_value") / (F.col("mean_income") * 1.5) )
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("Proportion_between_max_order_value_median_income", F.col("max_dollar_value") / (F.col("median_income") * 1.5))

# Total income
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("Proportion_between_total_order_value_mean_income", F.col("average_dollar_value") * F.col("transaction_count") / (F.col("mean_income") * 1.5))
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("Proportion_between_total_order_value_median_income", F.col("average_dollar_value") * F.col("transaction_count") / (F.col("median_income") * 1.5))


As prediting consumer's fraud probability on a transactional level is our main goal, we suspect that there is a temporal relationship between fraud probability and the month, date of purchase. Thus, we will split the `order_datetime` column into month, day (Monday - Sunday).

We also introduce a feature that indicates the number of order that a customer made in the previous week. Hence, we will train our model on 2021-03-07, which is 6 days before the first date of entry.

In [11]:
# Convert 'order_datetime' from string to date format
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("order_datetime", F.to_date("order_datetime", "yyyy-MM-dd"))
cutoff_date = "2021-03-07"
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.filter(F.col("order_datetime") >= F.lit(cutoff_date))

# Add a new column 'transaction_count_last_n_days' that counts the transactions within n days before each transaction
window_spec = Window.partitionBy("consumer_id").orderBy(F.col("order_datetime").cast("long")) \
    .rangeBetween(-7 * 86400, 0)  # 7 days in seconds (86400 seconds = 1 day)

transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("transaction_count_last_7_days", F.count("order_datetime").over(window_spec))

In [12]:
# Return the corresponding day of the week for the given date in the DataFrame.
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("day_of_week", F.dayofweek("order_datetime"))
transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("is_weekend", F.when((F.col("day_of_week") == 7) | (F.col("day_of_week") == 1), 1).otherwise(0))

transaction_fraudulent_consumer_summary = transaction_fraudulent_consumer_summary.withColumn("month", F.month("order_datetime"))


In [13]:
monthly_summary = transaction_fraudulent_consumer_summary.withColumn("month", F.month("order_datetime")) \
    .groupBy("month") \
    .agg(
        F.count("*").alias("count"),
        F.avg("consumer_fp").alias("average_fraud_probability")
    )

monthly_summary.show()



+-----+-------+-------------------------+
|month|  count|average_fraud_probability|
+-----+-------+-------------------------+
|   12| 910143|        14.34681743793918|
|    1| 525394|        14.65688007921009|
|    6|1312987|        13.90154329508384|
|    3|1051393|        14.73715900790723|
|    5|1329977|       14.764061886055469|
|    9| 644670|        14.60164630257746|
|    4|1174513|       14.588978233806925|
|    8|1365693|       14.731618998459677|
|    7|1375524|       14.511252271710653|
|   10| 693724|       14.602753834274727|
|   11| 942185|       14.623580500547867|
|    2| 504446|       14.772366739280768|
+-----+-------+-------------------------+



                                                                                

We can see that the average fraud probability of each month doesn't vary a lot. This may suggest that there is little to no temporal relationship between consumer's fraud probability and month. Though, during our model fitting, we will stil include this feature in the model and will check the feature importance.

During our preliminary analysis, we found out that the distribution of the dollar value for the transactions are heavily right-skewed even after a log-transformation. Thus, we will do a log-transformation on the feature `dollar_value` as well as any other features that are related to it, and then normalise for better comparision.

# Visualisation

## Assumptions or Observations:
1. The gender plot shows that there is a similar number of male and female consumers.
2. The number of consumers varies significantly across different states.
3. Consumers make a similar number of purchases on each day of the week, whether it’s a weekday or a weekend.
4. Both fraud probability and the dollar value of an order are strongly right-skewed and should be normalized.
5. Proportion features exhibit a linear relationship with fraud probability but may need transformation to clarify this relationship.

In [16]:
# # Convert relevant columns to Pandas
# df_pandas = transaction_fraudulent_consumer_summary.select(
#     "dollar_value", "scaled_dollar_value", "consumer_fp", "scaled_average_dollar_value", 
#     "scaled_min_dollar_value", "scaled_max_dollar_value", "transaction_count", 
#     "median_income", "mean_income", "state", "gender", "scaled_stddev_dollar_value",
#     "day_of_week", "is_weekend", "Proportion_between_max_order_value_mean_income",
#     "Proportion_between_max_order_value_median_income", 
#     "Proportion_between_total_order_value_mean_income", 
#     "Proportion_between_total_order_value_median_income"
# ).toPandas()

# # Define plots in a dictionary for looping
# plots = {
#     "Dollar Value Distribution": ("dollar_value", "hist"),
#     "Scaled Dollar Value Distribution": ("scaled_dollar_value", "hist"),
#     "Max Dollar Value Distribution": ("scaled_max_dollar_value", "hist"),
#     "Min Dollar Value Distribution": ("scaled_min_dollar_value", "hist"),
#     "Std Dollar Value Distribution": ("scaled_stddev_dollar_value", "hist"),
#     "Average Dollar Value Distribution": ("scaled_average_dollar_value", "hist"),
#     "Fraud Probability Distribution": ("consumer_fp", "hist"),
#     "Transaction Count Distribution": ("transaction_count", "hist"),
#     "Gender Count": ("gender", "count"),
#     "State Count": ("state", "count"),
#     "Day of Week Count": ("day_of_week", "count"),
#     "Is Weekend Count": ("is_weekend", "count"),
#     "Scatter 1 (Max Order Value vs Fraud Prob - Mean Income)": ("Proportion_between_max_order_value_mean_income", "scatter1"),
#     "Scatter 2 (Max Order Value vs Fraud Prob - Median Income)": ("Proportion_between_max_order_value_median_income", "scatter2"),
#     "Scatter 3 (Total Order Value vs Fraud Prob - Mean Income)": ("Proportion_between_total_order_value_mean_income", "scatter3"),
#     "Scatter 4 (Total Order Value vs Fraud Prob - Median Income)": ("Proportion_between_total_order_value_median_income", "scatter4")
# }
# feature_visualisation(df_pandas, plots)


In [17]:
# df_pandas = transaction_fraudulent_consumer_summary.select(
#     "scaled_dollar_value", "fraud_probability", "scaled_average_dollar_value", 
#     "scaled_min_dollar_value", "scaled_max_dollar_value", "transaction_count", 
#     "median_income", "mean_income", "Proportion_between_max_order_value_mean_income",
#     "Proportion_between_max_order_value_median_income", 
#     "Proportion_between_total_order_value_mean_income", 
#     "Proportion_between_total_order_value_median_income"
# ).toPandas()
# corr_matrix = df_pandas.corr()
# plt.figure(figsize=(12, 8))
# sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5)
# plt.title("Correlation Heatmap of Numeric Features")
# plt.show()


# Idea
1. Time Frequency feature: https://ieeexplore.ieee.org/document/9399421/

# Modelling

First, let's split our train data and the data that we want to predict.

In [18]:
train_data = transaction_fraudulent_consumer_summary.filter(F.col("consumer_fp").isNotNull())
predict_data = transaction_fraudulent_consumer_summary.filter(F.col("consumer_fp").isNull())

We will be using 2 regression model, one is Random Forest Regression (RFR) and the other is Linear Regression (LR). We will use LR as the baseline model to compare with RFR.

In [56]:
# List of feature to be used in the model
features_rf = ["norm_dollar_value", "norm_max_dollar_value","average_fraud_prob_of_postcode", "norm_stddev_dollar_value", "Proportion_between_max_order_value_median_income",
               "Proportion_between_max_order_value_mean_income", "transaction_count_last_7_days", "month_index", "weekday_index", "is_weekend_vector"]

features_lr = ['norm_dollar_value', 'norm_average_dollar_value', 'norm_stddev_dollar_value', 'average_fraud_prob_of_postcode', 'Proportion_between_total_order_value_mean_income',
               'Proportion_between_max_order_value_median_income', 'Proportion_between_max_order_value_mean_income', 'month_index', 'weekday_index', 'is_weekend_vector',
               'transaction_count_last_7_days']

In [41]:
assembled_train_data_rf, rf_assembler = assemble_data(train_data, features_rf)
assembled_train_data_lr, _ = assemble_data(train_data, features_lr)

                                                                                

In [21]:
train_set_rf, validate_set_rf = assembled_train_data_rf.randomSplit([0.8, 0.2], seed=123)
train_set_lr, validate_set_lr = assembled_train_data_lr.randomSplit([0.8, 0.2], seed=123)

In [42]:
# Parameter grid
rf_paramGrid = ParamGridBuilder() \
    .addGrid(RandomForestRegressor(labelCol='consumer_fp', featuresCol='features').numTrees, [10, 20, 40]) \
    .addGrid(RandomForestRegressor(labelCol='consumer_fp', featuresCol='features').maxDepth, [5, 10, 12]) \
    .build()

rf_evaluator = RegressionEvaluator(labelCol="consumer_fp", predictionCol="prediction")

rf_crossval = CrossValidator(estimator=RandomForestRegressor(labelCol='consumer_fp', featuresCol='features'),
                          estimatorParamMaps=rf_paramGrid,
                          evaluator=rf_evaluator,
                          numFolds=2)

rf_model = rf_crossval.fit(train_set_rf)


                                                                                

In [23]:
lr_paramGrid = ParamGridBuilder() \
    .addGrid(LinearRegression(labelCol="consumer_fp", featuresCol="features").regParam, [0.01, 0.1, 1.0]) \
    .addGrid(LinearRegression(labelCol="consumer_fp", featuresCol="features").elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()
    
lr_evaluator = RegressionEvaluator(labelCol="consumer_fp", predictionCol="prediction")

lr_crossval = CrossValidator(estimator=LinearRegression(labelCol="consumer_fp", featuresCol="features"),
                          estimatorParamMaps=lr_paramGrid,
                          evaluator=lr_evaluator,
                          numFolds=2)

lr_model = lr_crossval.fit(train_set_lr)

24/09/30 23:02:19 WARN Instrumentation: [a2b48f26] regParam is zero, which might cause numerical instability and overfitting.
24/09/30 23:02:19 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/09/30 23:02:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/09/30 23:02:58 WARN Instrumentation: [6ea8a951] regParam is zero, which might cause numerical instability and overfitting.
24/09/30 23:03:02 WARN Instrumentation: [11654c4f] regParam is zero, which might cause numerical instability and overfitting.
24/09/30 23:03:04 WARN Instrumentation: [017340ad] regParam is zero, which might cause numerical instability and overfitting.
24/09/30 23:03:06 WARN Instrumentation: [91c799a2] regParam is zero, which might cause numerical instability and overfitting.
24/09/30 23:03:09 WARN Instrumentation: [32b4e5d2] regParam is zero, which might cause numerical instability and overfitting.
24/09/30 23:03:11 WARN Ins

# Model Evaluation

Random Forest Regression

In [43]:
rf_predictions = rf_model.transform(validate_set_rf)
rf_rmse = rf_evaluator.evaluate(rf_predictions, {rf_evaluator.metricName: "rmse"})
rf_r2 = rf_evaluator.evaluate(rf_predictions, {rf_evaluator.metricName: "r2"})
print(f"RFR's Root Mean Squared Error (RMSE) on validation data = {rf_rmse}")
print(f"RFR's R2 (Coefficient of Determination) on validation data: {rf_r2}")



RFR's Root Mean Squared Error (RMSE) on validation data = 6.811379335834735
RFR's R2 (Coefficient of Determination) on validation data: 0.4257320912403959


                                                                                

Linear Regression

In [25]:
lr_predictions = lr_model.transform(validate_set_lr)
lr_rmse = lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "rmse"})
lr_r2 = lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "r2"})
print(f"LR's Root Mean Squared Error (RMSE) on validation data = {lr_rmse}")
print(f"LR's R2 (Coefficient of Determination) on validation data: {lr_r2}")



LR's Root Mean Squared Error (RMSE) on validation data = 7.829881311475111
LR's R2 (Coefficient of Determination) on validation data: 0.24115204065560436


                                                                                

Based on the result, we can see that the RFR model perform better than the LR model both in terms of prediction accuracy as well as variation in data explained by the model. In fact, we expected this as linear regression is a simple model and only work best with linear data. It's very unlikely the case that the relationship between the features and the response variable is linear, thus giving RFR the upper hand. Based on such performance, we will use RFR to predict our missing fraud probability.

Let's have a look at the hyperparameters of the "better" model for RFR and its feature importances.

In [48]:
best_rf_model = rf_model.bestModel
print(f"Best Random Forest numTrees: {best_rf_model.getNumTrees}") # 20
print(f"Best Random Forest maxDepth: {best_rf_model.getMaxDepth()}") # 5


Best Random Forest numTrees: 20
Best Random Forest maxDepth: 5


In [57]:
feature_names_rf = rf_assembler.getInputCols()
rf_feature_importances = rf_model.bestModel.featureImportances

rf_importances_df = pd.DataFrame({
    "Feature": features_rf,
    "Importance": rf_feature_importances.toArray()
}).sort_values(by="Importance", ascending=False)

print(rf_importances_df)

                                            Feature  Importance
0                                 norm_dollar_value    0.425110
1                             norm_max_dollar_value    0.205717
2                    average_fraud_prob_of_postcode    0.109584
3                          norm_stddev_dollar_value    0.097465
4  Proportion_between_max_order_value_median_income    0.067603
5    Proportion_between_max_order_value_mean_income    0.048784
7                                       month_index    0.042556
8                                     weekday_index    0.002601
6                     transaction_count_last_7_days    0.000503
9                                 is_weekend_vector    0.000076


From the feature importance table, we can see that the bottom 3 have very little impact on the predictions of the fraud probabilities. Thus, we will exclude them from our model. Using the `best_rf_model` to predict our missing consumer fraud probabilities.

In [75]:
final_features_rf = ["norm_dollar_value", "norm_max_dollar_value","average_fraud_prob_of_postcode", "norm_stddev_dollar_value", "Proportion_between_max_order_value_median_income",
               "Proportion_between_max_order_value_mean_income", "month_index"]

assembled_prediction_data, _ = assemble_data(predict_data, final_features_rf)

final_rf = RandomForestRegressor(labelCol='consumer_fp', featuresCol='features',
                                 numTrees=20, maxDepth=5)

final_rf_model = final_rf.fit(train_set_rf)
predictions = final_rf_model.transform(assembled_prediction_data)
predictions = predictions.select(*['consumer_id', 'order_datetime', 'order_id', 'prediction'])
predictions = predictions.withColumnRenamed('prediction', 'consumer_fp')

                                                                                

All that's left is to select the same columns from the train data and concat them togther.

In [76]:
train_data = train_data.select(*['consumer_id', 'order_datetime', 'order_id', 'consumer_fp'])

In [81]:
final_df = train_data.union(predictions)
final_df.write.parquet(f"../data/curated/predicted_consumer_fp.parquet", mode = "overwrite")

                                                                                