# Preliminary Analysis

BNPL data timeline: 2021-02-28 to 2022-10-26

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import* 

In [3]:
# One SparkSession for the whole notebook; getOrCreate() returns the existing
# session instead of building a second one if this cell is re-executed.
spark = (
    SparkSession.builder
    .appName("Preliminary Analysis")
    # show DataFrames as rendered tables when they are a cell's last expression
    .config("spark.sql.repl.eagerEval.enabled", True)
    # NOTE(review): no master is set here; if this runs in local mode the executors
    # live inside the driver JVM and spark.executor.memory has no effect — confirm
    .config("spark.driver.memory", "4G")
    .config("spark.executor.memory", "4G")
    .config("spark.sql.parquet.cacheMetadata", "true")
    # session time zone used for timestamp conversion/display, independent of the host
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

24/08/29 15:32:15 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# Load Datasets

### Table 1

In [34]:
# Every "tables 1" file lives in the same directory; keep that path in one place.
TABLES_1_DIR = '../data/tables/tables 1'

consumer_fraud = spark.read.csv(f'{TABLES_1_DIR}/consumer_fraud_probability.csv', header=True, inferSchema=True)
merchant_fraud = spark.read.csv(f'{TABLES_1_DIR}/merchant_fraud_probability.csv', header=True, inferSchema=True)
consumer_user_details = spark.read.parquet(f'{TABLES_1_DIR}/consumer_user_details.parquet')
# NOTE: tbl_consumer.csv is '|'-delimited. With the default ',' separator it loads as a
# single column named 'name|address|state|postcode|gender|consumer_id'; the next cell
# turns it into separate columns.
tbl_consumer = spark.read.csv(f'{TABLES_1_DIR}/tbl_consumer.csv', header=True, inferSchema=True)
tbl_merchants = spark.read.parquet(f'{TABLES_1_DIR}/tbl_merchants.parquet')

In [36]:
# Produce tbl_consumer with one column per field.
# tbl_consumer.csv is '|'-delimited. Read with the default ',' separator, the whole
# row sits in one column. In Spark's default PERMISSIVE mode, an unquoted comma inside
# any field would also create extra tokens that are dropped, silently truncating the
# row before it could be split. Letting the CSV parser split on '|' avoids that.
# This cell is also safe to re-run, unlike splitting the raw column and then dropping it.
# No inferSchema: every column stays a string, as the manual split produced.
# Empty fields come back as null rather than ''.
tbl_consumer = spark.read.csv(
    '../data/tables/tables 1/tbl_consumer.csv',
    header=True,
    sep='|',
)

tbl_consumer.show(3)

+----------------+--------------------+-----+--------+------+-----------+
|            name|             address|state|postcode|gender|consumer_id|
+----------------+--------------------+-----+--------+------+-----------+
|Yolanda Williams|413 Haney Gardens...|   WA|    6935|Female|    1195503|
|      Mary Smith|     3764 Amber Oval|  NSW|    2782|Female|     179208|
|   Jill Jones MD|  40693 Henry Greens|   NT|     862|Female|    1194530|
+----------------+--------------------+-----+--------+------+-----------+
only showing top 3 rows



24/08/29 18:48:46 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 912515 ms exceeds timeout 120000 ms
24/08/29 18:48:46 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/29 19:05:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

### Tables 2, 3 and 4 – transactions

In [16]:
# Load the three transaction tables (tables 2, 3 and 4) from their parquet paths.
tables_2, tables_3, tables_4 = (
    spark.read.parquet(path)
    for path in (
        '../data/tables/tables 2',
        '../data/tables/tables 3',
        '../data/tables/tables 4',
    )
)

In [19]:
# Row count of each transaction table; count() is an action, so this scans all three.
print(
    'number of transactions in table 2 3 4: ',
    *(table.count() for table in (tables_2, tables_3, tables_4)),
)


number of transactions in table 2 3 4:  3643266 4508106 6044133


In [20]:
# Stack the three transaction tables into one DataFrame.
# unionByName matches columns by name. Plain union matches them by position and would
# silently misalign values if any table stored its columns in a different order.
# Like union, it keeps duplicate rows (UNION ALL semantics).
# Expected size: 3,643,266 + 4,508,106 + 6,044,133 = 14,195,505 rows (counts from the
# cell above). The duplicate check in the next cell finds no repeated rows.
combined_table = tables_2.unionByName(tables_3).unionByName(tables_4)

In [23]:
# Exact-duplicate check: group on every column, keep only groups seen more than
# once, and show them. An empty result table means there are no duplicate rows.
duplicates = (
    combined_table
    .groupBy(combined_table.columns)
    .count()
    .filter(col("count") > 1)
)

duplicates.show()

                                                                                

+-------+------------+------------+--------+--------------+-----+
|user_id|merchant_abn|dollar_value|order_id|order_datetime|count|
+-------+------------+------------+--------+--------------+-----+
+-------+------------+------------+--------+--------------+-----+



                                                                                