### Data Reading

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType

# Define schema
schema = StructType([
    StructField("transaction_id", StringType(), True), # (Column_name, data_type, is_nullable)
    StructField("customer_id", IntegerType(), True),
    StructField("transaction_date", DateType(), True),
    StructField("amount", DoubleType(), True)
])

df_spark = (
    spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .load("dbfs:/Volumes/pyspark_tut/loyalty_program/loyalty_pipeline/uploaded_file_20250807_232147.csv")
)

df_spark.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- amount: double (nullable = true)



### EDA

In [0]:
df_spark.display()

transaction_id,customer_id,transaction_date,amount
1,310,2024-08-07,7384.73
2,431,2024-08-07,9087.56
3,160,2024-08-07,6642.57
4,191,2024-08-07,1686.88
5,19,2024-08-07,1723.93
6,186,2024-08-07,3868.67
7,47,2024-08-07,7513.89
8,37,2024-08-07,7107.96
9,161,2024-08-07,6803.25
10,186,2024-08-07,8168.9


In [0]:
df_spark.summary().show()

+-------+-----------------+------------------+------------------+
|summary|   transaction_id|       customer_id|            amount|
+-------+-----------------+------------------+------------------+
|  count|            10300|             10300|              9321|
|   mean|           5150.5|225.88621359223302| 5518.367632585889|
| stddev|2973.498220390701|129.70523718228526|2608.7765710499075|
|    min|                1|                 1|           1003.78|
|    25%|           2575.0|               113|           3244.57|
|    50%|           5149.0|               226|           5484.37|
|    75%|           7724.0|               337|           7783.05|
|    max|             9999|               450|           9999.07|
+-------+-----------------+------------------+------------------+



In [0]:
from pyspark.sql.functions import col, sum

transaction_id_null_count = df_spark.select(sum(col("transaction_id").isNull().cast("int")).alias("Transaction ID Null Count")).show()
customer_id_null_count = df_spark.select(sum(col("customer_id").isNull().cast("int")).alias("Customer ID Null Count")).show()
amount_null_count = df_spark.select(sum(col("amount").isNull().cast("int")).alias("Amount Null Count")).show()

+-------------------------+
|Transaction ID Null Count|
+-------------------------+
|                        0|
+-------------------------+

+----------------------+
|Customer ID Null Count|
+----------------------+
|                     0|
+----------------------+

+-----------------+
|Amount Null Count|
+-----------------+
|              979|
+-----------------+



### Processing of data

In [0]:
from pyspark.sql.functions import sum, avg, count
from pyspark.sql.functions import col, expr
from pyspark.sql.types import DecimalType

# Group the data by customer id and calculate the sum of amount and count of transactions.
df_grouped = df_spark.groupby("customer_id")\
    .agg(sum('amount').alias("sum_amount"),\
        count("transaction_id").alias('transaction_count'))

# Setup the condition for the tier level
df_grouped = df_grouped.select(col("*") , expr("CASE WHEN sum_amount >= 100000 AND transaction_count >= 20 THEN 'Platinum' " +
                                               "WHEN sum_amount >= 50000 AND transaction_count >= 10 THEN 'Gold' " +
                                               "WHEN sum_amount >= 20000 AND transaction_count >= 5 THEN 'Silver' " +
                                               "ELSE 'Bronze' END").alias('tier') )

df_grouped = df_grouped.withColumn('sum_amount', col('sum_amount').cast(DecimalType(10,2)))

df_grouped = df_grouped.orderBy( col("customer_id").asc())

df_grouped.display()

customer_id,sum_amount,transaction_count,tier
1,113440.67,26,Platinum
2,107661.76,21,Platinum
3,97190.7,22,Gold
4,89612.55,16,Gold
5,93797.08,21,Gold
6,225230.98,40,Platinum
7,88102.02,18,Gold
8,141680.46,30,Platinum
9,137235.63,24,Platinum
10,146203.17,29,Platinum


### Saving result to parquet

In [0]:
df_grouped.write.mode('overwrite').parquet("dbfs:/Volumes/pyspark_tut/loyalty_program/loyalty_pipeline/cleaned/loyalty_tier.parquet")

In [0]:
df_par = spark.read.parquet("dbfs:/Volumes/pyspark_tut/loyalty_program/loyalty_pipeline/cleaned/loyalty_tier.parquet")
df_par.display()

customer_id,sum_amount,transaction_count,tier
1,113440.67,26,Platinum
2,107661.76,21,Platinum
3,97190.7,22,Gold
4,89612.55,16,Gold
5,93797.08,21,Gold
6,225230.98,40,Platinum
7,88102.02,18,Gold
8,141680.46,30,Platinum
9,137235.63,24,Platinum
10,146203.17,29,Platinum
