In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
        col,
        concat_ws,
        from_unixtime,
        month,
        split,
        unix_timestamp,
        year
)
from pyspark.sql.types import FloatType, IntegerType, StringType, TimestampType

In [2]:
spark = SparkSession.builder.appName("ChallengePart2").getOrCreate()

In [3]:
file_path = "./data/salesdata/*.csv"

In [4]:
df = spark.read\
        .format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load(file_path)

In [5]:
df.show(10, truncate=False)

+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                         |
+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|295665  |Macbook Pro Laptop        |1               |1700.0    |12/30/19 00:01|136 Church St, New York City, NY 10001   |
|295666  |LG Washing Machine        |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001      |
|295667  |USB-C Charging Cable      |1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001     |
|295668  |27in FHD Monitor          |1               |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016      |
|295669  |USB-C Charging Cable      |1               |11.95     |12/18/19 12:38|43 Hill St, Atlanta, GA 30301            |
|295670  |AA Bat

In [6]:
df.count()

186850

##### Remove Bad Records

In [7]:
df.filter(col("Product") == "Product").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
|    null|Product|            null|      null|Order Date|Purchase Address|
+--------+-------+-------

In [8]:
df_all_null_drop = df.na.drop("all")

In [9]:
df_all_null_drop.count()

186305

In [10]:
df_all_null_drop\
        .where(col("Product") == "Product")\
        .show(10, truncate=False)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
|null    |Product|null            |null      |Order Date|Purchase Address|
+--------+-------+-------

In [11]:
df_cleaned_null_order_ids = df_all_null_drop.filter(col("Order ID").isNull() != True)
df_cleaned_null_order_ids\
        .where(col("Product") == "Product")\
        .show(10, truncate=False)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [12]:
df_cleaned_null_order_ids.show(10, truncate=False)

+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                         |
+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|295665  |Macbook Pro Laptop        |1               |1700.0    |12/30/19 00:01|136 Church St, New York City, NY 10001   |
|295666  |LG Washing Machine        |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001      |
|295667  |USB-C Charging Cable      |1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001     |
|295668  |27in FHD Monitor          |1               |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016      |
|295669  |USB-C Charging Cable      |1               |11.95     |12/18/19 12:38|43 Hill St, Atlanta, GA 30301            |
|295670  |AA Bat

In [13]:
df_cleaned_null_order_ids.count()

185950

In [14]:
distinct_df = df_cleaned_null_order_ids.distinct()
distinct_df.count()

185686

##### Extract City and State from Address into New Columns

In [15]:
added_city_state = distinct_df\
        .withColumn("City", split(col("Purchase Address"), ", ")[1])\
        .withColumn("State", split(col("Purchase Address"), ", ")[2][0:2])
added_city_state.show(10, truncate=False)

+--------+------------------------+----------------+----------+--------------+----------------------------------------+-------------+-----+
|Order ID|Product                 |Quantity Ordered|Price Each|Order Date    |Purchase Address                        |City         |State|
+--------+------------------------+----------------+----------+--------------+----------------------------------------+-------------+-----+
|295952  |Flatscreen TV           |1               |300.0     |12/23/19 12:55|403 Dogwood St, Seattle, WA 98101       |Seattle      |WA   |
|296051  |Wired Headphones        |2               |11.99     |12/15/19 14:18|796 6th St, New York City, NY 10001     |New York City|NY   |
|296357  |USB-C Charging Cable    |1               |11.95     |12/09/19 22:26|529 7th St, Atlanta, GA 30301           |Atlanta      |GA   |
|296699  |34in Ultrawide Monitor  |1               |379.99    |12/09/19 10:24|129 Center St, Dallas, TX 75001         |Dallas       |TX   |
|296829  |AA Batteri

In [16]:
added_city_state.printSchema()

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



In [17]:
added_city_state.where(col("Order ID") == 297389).show()

+--------+-------------+----------------+----------+--------------+--------------------+-------------+-----+
|Order ID|      Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|         City|State|
+--------+-------------+----------------+----------+--------------+--------------------+-------------+-----+
|  297389|Flatscreen TV|               1|     300.0|12/15/19 00:46|455 Lake St, San ...|San Francisco|   CA|
+--------+-------------+----------------+----------+--------------+--------------------+-------------+-----+



##### Rename and Change DataTypes

In [18]:
updated_df = added_city_state\
        .withColumnRenamed("Order ID", "OrderID")\
        .withColumn("OrderID", col("OrderID").cast(IntegerType()))\
        .withColumnRenamed("Quantity Ordered", "Quantity")\
        .withColumn("Quantity", col("Quantity").cast(IntegerType()))\
        .withColumnRenamed("Order Date", "OrderDate")\
        .withColumn(
                "OrderDate",
                from_unixtime(
                        unix_timestamp(col("OrderDate"), "MM/dd/yy HH:mm")
                ).alias("OrderDate")
        )\
        .withColumnRenamed("Purchase Address", "StoreAddress")\
        .withColumn("StoreAddress", col("StoreAddress").cast(StringType()))\
        .withColumnRenamed("Price Each", "Price")\
        .withColumn("Price", col("Price").cast(FloatType()))
updated_df.show(5, truncate=False)

+-------+----------------------+--------+------+-------------------+-------------------------------------+-------------+-----+
|OrderID|Product               |Quantity|Price |OrderDate          |StoreAddress                         |City         |State|
+-------+----------------------+--------+------+-------------------+-------------------------------------+-------------+-----+
|295952 |Flatscreen TV         |1       |300.0 |2019-12-23 12:55:00|403 Dogwood St, Seattle, WA 98101    |Seattle      |WA   |
|296051 |Wired Headphones      |2       |11.99 |2019-12-15 14:18:00|796 6th St, New York City, NY 10001  |New York City|NY   |
|296357 |USB-C Charging Cable  |1       |11.95 |2019-12-09 22:26:00|529 7th St, Atlanta, GA 30301        |Atlanta      |GA   |
|296699 |34in Ultrawide Monitor|1       |379.99|2019-12-09 10:24:00|129 Center St, Dallas, TX 75001      |Dallas       |TX   |
|296829 |AA Batteries (4-pack) |1       |3.84  |2019-12-22 13:57:00|100 Cedar St, New York City, NY 10001|New Y

In [19]:
updated_df.where(col("OrderDate").isNull() == True).show(truncate=False)

+-------+-------+--------+-----+---------+------------+----+-----+
|OrderID|Product|Quantity|Price|OrderDate|StoreAddress|City|State|
+-------+-------+--------+-----+---------+------------+----+-----+
+-------+-------+--------+-----+---------+------------+----+-----+



In [20]:
updated_df.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



##### Add New Columns: ReportYear and Month

In [21]:
updated_df_month_year = updated_df\
        .withColumn("ReportYear", year(col("OrderDate")))\
        .withColumn("Month", month(col("OrderDate")))
updated_df_month_year.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ReportYear: integer (nullable = true)
 |-- Month: integer (nullable = true)



In [22]:
updated_df_month_year.show(5)

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        StoreAddress|         City|State|ReportYear|Month|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----+
| 295952|       Flatscreen TV|       1| 300.0|2019-12-23 12:55:00|403 Dogwood St, S...|      Seattle|   WA|      2019|   12|
| 296051|    Wired Headphones|       2| 11.99|2019-12-15 14:18:00|796 6th St, New Y...|New York City|   NY|      2019|   12|
| 296357|USB-C Charging Cable|       1| 11.95|2019-12-09 22:26:00|529 7th St, Atlan...|      Atlanta|   GA|      2019|   12|
| 296699|34in Ultrawide Mo...|       1|379.99|2019-12-09 10:24:00|129 Center St, Da...|       Dallas|   TX|      2019|   12|
| 296829|AA Batteries (4-p...|       1|  3.84|2019-12-22 13:57:00|100 Cedar St, New...|New York City|   NY|      2019|   12|


In [23]:
updated_df_month_year.count()

185686

##### Find and Remove All Rows with Any Columns

In [24]:
almost_final_df = updated_df_month_year.na.drop("any")
almost_final_df.count()

185686

##### Write Final DataFrame to Parquet

In [25]:
final_df = almost_final_df.select("OrderID", "Product", "Quantity", "Price", "OrderDate", "StoreAddress",
                "City", "State", "ReportYear", "Month")
final_df.show(10)

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        StoreAddress|         City|State|ReportYear|Month|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----+
| 295952|       Flatscreen TV|       1| 300.0|2019-12-23 12:55:00|403 Dogwood St, S...|      Seattle|   WA|      2019|   12|
| 296051|    Wired Headphones|       2| 11.99|2019-12-15 14:18:00|796 6th St, New Y...|New York City|   NY|      2019|   12|
| 296357|USB-C Charging Cable|       1| 11.95|2019-12-09 22:26:00|529 7th St, Atlan...|      Atlanta|   GA|      2019|   12|
| 296699|34in Ultrawide Mo...|       1|379.99|2019-12-09 10:24:00|129 Center St, Da...|       Dallas|   TX|      2019|   12|
| 296829|AA Batteries (4-p...|       1|  3.84|2019-12-22 13:57:00|100 Cedar St, New...|New York City|   NY|      2019|   12|


In [27]:
output_path = "./data/output/sales"
final_df.write.mode("overwrite").partitionBy("ReportYear", "Month").parquet(output_path)