## Challenge : Part II - Data Preparation and Cleansing

### Imports and Spark Session initialization

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col, expr, split, trim, to_timestamp, year, month

In [2]:
spark = (SparkSession.builder.appName("SparkSalesAnalytics").getOrCreate())

### DataFrame Preparation

In [3]:
sales_schema = StructType([
    StructField("Order ID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", StringType(), True),
    StructField("Price Each", StringType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True)
])

sales_file_path = "./data/salesdata"

sales_df = (spark.read.format("csv")
           .option("header", True)
           .schema(sales_schema)
           .load(sales_file_path))

### Remove bad records (columns names recurring in the data)

In [4]:
print("Count of rows in original data frame - ", sales_df.count())
print("Count of rows in having 'Order ID' in their Order IDs data frame - ",sales_df.filter(col("Order ID") == "Order ID").count())
cleaned_sales_df = sales_df.filter(col("Order ID") != "Order ID")
print("Count of rows in cleaned data frame - ",cleaned_sales_df.count())

Count of rows in original data frame -  186850
Count of rows in having 'Order ID' in their Order IDs data frame -  355
Count of rows in cleaned data frame -  185950


### Extract City and State from Address into New Columns

In [5]:
new_col_sales_df = (cleaned_sales_df
                    .withColumn("State",trim(split(split(col("Purchase Address"),",").getItem(2)," ").getItem(1)))
                    .withColumn("City",trim(split(col("Purchase Address"),",").getItem(1)))
                   )
new_col_sales_df.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+-----+-------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|State|         City|
+--------+--------------------+----------------+----------+--------------+--------------------+-----+-------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|   NY|New York City|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|   NY|New York City|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|   NY|New York City|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|   CA|San Francisco|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|   GA|      Atlanta|
+--------+--------------------+----------------+----------+-------------

In [6]:
upd_fld_sales_df = (new_col_sales_df
                        .withColumn("OrderID", new_col_sales_df["Order ID"].cast(IntegerType()))
                        .withColumn("Product", new_col_sales_df["Product"].cast(StringType()))
                        .withColumn("Quantity", new_col_sales_df["Quantity Ordered"].cast(IntegerType()))
                        .withColumn("Price", new_col_sales_df["Price Each"].cast(FloatType()))
                        .withColumn("OrderDate", to_timestamp(new_col_sales_df["Order Date"], "MM/dd/yy HH:mm"))
                        .withColumn("StoreAddress", new_col_sales_df["Purchase Address"].cast(StringType()))
                        .drop("Order ID")
                        .drop("Quantity Ordered")
                        .drop("Price Each")
                        .drop("Order Date")
                        .drop("Purchase Address")
                   )
upd_fld_sales_df.printSchema()
upd_fld_sales_df.show(5, truncate=False)

root
 |-- Product: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- StoreAddress: string (nullable = true)

+--------------------+-----+-------------+-------+--------+------+-------------------+--------------------------------------+
|Product             |State|City         |OrderID|Quantity|Price |OrderDate          |StoreAddress                          |
+--------------------+-----+-------------+-------+--------+------+-------------------+--------------------------------------+
|Macbook Pro Laptop  |NY   |New York City|295665 |1       |1700.0|2019-12-30 00:01:00|136 Church St, New York City, NY 10001|
|LG Washing Machine  |NY   |New York City|295666 |1       |600.0 |2019-12-29 07:03:00|562 2nd St, New York City, NY 10001   |
|USB-C Charging Cable|NY   |New York City|29566

### Add New Columns: ReportYear and Month

In [7]:
year_and_month_sales_df = (upd_fld_sales_df
                               .withColumn("ReportYear", year(upd_fld_sales_df["OrderDate"]))
                               .withColumn("Month", month(upd_fld_sales_df["OrderDate"]))
                          )
year_and_month_sales_df.printSchema()
year_and_month_sales_df.show(5, truncate=False)

root
 |-- Product: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- ReportYear: integer (nullable = true)
 |-- Month: integer (nullable = true)

+--------------------+-----+-------------+-------+--------+------+-------------------+--------------------------------------+----------+-----+
|Product             |State|City         |OrderID|Quantity|Price |OrderDate          |StoreAddress                          |ReportYear|Month|
+--------------------+-----+-------------+-------+--------+------+-------------------+--------------------------------------+----------+-----+
|Macbook Pro Laptop  |NY   |New York City|295665 |1       |1700.0|2019-12-30 00:01:00|136 Church St, New York City, NY 10001|2019      |12   |
|LG Washing Machine  |NY

### Find and remove all rows with any null columns

In [8]:
# The filter in the first step automatically removed nulls because of how spark filter works
# Read - https://stackoverflow.com/a/49115058 for more info
no_nulls_sales_df = year_and_month_sales_df.na.drop("any")
print("Count of rows before rows containing nulls are removed - ", year_and_month_sales_df.count())
print("Count of rows after rows containing nulls are removed - ", no_nulls_sales_df.count())
no_nulls_sales_df.filter(col("Order ID").isNull() == True).show()
no_nulls_sales_df.describe('Product', 'State', 'City', 'OrderID', 'Quantity', 'Price', 'OrderDate', 'StoreAddress', 'ReportYear', 'Month').show()

Count of rows before rows containing nulls are removed -  185950
Count of rows after rows containing nulls are removed -  185950
+-------+-----+----+-------+--------+-----+---------+------------+----------+-----+
|Product|State|City|OrderID|Quantity|Price|OrderDate|StoreAddress|ReportYear|Month|
+-------+-----+----+-------+--------+-----+---------+------------+----------+-----+
+-------+-----+----+-------+--------+-----+---------+------------+----------+-----+

+-------+------------+------+-------+-----------------+------------------+-----------------+--------------------+--------------------+-----------------+
|summary|     Product| State|   City|          OrderID|          Quantity|            Price|        StoreAddress|          ReportYear|            Month|
+-------+------------+------+-------+-----------------+------------------+-----------------+--------------------+--------------------+-----------------+
|  count|      185950|185950| 185950|           185950|            185950| 

### Write Final DataFrame to Parquet

In [9]:
final_sales_df = no_nulls_sales_df.select("OrderID", "Product", "Quantity", "Price", "OrderDate", "StoreAddress", "City", "State", "ReportYear", "Month")
final_sales_df.printSchema()
final_sales_df.show(truncate=False)

root
 |-- OrderID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ReportYear: integer (nullable = true)
 |-- Month: integer (nullable = true)

+-------+--------------------------+--------+------+-------------------+-----------------------------------------+-------------+-----+----------+-----+
|OrderID|Product                   |Quantity|Price |OrderDate          |StoreAddress                             |City         |State|ReportYear|Month|
+-------+--------------------------+--------+------+-------------------+-----------------------------------------+-------------+-----+----------+-----+
|295665 |Macbook Pro Laptop        |1       |1700.0|2019-12-30 00:01:00|136 Church St, New York City, NY 10001   |New York City|NY   |2019  

In [10]:
(final_sales_df.write.option("header", True)
                     .partitionBy("ReportYear","Month")
                     .mode("overwrite")
                     .parquet("./data/output/sales")
)