In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField,StringType,IntegerType
from pyspark.sql.functions import col,split,to_timestamp,year,month
from pyspark.sql.types import IntegerType, FloatType

# EXTRACTING THE DATA

In [2]:
spark=SparkSession.builder.appName("practice1").getOrCreate()

24/03/17 05:16:27 WARN Utils: Your hostname, Manishas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.1.10.215 instead (on interface en0)
24/03/17 05:16:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/17 05:16:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
schema=StructType([
    StructField("Order ID",StringType(),True),
    StructField("Product",StringType(),True),
    StructField("Quantity Ordered",StringType(),True),
    StructField("Price Each",StringType(),True),
    StructField("Order Date",StringType(),True),
    StructField("Purchase Address",StringType(),True)
    
])

In [4]:
salesfile_path="./data/salesdata"
df=spark.read.format("csv").option("header",True).schema(schema).load(salesfile_path)

In [5]:
df.show(truncate=True)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [6]:
df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



# TRANSFORMING THE DATA

## Data preparation and cleansing

### Removing null rows and bad records 

In [7]:
df.filter(col('Order ID').isNull()==True).show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|            null|      null|      null|            null|
|    null|   null|       

In [8]:
df= df.na.drop("any")


In [9]:
df.filter(col('Order ID').isNull()==True).show()


+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [10]:
df.describe("Order ID","Product","Quantity Ordered","Price Each","Order Date","Purchase Address").show()

                                                                                

+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|  count|            186305|      186305|             186305|            186305|        186305|              186305|
|   mean| 230417.5693788653|        null| 1.1243828986286637|184.39973476745283|          null|                null|
| stddev|51512.737109994916|        null|0.44279262402866904| 332.7313298843438|          null|                null|
|    min|            141234|20in Monitor|                  1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|          Order ID|      iPhone|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+-------------------+---

In [11]:
df.filter(col("Order ID")=="Order ID").show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantit

In [12]:
new_df=df.distinct()


In [13]:
new_df.filter(col("Order ID")=="Order ID").show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+



In [14]:
new_df= new_df.filter(col("Order ID") != "Order ID")

In [15]:
new_df.filter(col("Order ID")=="Order ID").show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



In [16]:
new_df.describe("Order ID","Product","Quantity Ordered","Price Each","Order Date","Purchase Address").show(truncate=False)



+-------+------------------+------------+------------------+------------------+--------------+--------------------------------------+
|summary|Order ID          |Product     |Quantity Ordered  |Price Each        |Order Date    |Purchase Address                      |
+-------+------------------+------------+------------------+------------------+--------------+--------------------------------------+
|count  |185686            |185686      |185686            |185686            |185686        |185686                                |
|mean   |230411.37622653297|null        |1.1245435843305365|184.51925546352427|null          |null                                  |
|stddev |51511.71718332086 |null        |0.4430687383832874|332.8438383900525 |null          |null                                  |
|min    |141234            |20in Monitor|1                 |109.99            |01/01/19 03:07|1 11th St, Atlanta, GA 30301          |
|max    |319670            |iPhone      |9                 |99

                                                                                

### extract city and state from purchase address

In [17]:
new_df.select("Purchase Address",split(col("Purchase Address"),",")).show(10,truncate=False)

+----------------------------------------+--------------------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)              |
+----------------------------------------+--------------------------------------------+
|283 Washington St, Boston, MA 02215     |[283 Washington St,  Boston,  MA 02215]     |
|968 8th St, Austin, TX 73301            |[968 8th St,  Austin,  TX 73301]            |
|857 Center St, Boston, MA 02215         |[857 Center St,  Boston,  MA 02215]         |
|679 Chestnut St, San Francisco, CA 94016|[679 Chestnut St,  San Francisco,  CA 94016]|
|58 Dogwood St, San Francisco, CA 94016  |[58 Dogwood St,  San Francisco,  CA 94016]  |
|355 Park St, Boston, MA 02215           |[355 Park St,  Boston,  MA 02215]           |
|542 9th St, New York City, NY 10001     |[542 9th St,  New York City,  NY 10001]     |
|708 Walnut St, New York City, NY 10001  |[708 Walnut St,  New York City,  NY 10001]  |
|538 Hickory St, San Francisco, 

In [18]:
new_df.select("Purchase Address",split(col("Purchase Address"),",").getItem(1)).show(10,truncate=False)


+----------------------------------------+---------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)[1]|
+----------------------------------------+---------------------------------+
|283 Washington St, Boston, MA 02215     | Boston                          |
|968 8th St, Austin, TX 73301            | Austin                          |
|857 Center St, Boston, MA 02215         | Boston                          |
|679 Chestnut St, San Francisco, CA 94016| San Francisco                   |
|58 Dogwood St, San Francisco, CA 94016  | San Francisco                   |
|355 Park St, Boston, MA 02215           | Boston                          |
|542 9th St, New York City, NY 10001     | New York City                   |
|708 Walnut St, New York City, NY 10001  | New York City                   |
|538 Hickory St, San Francisco, CA 94016 | San Francisco                   |
|199 8th St, San Francisco, CA 94016     | San Francisco                   |

In [19]:
new_df.select("Purchase Address",split(col("Purchase Address"),",").getItem(2)).show(10,truncate=False)

+----------------------------------------+---------------------------------+
|Purchase Address                        |split(Purchase Address, ,, -1)[2]|
+----------------------------------------+---------------------------------+
|283 Washington St, Boston, MA 02215     | MA 02215                        |
|968 8th St, Austin, TX 73301            | TX 73301                        |
|857 Center St, Boston, MA 02215         | MA 02215                        |
|679 Chestnut St, San Francisco, CA 94016| CA 94016                        |
|58 Dogwood St, San Francisco, CA 94016  | CA 94016                        |
|355 Park St, Boston, MA 02215           | MA 02215                        |
|542 9th St, New York City, NY 10001     | NY 10001                        |
|708 Walnut St, New York City, NY 10001  | NY 10001                        |
|538 Hickory St, San Francisco, CA 94016 | CA 94016                        |
|199 8th St, San Francisco, CA 94016     | CA 94016                        |

In [20]:
new_df.select("Purchase Address",split(split(col("Purchase Address"),",").getItem(2)," ").getItem(1)).show(10,truncate=False)

+----------------------------------------+--------------------------------------------------+
|Purchase Address                        |split(split(Purchase Address, ,, -1)[2],  , -1)[1]|
+----------------------------------------+--------------------------------------------------+
|283 Washington St, Boston, MA 02215     |MA                                                |
|968 8th St, Austin, TX 73301            |TX                                                |
|857 Center St, Boston, MA 02215         |MA                                                |
|679 Chestnut St, San Francisco, CA 94016|CA                                                |
|58 Dogwood St, San Francisco, CA 94016  |CA                                                |
|355 Park St, Boston, MA 02215           |MA                                                |
|542 9th St, New York City, NY 10001     |NY                                                |
|708 Walnut St, New York City, NY 10001  |NY                

In [21]:
new_df=(new_df.withColumn("City",split(col("Purchase Address"),",").getItem(1))
       .withColumn("State",split(split(col("Purchase Address"),",").getItem(2)," ").getItem(1))
    )

In [22]:
new_df.show()

+--------+--------------------+----------------+----------+--------------+--------------------+--------------+-----+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|          City|State|
+--------+--------------------+----------------+----------+--------------+--------------------+--------------+-----+
|  295900|AA Batteries (4-p...|               1|      3.84|12/27/19 18:56|283 Washington St...|        Boston|   MA|
|  295923|Lightning Chargin...|               1|     14.95|12/21/19 13:41|968 8th St, Austi...|        Austin|   TX|
|  295991|Lightning Chargin...|               1|     14.95|12/15/19 20:16|857 Center St, Bo...|        Boston|   MA|
|  296076|  Macbook Pro Laptop|               1|      1700|12/03/19 15:19|679 Chestnut St, ...| San Francisco|   CA|
|  297015|AAA Batteries (4-...|               3|      2.99|12/13/19 08:43|58 Dogwood St, Sa...| San Francisco|   CA|
|  297237|Bose SoundSport H...|               1|     99.99|12/16

### Rename and change datatypes

In [23]:
new_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



In [24]:
new_df=(new_df.withColumn("orderid", col("Order ID").cast(IntegerType()))
       .withColumn("price", col("Price Each").cast(FloatType()))
       .withColumn("Quantity", col("Quantity Ordered").cast(IntegerType()))
       .withColumn("orderdate", to_timestamp(col("Order Date"),"MM/dd/yy HH:mm"))
       .withColumnRenamed("Purchase Address","storeaddress")
       .drop("Order ID")
       .drop("Price Each")
       .drop("Quantity Ordered")
       .drop("Order Date"))
    

In [25]:
new_df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- storeaddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- orderid: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- orderdate: timestamp (nullable = true)



In [26]:
new_df.show()

+--------------------+--------------------+--------------+-----+-------+------+--------+-------------------+
|             Product|        storeaddress|          City|State|orderid| price|Quantity|          orderdate|
+--------------------+--------------------+--------------+-----+-------+------+--------+-------------------+
|AA Batteries (4-p...|283 Washington St...|        Boston|   MA| 295900|  3.84|       1|2019-12-27 18:56:00|
|Lightning Chargin...|968 8th St, Austi...|        Austin|   TX| 295923| 14.95|       1|2019-12-21 13:41:00|
|Lightning Chargin...|857 Center St, Bo...|        Boston|   MA| 295991| 14.95|       1|2019-12-15 20:16:00|
|  Macbook Pro Laptop|679 Chestnut St, ...| San Francisco|   CA| 296076|1700.0|       1|2019-12-03 15:19:00|
|AAA Batteries (4-...|58 Dogwood St, Sa...| San Francisco|   CA| 297015|  2.99|       3|2019-12-13 08:43:00|
|Bose SoundSport H...|355 Park St, Bost...|        Boston|   MA| 297237| 99.99|       1|2019-12-16 10:28:00|
|    27in FHD Monit

### Adding columns: month, year

In [27]:
new_df=(new_df.withColumn("year", year(col("orderdate")))
               .withColumn("month",month(col("orderdate"))))

In [28]:
new_df.show()

+--------------------+--------------------+--------------+-----+-------+------+--------+-------------------+----+-----+
|             Product|        storeaddress|          City|State|orderid| price|Quantity|          orderdate|year|month|
+--------------------+--------------------+--------------+-----+-------+------+--------+-------------------+----+-----+
|AA Batteries (4-p...|283 Washington St...|        Boston|   MA| 295900|  3.84|       1|2019-12-27 18:56:00|2019|   12|
|Lightning Chargin...|968 8th St, Austi...|        Austin|   TX| 295923| 14.95|       1|2019-12-21 13:41:00|2019|   12|
|Lightning Chargin...|857 Center St, Bo...|        Boston|   MA| 295991| 14.95|       1|2019-12-15 20:16:00|2019|   12|
|  Macbook Pro Laptop|679 Chestnut St, ...| San Francisco|   CA| 296076|1700.0|       1|2019-12-03 15:19:00|2019|   12|
|AAA Batteries (4-...|58 Dogwood St, Sa...| San Francisco|   CA| 297015|  2.99|       3|2019-12-13 08:43:00|2019|   12|
|Bose SoundSport H...|355 Park St, Bost.

# LOADING THE DATA

### Writing the final dataframe to json

In [29]:
final_df=new_df.select("orderid","product","quantity","price","orderdate","storeaddress","city","state","year","month")
dfpath = './data/output1/sales'
final_df.write.format("json").mode("overwrite").partitionBy("year", "month").save(dfpath)

                                                                                