### Creating pyspark session

In [None]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder.appName("FirstSaprkSession").getOrCreate())

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema_df = StructType([StructField('Order ID', StringType(), True),
							StructField('Product', StringType(), True),
							StructField('Quantity Ordered', StringType(), True),
                             StructField('Price Each', StringType(), True),
							StructField('Order Date', StringType(), True),
							StructField('Purchase Address', StringType(), True)
							])

### Reading CSV files in pysprk

In [62]:
sales_df = spark.read.format('CSV')\
	.option("header", True)\
	.schema(schema_df).load('salesdata/')

In [63]:
sales_df.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



### Importing pyspark functions and checking for Bad data 

Checking bad data by "final_sales_df.describe().show()"

In [64]:
from pyspark.sql.functions import col,expr,split,count

In [65]:
filter_sales_df = sales_df.filter(col("Order ID") != 'Order ID')

Below output tell us that the now the output is in perfect form

In [66]:
filter_sales_df.describe().show()

+-------+-----------------+------------+-------------------+------------------+--------------+--------------------+
|summary|         Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+-------------------+------------------+--------------+--------------------+
|  count|           185950|      185950|             185950|            185950|        185950|              185950|
|   mean|230417.5693788653|        NULL| 1.1243828986286637|184.39973476743927|          NULL|                NULL|
| stddev|51512.73710999594|        NULL|0.44279262402866965|332.73132988434367|          NULL|                NULL|
|    min|           141234|20in Monitor|                  1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|           319670|      iPhone|                  9|            999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+-----------------+------------+-------------------+------------

### Extract the City and State from Address

In [67]:
filter_sales_df = filter_sales_df.withColumn("Purchase Address", split(col("Purchase Address"),','))

In [68]:
filter_sales_df.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|[136 Church St,  ...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|[562 2nd St,  New...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|[277 Main St,  Ne...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|[410 6th St,  San...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|[43 Hill St,  Atl...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [69]:
filter_sales_df = filter_sales_df.withColumn("City", col("Purchase Address").getItem(1))

In [70]:
filter_sales_df.show(5, False)

+--------+--------------------+----------------+----------+--------------+------------------------------------------+--------------+
|Order ID|Product             |Quantity Ordered|Price Each|Order Date    |Purchase Address                          |City          |
+--------+--------------------+----------------+----------+--------------+------------------------------------------+--------------+
|295665  |Macbook Pro Laptop  |1               |1700      |12/30/19 00:01|[136 Church St,  New York City,  NY 10001]| New York City|
|295666  |LG Washing Machine  |1               |600.0     |12/29/19 07:03|[562 2nd St,  New York City,  NY 10001]   | New York City|
|295667  |USB-C Charging Cable|1               |11.95     |12/12/19 18:21|[277 Main St,  New York City,  NY 10001]  | New York City|
|295668  |27in FHD Monitor    |1               |149.99    |12/22/19 15:13|[410 6th St,  San Francisco,  CA 94016]   | San Francisco|
|295669  |USB-C Charging Cable|1               |11.95     |12/18/19 1

In [71]:
filter_sales_df = filter_sales_df.withColumn("State", col("Purchase Address").getItem(2)[1:3])

In [72]:
filter_sales_df.show(5, False)

+--------+--------------------+----------------+----------+--------------+------------------------------------------+--------------+-----+
|Order ID|Product             |Quantity Ordered|Price Each|Order Date    |Purchase Address                          |City          |State|
+--------+--------------------+----------------+----------+--------------+------------------------------------------+--------------+-----+
|295665  |Macbook Pro Laptop  |1               |1700      |12/30/19 00:01|[136 Church St,  New York City,  NY 10001]| New York City| NY  |
|295666  |LG Washing Machine  |1               |600.0     |12/29/19 07:03|[562 2nd St,  New York City,  NY 10001]   | New York City| NY  |
|295667  |USB-C Charging Cable|1               |11.95     |12/12/19 18:21|[277 Main St,  New York City,  NY 10001]  | New York City| NY  |
|295668  |27in FHD Monitor    |1               |149.99    |12/22/19 15:13|[410 6th St,  San Francisco,  CA 94016]   | San Francisco| CA  |
|295669  |USB-C Charging Ca

In [73]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType
from pyspark.sql.functions import to_timestamp

### Updating the column names

In [74]:
filter_sales_df = filter_sales_df.withColumnRenamed("Order ID","OrderID").withColumnRenamed("Quantity Ordered","Quantity").withColumnRenamed("Order Date","OrderDate").withColumnRenamed("Purchase Address","StoreAddress")\
                                .withColumnRenamed("Price Each","Price")

In [75]:
filter_sales_df.show(5, False)

+-------+--------------------+--------+------+--------------+------------------------------------------+--------------+-----+
|OrderID|Product             |Quantity|Price |OrderDate     |StoreAddress                              |City          |State|
+-------+--------------------+--------+------+--------------+------------------------------------------+--------------+-----+
|295665 |Macbook Pro Laptop  |1       |1700  |12/30/19 00:01|[136 Church St,  New York City,  NY 10001]| New York City| NY  |
|295666 |LG Washing Machine  |1       |600.0 |12/29/19 07:03|[562 2nd St,  New York City,  NY 10001]   | New York City| NY  |
|295667 |USB-C Charging Cable|1       |11.95 |12/12/19 18:21|[277 Main St,  New York City,  NY 10001]  | New York City| NY  |
|295668 |27in FHD Monitor    |1       |149.99|12/22/19 15:13|[410 6th St,  San Francisco,  CA 94016]   | San Francisco| CA  |
|295669 |USB-C Charging Cable|1       |11.95 |12/18/19 12:38|[43 Hill St,  Atlanta,  GA 30301]         | Atlanta      

### Updating the datatypes of columns

As OrderDate is in StringType format. To convert it to datetime format we used "to_timestamp()" function.

In [76]:
filter_sales_df = filter_sales_df.withColumn("OrderID", col("OrderID").cast(IntegerType()))\
                                .withColumn("Quantity", col("Quantity").cast(IntegerType()))\
                                .withColumn("Price", col("Price").cast(FloatType()))\
                                .withColumn("OrderDate", to_timestamp(col("OrderDate"),"MM/dd/yy HH:mm"))\
                                .withColumn("StoreAddress", col("StoreAddress").cast(StringType()))

In [77]:
filter_sales_df.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



In [78]:
filter_sales_df.show(5, False)

+-------+--------------------+--------+------+-------------------+------------------------------------------+--------------+-----+
|OrderID|Product             |Quantity|Price |OrderDate          |StoreAddress                              |City          |State|
+-------+--------------------+--------+------+-------------------+------------------------------------------+--------------+-----+
|295665 |Macbook Pro Laptop  |1       |1700.0|2019-12-30 00:01:00|[136 Church St,  New York City,  NY 10001]| New York City| NY  |
|295666 |LG Washing Machine  |1       |600.0 |2019-12-29 07:03:00|[562 2nd St,  New York City,  NY 10001]   | New York City| NY  |
|295667 |USB-C Charging Cable|1       |11.95 |2019-12-12 18:21:00|[277 Main St,  New York City,  NY 10001]  | New York City| NY  |
|295668 |27in FHD Monitor    |1       |149.99|2019-12-22 15:13:00|[410 6th St,  San Francisco,  CA 94016]   | San Francisco| CA  |
|295669 |USB-C Charging Cable|1       |11.95 |2019-12-18 12:38:00|[43 Hill St,  Atl

### Extract year and month from OrderDate

In [79]:
from pyspark.sql.functions import year, month

In [80]:
filter_sales_df = filter_sales_df.withColumn("ReportYear", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

In [81]:
filter_sales_df.show(5, False)

+-------+--------------------+--------+------+-------------------+------------------------------------------+--------------+-----+----------+-----+
|OrderID|Product             |Quantity|Price |OrderDate          |StoreAddress                              |City          |State|ReportYear|Month|
+-------+--------------------+--------+------+-------------------+------------------------------------------+--------------+-----+----------+-----+
|295665 |Macbook Pro Laptop  |1       |1700.0|2019-12-30 00:01:00|[136 Church St,  New York City,  NY 10001]| New York City| NY  |2019      |12   |
|295666 |LG Washing Machine  |1       |600.0 |2019-12-29 07:03:00|[562 2nd St,  New York City,  NY 10001]   | New York City| NY  |2019      |12   |
|295667 |USB-C Charging Cable|1       |11.95 |2019-12-12 18:21:00|[277 Main St,  New York City,  NY 10001]  | New York City| NY  |2019      |12   |
|295668 |27in FHD Monitor    |1       |149.99|2019-12-22 15:13:00|[410 6th St,  San Francisco,  CA 94016]   | Sa

### Write the final dataframe to parquet file

In [83]:
sales_final_df = filter_sales_df.select("OrderID","Product","Quantity","Price","OrderDate","StoreAddress","City","ReportYear","Month")

In [84]:
sales_final_df.show(10,False)

+-------+--------------------------+--------+------+-------------------+---------------------------------------------+--------------+----------+-----+
|OrderID|Product                   |Quantity|Price |OrderDate          |StoreAddress                                 |City          |ReportYear|Month|
+-------+--------------------------+--------+------+-------------------+---------------------------------------------+--------------+----------+-----+
|295665 |Macbook Pro Laptop        |1       |1700.0|2019-12-30 00:01:00|[136 Church St,  New York City,  NY 10001]   | New York City|2019      |12   |
|295666 |LG Washing Machine        |1       |600.0 |2019-12-29 07:03:00|[562 2nd St,  New York City,  NY 10001]      | New York City|2019      |12   |
|295667 |USB-C Charging Cable      |1       |11.95 |2019-12-12 18:21:00|[277 Main St,  New York City,  NY 10001]     | New York City|2019      |12   |
|295668 |27in FHD Monitor          |1       |149.99|2019-12-22 15:13:00|[410 6th St,  San Fran

In [88]:
output_path = './output_data/parquet_sales_final/'

In [89]:
sales_final_df.write.format('parquet').mode('overwrite').partitionBy("ReportYear","Month").save(output_path)