In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
from pyspark.sql.functions import year, month, concat_ws, broadcast, sum, col, avg, count

In [2]:
#Create a Spark Session
#Define Schemas for our dataFrame based on the csv file we have stored in Google Cloud Storage Bucket.

spark = SparkSession.builder\
        .appName("Retail DataSet")\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/18 06:02:20 INFO SparkEnv: Registering MapOutputTracker
25/05/18 06:02:21 INFO SparkEnv: Registering BlockManagerMaster
25/05/18 06:02:21 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/05/18 06:02:21 INFO SparkEnv: Registering OutputCommitCoordinator


In [3]:
#Reading the data from Cloud Storage Bucket and creating dataFrame

In [3]:
customers_df = spark.read\
        .option("header", True)\
        .option("mode", "PERMISSIVE")\
        .option("badRecordsPath", "gs://retail_pysparkdata/bad_data")\
        .option("nullValue", "")\
        .option("inferSchema", True)\
        .csv("gs://retail_pysparkdata/customers.csv")

transactions_df = spark.read\
            .option("header", True)\
            .option("mode", "PERMISSIVE")\
            .option("badRecordsPath", "gs://retail_pysparkdata/bad_data")\
            .option("nullValue", "")\
            .option("inferSchema", True)\
            .csv("gs://retail_pysparkdata/transactions.csv")

                                                                                

In [5]:
#Clean data - nulls & remove duplicates

In [6]:
customers_df_clean = customers_df.dropna(subset = ["customer_id"]).dropDuplicates(["customer_id"])

In [7]:
transactions_df_clean = transactions_df.dropna(subset = ["customer_id","transaction_id","quantity","price"]).dropDuplicates(["customer_id"])

In [8]:
#Add transaction year/month and full address new column

transactions_df_enriched = transactions_df_clean.withColumn("transaction_year", year("transaction_date"))\
                            .withColumn("transaction_month", month("transaction_date"))

customers_df_enriched = customers_df_clean.withColumn("full_address", concat_ws(", ", "street_address", "city", "state", "zip_code"))


In [9]:
customers_count = transactions_df_enriched.groupBy("customer_id").agg(count(col("transaction_id")).alias("count_cust"))

In [10]:
customers_count = transactions_df_enriched.groupBy("customer_id").agg(count(col("transaction_id")).alias("count_cust"))
customers_count.filter("count_cust > 1").show()



+-----------+----------+
|customer_id|count_cust|
+-----------+----------+
+-----------+----------+



                                                                                

In [11]:
transactions_df_enriched.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- price: double (nullable = true)
 |-- transaction_date: timestamp (nullable = true)
 |-- store_location: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- discount_applied: double (nullable = true)
 |-- transaction_year: integer (nullable = true)
 |-- transaction_month: integer (nullable = true)



In [9]:
#Lets cache the dataframe thats enriched and data handled for reuse

In [12]:
# Repartition by customer_id for better join performance
customers_df_enriched = customers_df_enriched.repartition("customer_id")
transactions_df_enriched = transactions_df_enriched.repartition("customer_id")

# Cache for reuse
customers_df_enriched.cache()
transactions_df_enriched.cache()


DataFrame[transaction_id: string, customer_id: string, product_name: string, product_category: string, quantity: double, price: double, transaction_date: timestamp, store_location: string, payment_method: string, discount_applied: double, transaction_year: int, transaction_month: int]

In [12]:
#Lets Explore Joins

In [13]:
#Inner Join

inner_join_df = transactions_df_enriched.join(customers_df_enriched, how="inner", on="customer_id")


In [14]:
#Left Outer Join

left_outer_join_df = transactions_df_enriched.join(customers_df_enriched, how="left", on="customer_id")

In [15]:
#Right Outer Join

right_outer_join_df = transactions_df_enriched.join(customers_df_enriched, how="right", on="customer_id")

In [16]:
#Full Outer Join

full_outer_join_df = transactions_df_enriched.join(customers_df_enriched, how="full", on="customer_id")

In [17]:
#Broadcast Join

broadcast_join_df =transactions_df_enriched.join(broadcast(customers_df_enriched), how="outer", on="customer_id")

In [18]:
#Common Aggregations & Transformations

In [18]:
#To Find Total Spend per customer, lets first create a amount column with price & quantity and then apply the discount percentage

#transactions_df_enriched = transactions_df_enriched.withColumn("amount", col("quantity") * col("price"))

#transactions_df_enriched = transactions_df_enriched.withColumn("final_amount", col("amount") - col("discount_applied"))


#We found NULL being reported for most final_amount - Lets Investigate

transactions_df_enriched.select("quantity", "price", "discount_applied")\
                        .where(col("quantity").isNull() | col("price").isNull() | col("discount_applied").isNull())\
                        .show()

#It is now investigated that in the dataset there is Quqntity value blank when price is present thus it cascades null to final_amount



+--------+-------+----------------+
|quantity|  price|discount_applied|
+--------+-------+----------------+
|     1.0| 112.31|            null|
|     2.0|1425.13|            null|
|     1.0| 249.58|            null|
|     2.0| 744.62|            null|
|     1.0| 746.84|            null|
|     1.0|1928.22|            null|
|     1.0| 125.76|            null|
|     1.0|   40.8|            null|
|     2.0|   62.1|            null|
|     1.0|  207.4|            null|
|     1.0|  51.95|            null|
|     2.0| 459.22|            null|
|     1.0| 350.79|            null|
|     3.0| 448.67|            null|
|     1.0| 808.35|            null|
|     1.0|  27.58|            null|
|     1.0| 138.24|            null|
|     1.0|1151.79|            null|
|     1.0| 118.36|            null|
|     1.0|1298.82|            null|
+--------+-------+----------------+
only showing top 20 rows



                                                                                

In [19]:
#It is now investigated that in the dataset there is Quqntity value blank when price is present thus it cascades null to final_amount
#Lets create safe columns without NUll before calculating Final Amount

#We tried coalesce but that was replacing all with 0.0 because quantity is blank and not null. So alrernate method below

from pyspark.sql.functions import col, when, lit

# Replace empty strings or nulls with 0 for quantity and price
transactions_df_enriched = transactions_df_enriched\
                        .withColumn("clean_quantity", when((col("quantity").isNull()) | (col("quantity") == ""), lit(0))\
                                    .otherwise(col("quantity").cast("int")))\
                        .withColumn("clean_price", when((col("price").isNull()) | (col("price") == ""), lit(0.0))\
                                    .otherwise(col("price").cast("double")))\
                        .withColumn("clean_discount", when((col("discount_applied").isNull()) | (col("discount_applied") == ""), lit(0))\
                                    .otherwise(col("discount_applied").cast("int")))\
                        .withColumn("amount", col("clean_quantity") * col("clean_price"))\
                        .withColumn("final_amount", col("amount") - col("clean_discount"))



In [20]:
#Now that we got the final columns - final_amount / amount we can drop the safe columns we created.

transactions_df_enriched = transactions_df_enriched.drop("clean_quantity","clean_price","clean_discount")

In [21]:
#To Find Total Spend per customer

total_spend_df = transactions_df_enriched.groupBy("customer_id").agg(sum("final_amount").alias("total_spend"))

In [22]:
#Average Spend Per Store

average_spend_df = transactions_df_enriched.groupBy("store_location").agg(avg("final_amount").alias("avg_spend"))

In [23]:
#Count of Transactions Per Payment Method

count_trans_paymentMethod = transactions_df_enriched.groupBy("payment_method").agg(count("transaction_id"))


In [24]:
#High-Value Transactions (Final Amount > 1000)

transactions_highvalue_df = transactions_df_enriched.filter("final_amount > 1000")

In [25]:
#Top N Customers by Spend

top_customer_df = total_spend_df.orderBy(col("total_spend").desc()).limit(10)



In [90]:
# Count of Transactions per City - this may not give accurate result

#trans_city_df = customers_df_enriched.join(transactions_df_enriched, on="customer_id", how = "left").groupBy("city").count()

In [26]:
# Count of Transactions per City

trans_city_df = customers_df_enriched.join(transactions_df_enriched, on="customer_id", how = "left").groupBy("city").agg(count(when(col("transaction_id").isNotNull(),1)).alias("transaction_count"))

In [27]:
#Average Spend per Month

avg_spend_month = transactions_df_enriched.groupBy("transaction_month").agg(avg("final_amount").alias("avg_spend_month"))

In [35]:
#Rename & Drop Columns
renamed_df = transactions_df_enriched.withColumnRenamed("total_amount" , "final_amount")

transactions_df_enriched = renamed_df.drop("total_amount")




In [28]:
transactions_df_enriched.show()

+--------------------+--------------------+------------------+--------------------+--------+-------+-------------------+-----------------+--------------+----------------+----------------+-----------------+-------+------------+
|      transaction_id|         customer_id|      product_name|    product_category|quantity|  price|   transaction_date|   store_location|payment_method|discount_applied|transaction_year|transaction_month| amount|final_amount|
+--------------------+--------------------+------------------+--------------------+--------+-------+-------------------+-----------------+--------------+----------------+----------------+-----------------+-------+------------+
|da684eac-a7d9-4da...|1280bddb-d9ed-4dc...|    Food Processor|Small Kitchen App...|     1.0| 101.53|2022-08-23 00:00:00|       Denver, CO|   Credit Card|             0.0|            2022|                8| 101.53|      101.53|
|c5cf1693-8807-40e...|256c3dc8-04a9-41b...|     Sony Soundbar|     Audio Equipment|     2.0|

In [29]:
#Adding Derived Columns (withColumn)

from pyspark.sql.functions import when

transactions_flagged_df = transactions_df_enriched.withColumn("high_value_flag",when(transactions_df_enriched["amount"] > 1000, "HIGH")\
                                                                .otherwise("NORMAL"))


In [30]:
#Select and Rename Columns

renamed_df = transactions_flagged_df.selectExpr("transaction_id as txn_id","customer_id","amount","high_value_flag")


In [31]:
renamed_df.show()

+--------------------+--------------------+-------+---------------+
|              txn_id|         customer_id| amount|high_value_flag|
+--------------------+--------------------+-------+---------------+
|da684eac-a7d9-4da...|1280bddb-d9ed-4dc...| 101.53|         NORMAL|
|c5cf1693-8807-40e...|256c3dc8-04a9-41b...| 743.42|         NORMAL|
|bb0009d5-b09f-4a1...|2fa8b3e5-af90-4be...| 254.09|         NORMAL|
|b3d79cde-8a70-487...|2fe076bb-d401-427...| 602.18|         NORMAL|
|69b79747-fff8-498...|377dacf5-92d0-496...| 526.14|         NORMAL|
|c7fd2754-1a23-4a1...|43d172e3-4ee1-475...| 155.78|         NORMAL|
|073e8c29-b8b3-4d9...|455c05cc-3c5c-4f0...|1759.76|           HIGH|
|71eb03b1-9fe7-430...|54e325e2-8499-44e...| 962.08|         NORMAL|
|db7941b1-6f82-40c...|64afc1c8-2eaa-446...| 765.68|         NORMAL|
|635186b2-f814-41f...|81b29f5b-a070-45a...|1172.53|           HIGH|
|cb7015c9-3f21-415...|861c66ad-3aec-4da...| 112.31|         NORMAL|
|624860f2-24cd-464...|8a185511-3dc2-469...| 1489

In [None]:
# Repartition and Cache (optional for performance tuning)

In [33]:
from pyspark.sql.functions import lit, monotonically_increasing_id
from pyspark.sql import DataFrame

# 1. Pick a sample of customers to duplicate
sample_customers = transactions_df_enriched.select("customer_id").distinct().limit(5)




In [34]:
# 2. Join with transactions to get their original transactions
sample_txns = sample_customers.join(transactions_df_enriched, on="customer_id", how="inner")



In [35]:
#WINDOW Functions..

#Rank by amount per customer

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, sum as _sum, lag, lead

window_spec_rank = Window.partitionBy("customer_id").orderBy(col("final_amount").desc())

ranked_products_df = transactions_df_enriched.withColumn("rank_in_customer",row_number().over(window_spec_rank))


In [37]:
transactions_df_enriched.select("customer_id","amount","final_amount").show()

+--------------------+-------+------------+
|         customer_id| amount|final_amount|
+--------------------+-------+------------+
|1280bddb-d9ed-4dc...| 101.53|      101.53|
|256c3dc8-04a9-41b...| 743.42|      743.42|
|2fa8b3e5-af90-4be...| 254.09|      254.09|
|2fe076bb-d401-427...| 602.18|      602.18|
|377dacf5-92d0-496...| 526.14|      526.14|
|43d172e3-4ee1-475...| 155.78|      155.78|
|455c05cc-3c5c-4f0...|1759.76|     1754.76|
|54e325e2-8499-44e...| 962.08|      962.08|
|64afc1c8-2eaa-446...| 765.68|      750.68|
|81b29f5b-a070-45a...|1172.53|     1152.53|
|861c66ad-3aec-4da...| 112.31|      112.31|
|8a185511-3dc2-469...| 1489.2|      1484.2|
|99ba027b-24e2-406...|  354.4|       339.4|
|bdb529e6-281d-4ab...|  67.78|       42.78|
|dcdb6180-6f27-4dd...| 269.38|      264.38|
|e2275f6f-253e-44e...|1097.29|     1067.29|
|e5249c67-3787-4ee...| 221.37|      191.37|
|e8aa2256-69c1-433...| 864.15|      864.15|
|eacf4624-88d9-4aa...| 1544.0|      1544.0|
|ebbb77a6-097b-47b...| 913.04|  

In [38]:
ranked_products_df.select("customer_id","final_amount","rank_in_customer").show()

+--------------------+------------+----------------+
|         customer_id|final_amount|rank_in_customer|
+--------------------+------------+----------------+
|1280bddb-d9ed-4dc...|      101.53|               1|
|256c3dc8-04a9-41b...|      743.42|               1|
|2fa8b3e5-af90-4be...|      254.09|               1|
|2fe076bb-d401-427...|      602.18|               1|
|377dacf5-92d0-496...|      526.14|               1|
|43d172e3-4ee1-475...|      155.78|               1|
|455c05cc-3c5c-4f0...|     1754.76|               1|
|54e325e2-8499-44e...|      962.08|               1|
|64afc1c8-2eaa-446...|      750.68|               1|
|81b29f5b-a070-45a...|     1152.53|               1|
|861c66ad-3aec-4da...|      112.31|               1|
|8a185511-3dc2-469...|      1484.2|               1|
|99ba027b-24e2-406...|       339.4|               1|
|bdb529e6-281d-4ab...|       42.78|               1|
|dcdb6180-6f27-4dd...|      264.38|               1|
|e2275f6f-253e-44e...|     1067.29|           

In [39]:
#Running total of final_amount per customer

window_spec_running = Window.partitionBy("customer_id").orderBy("transaction_date")

running_total_df = transactions_df_enriched.withColumn("running_total",_sum("final_amount").over(window_spec_running))


In [40]:
#Compare each transaction with previous

window_spec_lag = Window.partitionBy("customer_id").orderBy("transaction_date")

lag_lead_df = transactions_df_enriched.withColumn("prev_amount",lag("final_amount", 1).over(window_spec_lag))\
            .withColumn("next_amount",lead("final_amount", 1).over(window_spec_lag))


In [41]:
#Dense Rank: Cities by total spend

city_total_df = customers_df_enriched.join(transactions_df_enriched, "customer_id")

city_spend_df = city_total_df.groupBy("city").agg(
    _sum("final_amount").alias("total_spent")
)

window_spec_city = Window.orderBy(col("total_spent").desc())

city_ranked_df = city_spend_df.withColumn(
    "dense_rank", dense_rank().over(window_spec_city)
)


In [42]:
print("Before repartition:", transactions_df_enriched.rdd.getNumPartitions())

Before repartition: 200


In [43]:
transactions_df_repartitioned = transactions_df_enriched.repartition(10, "customer_id")

In [44]:
print("After repartition:", transactions_df_repartitioned.rdd.getNumPartitions())



After repartition: 10


In [62]:
"""Performance Tuning...!!!!!
Step 8: Repartition, Coalesce, and Caching
These are critical concepts when optimizing performance in PySpark — especially for large datasets.
8.1 Repartitioning
When to use?
Increase parallelism when working with larger datasets
Ensures better load balancing across Spark executors

# Check current number of partitions
print("Before repartition:", transactions_df_with_dupes.rdd.getNumPartitions())  --> 200

# Repartition to 10 partitions based on customer_id
transactions_df_repartitioned = transactions_df_with_dupes.repartition(10, "customer_id")

print("After repartition:", transactions_df_repartitioned.rdd.getNumPartitions())
✅ 8.2 Coalesce
When to use?
Reduce number of partitions

Especially useful before writing to disk or performing actions like collect()
# Coalesce into fewer partitions (e.g., 2) before saving
transactions_df_coalesced = transactions_df_repartitioned.coalesce(2)
✅ 8.3 Caching / Persisting
When to use?
If you're using a DataFrame multiple times in your pipeline

# Cache to memory
transactions_df_repartitioned.cache()
transactions_df_repartitioned.count()  # Trigger cache
or

from pyspark.storagelevel import StorageLevel

# Persist to memory and disk
transactions_df_repartitioned.persist(StorageLevel.MEMORY_AND_DISK)

"""

'Performance Tuning...!!!!!\nStep 8: Repartition, Coalesce, and Caching\nThese are critical concepts when optimizing performance in PySpark — especially for large datasets.\n8.1 Repartitioning\nWhen to use?\nIncrease parallelism when working with larger datasets\nEnsures better load balancing across Spark executors\n\n# Check current number of partitions\nprint("Before repartition:", transactions_df_with_dupes.rdd.getNumPartitions())  --> 200\n\n# Repartition to 10 partitions based on customer_id\ntransactions_df_repartitioned = transactions_df_with_dupes.repartition(10, "customer_id")\n\nprint("After repartition:", transactions_df_repartitioned.rdd.getNumPartitions())\n✅ 8.2 Coalesce\nWhen to use?\nReduce number of partitions\n\nEspecially useful before writing to disk or performing actions like collect()\n# Coalesce into fewer partitions (e.g., 2) before saving\ntransactions_df_coalesced = transactions_df_repartitioned.coalesce(2)\n✅ 8.3 Caching / Persisting\nWhen to use?\nIf you\'re

In [45]:
transactions_df_coalesced = transactions_df_repartitioned.coalesce(2)

In [46]:
print("After Coalesce:", transactions_df_coalesced.rdd.getNumPartitions())

After Coalesce: 2




In [47]:
#Save output as csv

transactions_df_repartitioned \
    .write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("gs://retail_pysparkdata//output/transactions")


                                                                                

In [48]:
#Save output as parque

transactions_df_repartitioned \
    .write \
    .mode("overwrite") \
    .parquet("gs://retail_pysparkdata//output/transactionss_parquet")


                                                                                

In [49]:
spark.stop()