In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
%run /Users/roboticsmanoj@gmail.com/retail_sales_project/Table_Definitions


In [0]:
%run /Users/roboticsmanoj@gmail.com/retail_sales_project/Data_Ingestion

In [0]:
# Quick look at the first five ingested sales rows before validation.
sales_df.show(n=5)

+-------+-----------+----------+--------+----------+--------+--------+--------------+
|sale_id|customer_id|product_id|store_id| sale_date|  amount|discount|payment_method|
+-------+-----------+----------+--------+----------+--------+--------+--------------+
| 500000|     945173|    954042|  300376|2019-12-31|  555.60|   21.39|        paypal|
| 500001|     102351|    200093|  300178|2022-02-06|14048.03|   11.96|         check|
| 500002|     104649|    200015|  300409|2023-08-15| 9448.34|   13.29|         check|
| 500003|     106302|    200741|  300283|2024-01-05|12295.88|   28.49|          card|
| 500004|     103450|    200834|  300166|2023-12-30|13112.78|    1.11|          card|
+-------+-----------+----------+--------+----------+--------+--------+--------------+
only showing top 5 rows



True

In [0]:
# Validate sales rows. Each rule yields a message when violated (NULL otherwise); concat_ws
# drops NULLs, so a row that passes every rule ends up with an empty "Validation_Errors" string.
MIN_SALE_DATE = "2020-01-01"
MIN_SALE_AMOUNT = 4000
VALID_PAYMENT_METHODS = ["card", "cash", "upi"]

# Referential-integrity lookup with exactly one row per known customer_id. distinct() keeps a
# duplicated customer_id in customers_df from fanning out (duplicating) sales rows in the join.
known_customer_ids = (
    customers_df
    .select(col("customer_id"))
    .distinct()
    .withColumn("customer_id_valid_flag", lit(True))
)
sales_df_customer_flag = sales_df.join(known_customer_ids, on="customer_id", how="left")

# A comparison or isin() against NULL evaluates to NULL, not True, so `when` would not fire.
# Each range/membership rule therefore also checks isNull(); otherwise rows with missing
# values would be treated as valid.
sales_df_errors = sales_df_customer_flag.withColumn("Validation_Errors", concat_ws(",", array(
    when(col("customer_id_valid_flag").isNull(),
         lit("Referential Integrity compromised. Invalid Customer ID")),
    when(col("sale_id").isNull(), lit("Invalid Sale ID")),
    when(col("customer_id").isNull(), lit("Invalid Customer ID")),
    when(col("sale_date").isNull() | (col("sale_date") < to_date(lit(MIN_SALE_DATE))),
         lit("Invalid Date")),
    when(col("amount").isNull() | (col("amount") < MIN_SALE_AMOUNT),
         lit("Invalid Amount")),
    when(col("payment_method").isNull() | ~col("payment_method").isin(VALID_PAYMENT_METHODS),
         lit("Invalid Payment Method")),
)))


In [0]:
# How often each distinct combination of sales validation errors occurs
# (an empty string is the bucket of fully valid rows).
(
    sales_df_errors
    .groupBy("Validation_Errors")
    .agg(count("*"))
    .show()
)

+-----------------+--------+
|Validation_Errors|count(1)|
+-----------------+--------+
|   Invalid Amount|  100000|
+-----------------+--------+



In [0]:
# Route sales rows by validation outcome: an empty error string means every rule passed.
sales_is_clean = col("Validation_Errors") == ""
sales_df_validated = sales_df_errors.where(sales_is_clean)
sales_df_invalid = sales_df_errors.where(~sales_is_clean)

sales_df_validated.show(n=5)
sales_df_invalid.limit(20).show(truncate=False)


+-----------+-------+----------+--------+----------+--------+--------+--------------+----------------------+-----------------+
|customer_id|sale_id|product_id|store_id| sale_date|  amount|discount|payment_method|customer_id_valid_flag|Validation_Errors|
+-----------+-------+----------+--------+----------+--------+--------+--------------+----------------------+-----------------+
|     106302| 500003|    200741|  300283|2024-01-05|12295.88|   28.49|          card|                  true|                 |
|     103450| 500004|    200834|  300166|2023-12-30|13112.78|    1.11|          card|                  true|                 |
|     104306| 500006|    200840|  300357|2021-06-21| 8159.18|   26.21|          card|                  true|                 |
|     103721| 500007|    200533|  300264|2022-03-28|17560.44|   10.03|          card|                  true|                 |
|     100888| 500009|    200431|  300096|2021-11-13|18021.82|   18.37|           upi|                  true|   

In [0]:
# Re-display the sales split with a larger sample of invalid rows.
# show() prints only 20 rows unless told otherwise, so the row count is passed to show()
# itself; limit(50).show() would still have displayed just 20 of the 50 rows.
sales_df_validated.show(5)
sales_df_invalid.show(50, truncate=False)

+-----------+-------+----------+--------+----------+--------+--------+--------------+----------------------+-----------------+
|customer_id|sale_id|product_id|store_id| sale_date|  amount|discount|payment_method|customer_id_valid_flag|Validation_Errors|
+-----------+-------+----------+--------+----------+--------+--------+--------------+----------------------+-----------------+
|     106302| 500003|    200741|  300283|2024-01-05|12295.88|   28.49|          card|                  true|                 |
|     103450| 500004|    200834|  300166|2023-12-30|13112.78|    1.11|          card|                  true|                 |
|     104306| 500006|    200840|  300357|2021-06-21| 8159.18|   26.21|          card|                  true|                 |
|     103721| 500007|    200533|  300264|2022-03-28|17560.44|   10.03|          card|                  true|                 |
|     100888| 500009|    200431|  300096|2021-11-13|18021.82|   18.37|           upi|                  true|   

In [0]:
# Validate customer rows. A schema problem (join_date not stored as DateType) is flagged on
# every row; email problems are flagged per row. Messages are joined with ", ".
is_datetype = isinstance(customers_df.schema["join_date"].dataType, DateType)

# Raw string so "\." reaches the regex engine unchanged (in a plain string "\." is an invalid
# Python escape, reported as a DeprecationWarning / SyntaxWarning on newer Pythons).
# Local part: letters, digits and _ . ' + -
# Domain: one or more dot-separated labels of letters, digits or hyphens
# (e.g. "smith-jones.com", "mail.example.co.uk"), ending in a TLD of 2+ letters.
email_regex = r"^[a-zA-Z0-9_.'+-]+@[A-Za-z0-9-]+(\.[A-Za-z0-9-]+)*\.[A-Za-z]{2,}$"

# Typed NULL so concat_ws simply skips it when the schema is correct.
schema_flag_col = lit(None).cast("string") if is_datetype else lit("Joining Date DataType Invalid")

customers_df_errors = customers_df.withColumn("Validation Errors", concat_ws(", ",
    schema_flag_col,
    # rlike on a NULL email yields NULL (so `when` would not fire); check isNull() explicitly.
    when(col("email").isNull() | ~col("email").rlike(email_regex), lit("Invalid Email")),
))

In [0]:
# Route customer rows by validation outcome: an empty error string means every rule passed.
# NOTE: these previews print names and emails (PII); clear outputs before sharing the notebook.
customer_is_clean = col("Validation Errors") == ""
customers_df_validated = customers_df_errors.where(customer_is_clean)
customers_df_invalid = customers_df_errors.where(~customer_is_clean)

customers_df_validated.show()
customers_df_invalid.show()

+-----------+------------------+--------------------+----------+-----------------+
|customer_id|              name|               email| join_date|Validation Errors|
+-----------+------------------+--------------------+----------+-----------------+
|     100001|   Samantha Cooper| scottdiaz@gmail.com|2023-08-03|                 |
|     100003|         John Pugh|elizabethgould@jo...|2022-06-06|                 |
|     100004|   Catherine Avery|     fchoi@gmail.com|2022-04-22|                 |
|     100006|       Donald Hunt|armstrongtravis@y...|2020-05-28|                 |
|     100007|     Shannon Jones|skinneranthony@ho...|2023-07-17|                 |
|     100008|    Kimberly Lewis|cbennett@hotmail.com|2024-10-03|                 |
|     100009|Michelle Rodriguez|  andrew47@gmail.com|2023-04-23|                 |
|     100011|      Samuel Mills|  alicia83@gmail.com|2023-06-10|                 |
|     100012|     Michael Lopez| keith77@hotmail.com|2020-08-15|                 |
|   

In [0]:
# Validate product rows. The window counts rows sharing each product_id, so every copy of a
# duplicated ID is flagged (not just the second occurrence).
VALID_PRODUCT_CATEGORIES = ["electronics", "clothing", "grocery", "automotive"]

w = Window.partitionBy(col("product_id"))
products_df_errors = (
    products_df
    .withColumn("id_count", count("product_id").over(w))
    .withColumn("Validation Errors", concat_ws(", ", array(
        when(col("id_count") > 1, lit("Duplicate Product ID")),
        when(col("product_id").isNull(), lit("Invalid Product ID")),
        # ~isin(...) on a NULL category is NULL, so a missing category needs isNull().
        when(col("category").isNull() | ~col("category").isin(VALID_PRODUCT_CATEGORIES),
             lit("Invalid Category")),
        # A price with more than two decimal places changes when rounded to 2 places
        # (`round` is pyspark.sql.functions.round via the star import). This numeric check
        # replaces splitting the string form on ".", which misfired on scientific notation
        # such as "1.5E7" and indexed past the end for integer prices (which may raise under
        # ANSI mode). Missing and negative prices are also rejected.
        when(col("price").isNull()
             | (col("price") < 0)
             | (col("price") != round(col("price"), 2)),
             lit("Invalid Price")),
    )))
    .drop("id_count")
)

In [0]:
# Route product rows by validation outcome: an empty error string means every rule passed.
product_is_clean = col("Validation Errors") == ""
products_df_validated = products_df_errors.where(product_is_clean)
products_df_invalid = products_df_errors.where(~product_is_clean)

products_df_validated.show(n=10)
products_df_invalid.show(n=10)

+----------+--------+-----------+------+-----------------+
|product_id|    name|   category| price|Validation Errors|
+----------+--------+-----------+------+-----------------+
|    200002|  public|   clothing|214.39|                 |
|    200003|    game|electronics|598.13|                 |
|    200004|  member|electronics|599.65|                 |
|    200005|    call| automotive|988.83|                 |
|    200006|      Mr|   clothing|435.63|                 |
|    200007|response| automotive|548.25|                 |
|    200008|together| automotive|722.85|                 |
|    200009|   about|    grocery|156.16|                 |
|    200010|     cut|    grocery|123.03|                 |
|    200012|    east|    grocery|696.54|                 |
+----------+--------+-----------+------+-----------------+
only showing top 10 rows

+----------+--------+-----------+------+--------------------+
|product_id|    name|   category| price|   Validation Errors|
+----------+--------+---

In [0]:
# Validate promotion rows. As with products, a window count over promotion_id flags every
# copy of a duplicated ID.
w1 = Window.partitionBy(col("promotion_id"))
promotions_df_errors = (
    promotions_df
    .withColumn("promotions_id_count", count("promotion_id").over(w1))
    .withColumn("Validation Errors", concat_ws(", ", array(
        when(col("promotions_id_count") > 1, lit("Duplicate Promotion ID")),
        when(col("promotion_id").isNull(), lit("Invalid Promotion ID")),
        # The 0-100 bounds treat discount_rate as a percentage. Range checks on NULL yield
        # NULL (so `when` would not fire), hence the explicit isNull().
        when(col("discount_rate").isNull()
             | (col("discount_rate") < 0)
             | (col("discount_rate") > 100),
             lit("Invalid Discount Rate")),
        # A NULL date would make the ordering check below NULL and let the row pass.
        when(col("start_date").isNull(), lit("Invalid Start Date")),
        when(col("end_date").isNull(), lit("Invalid End Date")),
        when(col("start_date") > col("end_date"), lit("End Date is less than Start Date")),
    )))
    .drop("promotions_id_count")
)

In [0]:
# Route promotion rows by validation outcome: an empty error string means every rule passed.
promotion_is_clean = col("Validation Errors") == ""
promotions_df_validated = promotions_df_errors.where(promotion_is_clean)
promotions_df_invalid = promotions_df_errors.where(~promotion_is_clean)

promotions_df_validated.show(n=10)
promotions_df_invalid.show(n=10)


+------------+-------------+----------+----------+-----------------+
|promotion_id|discount_rate|start_date|  end_date|Validation Errors|
+------------+-------------+----------+----------+-----------------+
|      400003|        28.71|2024-04-12|2024-11-26|                 |
|      400004|        80.49|2025-03-19|2025-07-12|                 |
|      400005|        48.96|2023-06-27|2023-10-27|                 |
|      400006|        79.15|2024-11-15|2025-07-02|                 |
|      400007|        82.49|2024-10-16|2024-11-20|                 |
|      400008|         1.89|2023-06-25|2024-01-07|                 |
|      400009|         42.2|2025-04-13|2026-02-28|                 |
|      400011|        27.74|2024-03-31|2025-01-05|                 |
|      400012|        82.77|2025-02-04|2025-07-14|                 |
|      400014|        88.56|2024-11-13|2025-10-01|                 |
+------------+-------------+----------+----------+-----------------+
only showing top 10 rows

+-------