# Challenge Part II - Data Prepartion and Cleansing

you ar required to prepare and clean the data
Remove Bad Records
If you query the records from the sales dataframe, you would find bad data 
1. Remove Bad Records
2. Extract City and State form Address into New Columns
3. Rename and Change DataTypes of DataFrame
4. Add New Columns: Report Year and Month
5. Find and Remove Rows with "any" null columns
6. Write Final DataFrame to Parquet

with data frame to parquet directory "data/output/sales" partitioned by ReportYear and Month


In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import StructType, StructField, StringType

In [3]:
spark = SparkSession.builder.appName("SalesAnalytics").getOrCreate()

In [4]:
schema = StructType([
    StructField("Order ID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", StringType(), True),
    StructField("Price Each", StringType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True)
])

In [5]:
sales_data_fpath = "/home/shyam/NiceSoftwareSolutions/ApacheSpark3_for_Data_Engineering_and_Analytics_with_Python/Structured_API-Spark_DataFrame/salesdata"

In [6]:
sales_raw_df = (spark.read.format("csv")
               .option("header",True)
               .schema(schema)
               .load(sales_data_fpath))

In [7]:
sales_raw_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [8]:
sales_raw_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



# Remove Null Row and Bad Records

### Let's remove null and bad rows

In [9]:
from pyspark.sql.functions import col

In [10]:
sales_raw_df.filter(col("Order ID").isNull() == True).show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
+--------+-------+-------

In [11]:
sales_raw_df = sales_raw_df.na.drop("any")

In [12]:
sales_raw_df.filter(col("Order ID").isNull() == True).show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [13]:
sales_raw_df.describe("Order ID","Product","Quantity Ordered","Price Each","Order Date","Purchase Address").show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            186305|      186305|            186305|            186305|        186305|              186305|
|   mean| 230417.5693788653|        null|1.1243828986286637|184.39973476747707|          null|                null|
| stddev|51512.737109995265|        null|0.4427926240286704| 332.7313298843439|          null|                null|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|          Order ID|      iPhone|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------

In [14]:
sales_raw_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-------

In [15]:
sales_temp_df = sales_raw_df.distinct()

In [16]:
sales_temp_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+



In [17]:
sales_temp_df = sales_temp_df.filter(col("Order ID")!= "Order ID")

In [18]:
sales_temp_df.filter(col("Order ID") == "Order ID").show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [19]:
sales_temp_df.show(10,truncate = False)

+--------+--------------------------+----------------+----------+--------------+----------------------------------------+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                        |
+--------+--------------------------+----------------+----------+--------------+----------------------------------------+
|295900  |AA Batteries (4-pack)     |1               |3.84      |12/27/19 18:56|283 Washington St, Boston, MA 02215     |
|295923  |Lightning Charging Cable  |1               |14.95     |12/21/19 13:41|968 8th St, Austin, TX 73301            |
|295991  |Lightning Charging Cable  |1               |14.95     |12/15/19 20:16|857 Center St, Boston, MA 02215         |
|296076  |Macbook Pro Laptop        |1               |1700      |12/03/19 15:19|679 Chestnut St, San Francisco, CA 94016|
|297015  |AAA Batteries (4-pack)    |3               |2.99      |12/13/19 08:43|58 Dogwood St, San Francisco, CA 94016  |
|297237  |Bose SoundSpor

In [20]:
sales_temp_df.describe("Order ID","Product","Quantity Ordered","Price Each","Order Date","Purchase Address").show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            185686|      185686|            185686|            185686|        185686|              185686|
|   mean|230411.37622653297|        null|1.1245435843305365|184.51925546352427|          null|                null|
| stddev| 51511.71718332086|        null|0.4430687383832874| 332.8438383900525|          null|                null|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|            319670|      iPhone|                 9|            999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+------------------+------------+------------------+------------

### Let's extract City and State from the Purchase Address

The python split method splits a string into a list, however, Spark does have a Split function too

In [21]:
from pyspark.sql.functions import split

In [22]:
sales_temp_df.select("Purchase Address").show(10, False)

+----------------------------------------+
|Purchase Address                        |
+----------------------------------------+
|283 Washington St, Boston, MA 02215     |
|968 8th St, Austin, TX 73301            |
|857 Center St, Boston, MA 02215         |
|679 Chestnut St, San Francisco, CA 94016|
|58 Dogwood St, San Francisco, CA 94016  |
|355 Park St, Boston, MA 02215           |
|542 9th St, New York City, NY 10001     |
|708 Walnut St, New York City, NY 10001  |
|538 Hickory St, San Francisco, CA 94016 |
|199 8th St, San Francisco, CA 94016     |
+----------------------------------------+
only showing top 10 rows



In [23]:
sales_temp_df.select("Purchase Address",split(col("Purchase Address"),",")).show(10, False)

+----------------------------------------+--------------------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)              |
+----------------------------------------+--------------------------------------------+
|283 Washington St, Boston, MA 02215     |[283 Washington St,  Boston,  MA 02215]     |
|968 8th St, Austin, TX 73301            |[968 8th St,  Austin,  TX 73301]            |
|857 Center St, Boston, MA 02215         |[857 Center St,  Boston,  MA 02215]         |
|679 Chestnut St, San Francisco, CA 94016|[679 Chestnut St,  San Francisco,  CA 94016]|
|58 Dogwood St, San Francisco, CA 94016  |[58 Dogwood St,  San Francisco,  CA 94016]  |
|355 Park St, Boston, MA 02215           |[355 Park St,  Boston,  MA 02215]           |
|542 9th St, New York City, NY 10001     |[542 9th St,  New York City,  NY 10001]     |
|708 Walnut St, New York City, NY 10001  |[708 Walnut St,  New York City,  NY 10001]  |
|538 Hickory St, San Francisco, 

In [24]:
sales_temp_df.select("Purchase Address",split(col("Purchase Address"),",").getItem(1)).show(10, False)

+----------------------------------------+---------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)[1]|
+----------------------------------------+---------------------------------+
|283 Washington St, Boston, MA 02215     | Boston                          |
|968 8th St, Austin, TX 73301            | Austin                          |
|857 Center St, Boston, MA 02215         | Boston                          |
|679 Chestnut St, San Francisco, CA 94016| San Francisco                   |
|58 Dogwood St, San Francisco, CA 94016  | San Francisco                   |
|355 Park St, Boston, MA 02215           | Boston                          |
|542 9th St, New York City, NY 10001     | New York City                   |
|708 Walnut St, New York City, NY 10001  | New York City                   |
|538 Hickory St, San Francisco, CA 94016 | San Francisco                   |
|199 8th St, San Francisco, CA 94016     | San Francisco                   |

In [25]:
sales_temp_df.select("Purchase Address",split(col("Purchase Address"),",").getItem(2)).show(10, False)

+----------------------------------------+---------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)[2]|
+----------------------------------------+---------------------------------+
|283 Washington St, Boston, MA 02215     | MA 02215                        |
|968 8th St, Austin, TX 73301            | TX 73301                        |
|857 Center St, Boston, MA 02215         | MA 02215                        |
|679 Chestnut St, San Francisco, CA 94016| CA 94016                        |
|58 Dogwood St, San Francisco, CA 94016  | CA 94016                        |
|355 Park St, Boston, MA 02215           | MA 02215                        |
|542 9th St, New York City, NY 10001     | NY 10001                        |
|708 Walnut St, New York City, NY 10001  | NY 10001                        |
|538 Hickory St, San Francisco, CA 94016 | CA 94016                        |
|199 8th St, San Francisco, CA 94016     | CA 94016                        |

In [26]:
sales_temp_df.select("Purchase Address",split(split(col("Purchase Address"),",").getItem(2),' ')).show(10,False)

+----------------------------------------+-----------------------------------------------+
|Purchase Address                        |split(split(Purchase Address, ,, -1)[2],  , -1)|
+----------------------------------------+-----------------------------------------------+
|283 Washington St, Boston, MA 02215     |[, MA, 02215]                                  |
|968 8th St, Austin, TX 73301            |[, TX, 73301]                                  |
|857 Center St, Boston, MA 02215         |[, MA, 02215]                                  |
|679 Chestnut St, San Francisco, CA 94016|[, CA, 94016]                                  |
|58 Dogwood St, San Francisco, CA 94016  |[, CA, 94016]                                  |
|355 Park St, Boston, MA 02215           |[, MA, 02215]                                  |
|542 9th St, New York City, NY 10001     |[, NY, 10001]                                  |
|708 Walnut St, New York City, NY 10001  |[, NY, 10001]                                  |

In [27]:
sales_temp_df = sales_temp_df.withColumn("City",split(col("Purchase Address"),",").
                                         getItem(1)).withColumn("State",split(split(col("Purchase Address"),",").getItem(2),' ').getItem(1))

In [28]:
sales_temp_df.show(10, False)

+--------+--------------------------+----------------+----------+--------------+----------------------------------------+--------------+-----+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                        |City          |State|
+--------+--------------------------+----------------+----------+--------------+----------------------------------------+--------------+-----+
|295900  |AA Batteries (4-pack)     |1               |3.84      |12/27/19 18:56|283 Washington St, Boston, MA 02215     | Boston       |MA   |
|295923  |Lightning Charging Cable  |1               |14.95     |12/21/19 13:41|968 8th St, Austin, TX 73301            | Austin       |TX   |
|295991  |Lightning Charging Cable  |1               |14.95     |12/15/19 20:16|857 Center St, Boston, MA 02215         | Boston       |MA   |
|296076  |Macbook Pro Laptop        |1               |1700      |12/03/19 15:19|679 Chestnut St, San Francisco, CA 94016| San Francisco|CA   |

## Let's change some datatype, rename a few column, drop some solumns,and add new Columns

### Rename and change Data Types

In [29]:
from pyspark.sql.functions import to_timestamp

In [30]:
from pyspark.sql.types import IntegerType, FloatType

In [31]:
sales_temp_df = sales_temp_df.withColumn("OrderID", col("Order ID").cast(IntegerType())).withColumn("Quantity",col("Quantity Ordered").cast(IntegerType())).withColumn("Price",col("Price Each").cast(FloatType())).withColumn("OrderDate", to_timestamp(col("Order Date"), "MM/dd/yy HH:mm")).withColumnRenamed("purchase Address","StoreAddress").drop("Order ID").drop("Quantity Ordered").drop("Price Each").drop("Purchase Address")


In [32]:
sales_temp_df.show(10,False)

+--------------------------+--------------+----------------------------------------+--------------+-----+-------+--------+------+-------------------+
|Product                   |Order Date    |StoreAddress                            |City          |State|OrderID|Quantity|Price |OrderDate          |
+--------------------------+--------------+----------------------------------------+--------------+-----+-------+--------+------+-------------------+
|AA Batteries (4-pack)     |12/27/19 18:56|283 Washington St, Boston, MA 02215     | Boston       |MA   |295900 |1       |3.84  |2019-12-27 18:56:00|
|Lightning Charging Cable  |12/21/19 13:41|968 8th St, Austin, TX 73301            | Austin       |TX   |295923 |1       |14.95 |2019-12-21 13:41:00|
|Lightning Charging Cable  |12/15/19 20:16|857 Center St, Boston, MA 02215         | Boston       |MA   |295991 |1       |14.95 |2019-12-15 20:16:00|
|Macbook Pro Laptop        |12/03/19 15:19|679 Chestnut St, San Francisco, CA 94016| San Francisco|C

In [33]:
sales_temp_df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)



# Add New Columns and Month

In [34]:
from pyspark.sql.functions import year, month

In [35]:
sales_temp_df = sales_temp_df.withColumn("ReportYear", year(col("OrderDate"))).withColumn("Month",month(col("OrderDate")))

In [36]:
sales_temp_df.show(10)

+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+----------+-----+
|             Product|    Order Date|        StoreAddress|          City|State|OrderID|Quantity| Price|          OrderDate|ReportYear|Month|
+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+----------+-----+
|AA Batteries (4-p...|12/27/19 18:56|283 Washington St...|        Boston|   MA| 295900|       1|  3.84|2019-12-27 18:56:00|      2019|   12|
|Lightning Chargin...|12/21/19 13:41|968 8th St, Austi...|        Austin|   TX| 295923|       1| 14.95|2019-12-21 13:41:00|      2019|   12|
|Lightning Chargin...|12/15/19 20:16|857 Center St, Bo...|        Boston|   MA| 295991|       1| 14.95|2019-12-15 20:16:00|      2019|   12|
|  Macbook Pro Laptop|12/03/19 15:19|679 Chestnut St, ...| San Francisco|   CA| 296076|       1|1700.0|2019-12-03 15:19:00|      2019|   12|
|AAA Batterie

# Let's write the final data frame into a partioned parquet file

In [37]:
sales_final_df = sales_temp_df.select("OrderID","product","Quantity","Price","OrderDate","StoreAddress","City","State","ReportYear","Month")

In [38]:
sales_final_df.show(10)

+-------+--------------------+--------+------+-------------------+--------------------+--------------+-----+----------+-----+
|OrderID|             product|Quantity| Price|          OrderDate|        StoreAddress|          City|State|ReportYear|Month|
+-------+--------------------+--------+------+-------------------+--------------------+--------------+-----+----------+-----+
| 295900|AA Batteries (4-p...|       1|  3.84|2019-12-27 18:56:00|283 Washington St...|        Boston|   MA|      2019|   12|
| 295923|Lightning Chargin...|       1| 14.95|2019-12-21 13:41:00|968 8th St, Austi...|        Austin|   TX|      2019|   12|
| 295991|Lightning Chargin...|       1| 14.95|2019-12-15 20:16:00|857 Center St, Bo...|        Boston|   MA|      2019|   12|
| 296076|  Macbook Pro Laptop|       1|1700.0|2019-12-03 15:19:00|679 Chestnut St, ...| San Francisco|   CA|      2019|   12|
| 297015|AAA Batteries (4-...|       3|  2.99|2019-12-13 08:43:00|58 Dogwood St, Sa...| San Francisco|   CA|      2019

In [39]:
output_path = "/home/shyam/NiceSoftwareSolutions/ApacheSpark3_for_Data_Engineering_and_Analytics_with_Python/Structured_API-Spark_DataFrame/output/sales"
sales_final_df.write.mode("overwrite").partitionBy("ReportYear","Month").parquet(output_path)