In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark=SparkSession.builder\
    .appName("CleaningSalesData")\
        .getOrCreate()
#read csv file sales4.csv
raw_df=spark.read.format("csv")\
    .option("inferschema","true")\
    .option("header","true")\
    .load("../input/sales4.csv")
raw_df.show()

+--------+---------------+---------+------------+--------+-------+
|order_id| customer_name | product | order_date | amount | region|
+--------+---------------+---------+------------+--------+-------+
|       1|         Alice |  Laptop | 2023-01-15 |  55000 |  North|
|       2|            Bob|   Mobile|  15-02-2023|   30000|  South|
|       3|         Carol |  Tablet | 2023/03/10 |   NULL |   East|
|       4|          David|   Laptop| 2023-13-01 |  65000 |   West|
|       5|           Eve |  Mobile |  2023-04-05|  ERROR |  North|
|       6|          Frank|   Laptop| 05-05-2023 |   72000|  South|
|       7|          Grace|   Tablet|  2023-06-18|   48000|   East|
|       7|          Grace|   Tablet|  2023-06-18|   48000|   East|
|       8|         Heidi |  Mobile |            |  50000 |   West|
|       9|           Ivan|   Laptop|  2023-07-22|  85,000|  North|
|      10|          Judy |  Tablet | 2023-08-30 |        |  South|
+--------+---------------+---------+------------+--------+----

In [10]:
#clean column names (trim, lower)
col_trim_df=raw_df.toDF(*[c.strip().lower() for c in raw_df.columns]) # all columns names are already in lower case

#Trim all string columns
trim_df=col_trim_df.withColumn("order_id",trim(col_trim_df["order_id"]))\
    .withColumn("customer_name",trim(col_trim_df["customer_name"]))\
        .withColumn("product",trim(col_trim_df["product"]))\
            .withColumn("order_date",trim(col_trim_df["order_date"]))\
                .withColumn("amount",trim(col_trim_df["amount"]))\
                    .withColumn("region",trim(col_trim_df["region"]))

#casting column data types
b = trim_df.withColumn("amount", regexp_replace(col("amount"),",", ""))\
    .withColumn("order_id",expr("try_cast(order_id as int)"))\
    .withColumn("amount",expr("try_cast(amount as double)"))\
    .withColumn("amount", when(col("amount").isNotNull(),col("amount")).otherwise(None))
         #filling blank and currpted cell by string 'null', rekoving 'error' word
b.printSchema()
b.show()

root
 |-- order_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- region: string (nullable = true)

+--------+-------------+-------+----------+-------+------+
|order_id|customer_name|product|order_date| amount|region|
+--------+-------------+-------+----------+-------+------+
|       1|        Alice| Laptop|2023-01-15|55000.0| North|
|       2|          Bob| Mobile|15-02-2023|30000.0| South|
|       3|        Carol| Tablet|2023/03/10|   NULL|  East|
|       4|        David| Laptop|2023-13-01|65000.0|  West|
|       5|          Eve| Mobile|2023-04-05|   NULL| North|
|       6|        Frank| Laptop|05-05-2023|72000.0| South|
|       7|        Grace| Tablet|2023-06-18|48000.0|  East|
|       7|        Grace| Tablet|2023-06-18|48000.0|  East|
|       8|        Heidi| Mobile|          |50000.0|  West|
|       9|         Ivan| Laptop|2023-07-22|

In [11]:
#parsing date
dfdate=b.withColumn("order_date",
                    coalesce(
                        to_date(col("order_date"),"yyyy-MM-dd"),
                        to_date(col("order_date"),"yyyy-dd-MM"),
                        to_date(col("order_date"),"dd-MM-yyyy"),
                        to_date(col("order_date"),"dd/MM/yyyy"),
                        to_date(col("order_date"),"MM/dd/yyyy"),
                        to_date(col("order_date"),"yyyy/MM/dd"),
                        to_date(col("order_date"),"MMM dd, yyyy")
                    ))
dfdate.printSchema()
dfdate.show()

root
 |-- order_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- amount: double (nullable = true)
 |-- region: string (nullable = true)

+--------+-------------+-------+----------+-------+------+
|order_id|customer_name|product|order_date| amount|region|
+--------+-------------+-------+----------+-------+------+
|       1|        Alice| Laptop|2023-01-15|55000.0| North|
|       2|          Bob| Mobile|2023-02-15|30000.0| South|
|       3|        Carol| Tablet|2023-03-10|   NULL|  East|
|       4|        David| Laptop|2023-01-13|65000.0|  West|
|       5|          Eve| Mobile|2023-04-05|   NULL| North|
|       6|        Frank| Laptop|2023-05-05|72000.0| South|
|       7|        Grace| Tablet|2023-06-18|48000.0|  East|
|       7|        Grace| Tablet|2023-06-18|48000.0|  East|
|       8|        Heidi| Mobile|      NULL|50000.0|  West|
|       9|         Ivan| Laptop|2023-07-22|85

In [None]:
#remove duplicates
dropdupdf=dfdate.dropDuplicates()
dropdupdf.show()

+--------+-------------+-------+----------+-------+------+
|order_id|customer_name|product|order_date| amount|region|
+--------+-------------+-------+----------+-------+------+
|       9|         Ivan| Laptop|2023-07-22|85000.0| North|
|       2|          Bob| Mobile|2023-02-15|30000.0| South|
|       7|        Grace| Tablet|2023-06-18|48000.0|  East|
|       4|        David| Laptop|2023-01-13|65000.0|  West|
|       6|        Frank| Laptop|2023-05-05|72000.0| South|
|       1|        Alice| Laptop|2023-01-15|55000.0| North|
|       8|        Heidi| Mobile|      NULL|50000.0|  West|
|       3|        Carol| Tablet|2023-03-10|   NULL|  East|
|       5|          Eve| Mobile|2023-04-05|   NULL| North|
|      10|         Judy| Tablet|2023-08-30|   NULL| South|
+--------+-------------+-------+----------+-------+------+



In [22]:
#drop null rows make seperate file
#filter rows with nulls (or invalid values) into one DataFrame, and valid rows into another.

#rows with nulls in critical fields
error_df=dropdupdf.filter(
    col("order_date").isNull() | col("amount").isNull())\
        .orderBy("order_id")

#clean rows (no nulls in critical fields)
clean_df=dropdupdf.filter(
    col("order_date").isNotNull() & col("amount").isNotNull())\
        .orderBy("order_id")

#printiting error_df and clean_df
print("Null record df:-")
error_df.show()
print("Cleaned records df:-")
clean_df.show()

Null record df:-
+--------+-------------+-------+----------+-------+------+
|order_id|customer_name|product|order_date| amount|region|
+--------+-------------+-------+----------+-------+------+
|       3|        Carol| Tablet|2023-03-10|   NULL|  East|
|       5|          Eve| Mobile|2023-04-05|   NULL| North|
|       8|        Heidi| Mobile|      NULL|50000.0|  West|
|      10|         Judy| Tablet|2023-08-30|   NULL| South|
+--------+-------------+-------+----------+-------+------+

Cleaned records df:-
+--------+-------------+-------+----------+-------+------+
|order_id|customer_name|product|order_date| amount|region|
+--------+-------------+-------+----------+-------+------+
|       1|        Alice| Laptop|2023-01-15|55000.0| North|
|       2|          Bob| Mobile|2023-02-15|30000.0| South|
|       4|        David| Laptop|2023-01-13|65000.0|  West|
|       6|        Frank| Laptop|2023-05-05|72000.0| South|
|       7|        Grace| Tablet|2023-06-18|48000.0|  East|
|       9|       

In [24]:
#clean df schema
print("Schema of cleaned data:-")
clean_df.printSchema()

Schema of cleaned data:-
root
 |-- order_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- amount: double (nullable = true)
 |-- region: string (nullable = true)



In [None]:
#total sales per day.
#group by order_date and calculate total sales
sales_per_day=clean_df.groupBy("order_date")\
    .agg(sum(col("amount")).alias("total_sales"))\
        .orderBy("order_date")
print("Sales per day:-")
sales_per_day.show()

Sales per day:-
+----------+-----------+
|order_date|total_sales|
+----------+-----------+
|2023-01-13|    65000.0|
|2023-01-15|    55000.0|
|2023-02-15|    30000.0|
|2023-05-05|    72000.0|
|2023-06-18|    48000.0|
|2023-07-22|    85000.0|
+----------+-----------+



In [None]:
#top 5 product (by revenue)
top_product_df=clean_df.groupBy("product")\
    .agg(sum(col("amount")).alias("top_product"))\
        .orderBy(col("top_product").desc())\
            .limit(5)
print("Top selling products:-")
top_product_df.show()

Top selling products:-
+-------+-----------+
|product|top_product|
+-------+-----------+
| Laptop|   277000.0|
| Tablet|    48000.0|
| Mobile|    30000.0|
+-------+-----------+



In [None]:
#monthly revenue
#extract year-month 
year_month_df=clean_df.withColumn("year_month",concat_ws("-", year(col("order_date")),month(col("order_date"))))

#group by year_month and sum_revenue
sum_revenue_df=year_month_df.groupBy("year_month")\
    .agg(sum("amount").alias("sum_revenue"))\
        .orderBy("year_month")
print("Monthly revenue summary:-")
sum_revenue_df.show()

Monthly revenue summary:-
+----------+-----------+
|year_month|sum_revenue|
+----------+-----------+
|    2023-1|   120000.0|
|    2023-2|    30000.0|
|    2023-5|    72000.0|
|    2023-6|    48000.0|
|    2023-7|    85000.0|
+----------+-----------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

#RANK PRODUCTS (WINDOW FUNCTION)
#region + product level revenue
region_product_group_df=clean_df.groupBy("region","product")\
    .agg(sum(col("amount")).alias("total_sale"))

#define windows function
windowspec=Window.partitionBy("region").orderBy(col("total_sale").desc())

#appy row_number
ranked=region_product_group_df.withColumn("rank",row_number().over(windowspec))

#filter top product per region
top_product_per_region_df=ranked.filter(col("rank")==1)

#show result
top_product_per_region_df.show()

+------+-------+----------+----+
|region|product|total_sale|rank|
+------+-------+----------+----+
|  East| Tablet|   48000.0|   1|
| North| Laptop|  140000.0|   1|
| South| Laptop|   72000.0|   1|
|  West| Laptop|   65000.0|   1|
+------+-------+----------+----+



In [None]:
#export and delivery
#save clean_df in outpute folder
clean_df.coalesce(1)\
    .write\
    .mode("overwrite")\
        .option("header","true")\
            .csv("../output/cleaned_data/")

In [None]:
#save report tables (csv) in output folder
#daily sale report
sales_per_day.coalesce(1)\
    .write\
        .mode("overwrite")\
            .option("header","true")\
                .csv("../output/reports/daily_sales/")

#monthly sale reports                
sum_revenue_df.coalesce(1)\
    .write\
        .mode("overwrite")\
            .option("header","true")\
                .csv("../output/reports/monthly_revenue/")

#top product sale report region wise
top_product_per_region_df.coalesce(1)\
    .write\
        .mode("overwrite")\
            .option("header","true")\
                .csv("../output/reports/top_product_per_region/")

#top product 
top_product_df.coalesce(1)\
    .write\
        .mode("overwrite")\
        .option("header","true")\
            .csv("../output/reports/top_products/")

In [42]:
#saving clean data in parquet file
clean_df.write\
    .mode("overwrite")\
        .parquet("../output/clean_data_in_parquet/")