# Global Sales Data Synthetic Dataset

In this homework we're going to play around with a synthetically generated sales dataset from Databricks. As you run the commands below, pay attention to the jobs/tasks that get launched with each cell. Remember that these cells are executed on a hosted cluster running in some datacenter!

Some Spark Tips:

1. You'll find the following cheat sheet useful for finding commands: [PySpark SparkSQL Cheat Sheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf)
2. The official documentation is also [available](https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#), but it is quite dense. It's useful as a reference, and the [`pyspark.sql.functions`](https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#module-pyspark.sql.functions#) module in particular is very useful for auxiliary functions.
3. If you search for help with Spark commands, remember that we're specifically using Python (PySpark) and SparkSQL.

## Initialization and Setup

Make sure you run the following cells before you begin:

In [3]:
%run "./Includes/Classroom Setup"

In [4]:
%run "./Includes/Utility-Methods"

The files for this homework are located in `/mnt/training/global-sales/`, and the following cell displays the readme file for this dataset.

In [6]:
%fs head /mnt/training/global-sales/README.md

Since this is a very large dataset (approximately 30 to 40 million records for each year), we'll work with only a single year (2018), because we're using the free (community) version of Databricks. Our "cluster" consists of a single computer with 8 "cores", so we can run 8 tasks in parallel at any given time.

We're interested in three of the files:
- `txn_2018`: A list of all transactions from all retailers throughout 2018, along with `city_id` and `retailer_id`, which refer to the tables below
- `cities`: A list of cities
- `retailers`: A list of 100 retailers, with the volume of US sales and non-US sales for each

Thus, `txn_2018` is a *fact table*, and `cities` and `retailers` are *dimension tables*. You'll need to join the fact table with the dimension tables to get useful data.

## 1. Partitioning and Loading Large Data (20 points)

In [10]:
# Run this cell to import all the required Spark functions.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import StorageLevel

In the cell below, we've provided paths to two versions of `txn_2018`, which have the same data, but partitioned in different ways.

In [12]:
# File locations
txn_2018_1 = "dbfs:/mnt/training/global-sales/transactions/2018.parquet"
txn_2018_2 = "dbfs:/mnt/training/global-sales/solutions/2018-fixed.parquet/"

The following function prints file statistics for the file location passed as a parameter:

In [14]:
# Run this cell, don't modify it.
# The following function prints the file size and count information for a file path
def print_file_stats(pathname):
  (count, bytes) = computeFileStats(pathname)
  print("{:14,} files".format(count))
  print("{:14,} bytes".format(bytes))
  print("{:14,} KB/file".format(int(bytes/count/1024)))
  print("-"*80)
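
`computeFileStats` comes from the course's Utility-Methods notebook, and its implementation is not shown here. As a rough illustration of the kind of numbers it reports, the sketch below counts files and bytes under a local directory. The name `local_file_stats` is hypothetical, and the code works on the local filesystem rather than on DBFS, so treat it as a stand-in and not as the course helper.

```python
import os
import tempfile

def local_file_stats(root):
    """Return (file_count, total_bytes) for every file under a local directory."""
    count, total = 0, 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            count += 1
            total += os.path.getsize(os.path.join(dirpath, name))
    return count, total

# Demo on a throwaway directory holding three 2048-byte files.
with tempfile.TemporaryDirectory() as tmp:
    for i in range(3):
        with open(os.path.join(tmp, "part-{}.bin".format(i)), "wb") as f:
            f.write(b"\0" * 2048)
    demo_count, demo_bytes = local_file_stats(tmp)

# Same report layout as print_file_stats above.
print("{:14,} files".format(demo_count))
print("{:14,} bytes".format(demo_bytes))
print("{:14,} KB/file".format(demo_bytes // demo_count // 1024))
```

The KB/file figure is simply total bytes divided by file count, which is why a layout with many tiny files and a layout with a few large files can describe the same data.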

**Q1.1** Use the `print_file_stats` function above on the two file locations and report the difference between the two datasets.

In [16]:
# Write your answer below

print_file_stats(txn_2018_1)
print_file_stats(txn_2018_2)



In [17]:
# Comment on the difference between the two file paths below
# The first path contains far more files than the second, but each file holds only a few KB, whereas the second path has few files with far more KB per file. In other words, the first layout spreads the data across many small files, while the second packs the same data into a few large ones.


**Q1.2** Both datasets are in the [`parquet`](https://en.wikipedia.org/wiki/Apache_Parquet) file format. Use the [`spark.read.parquet()`](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html) function to load them into dataframes `df_2018_1` and `df_2018_2`, respectively. The loads may take a while to run.

In [19]:
# Load the first dataset here
df_2018_1 = spark.read.parquet(txn_2018_1)

In [20]:
# Load the second dataset here
df_2018_2 = spark.read.parquet(txn_2018_2)

Notice that even though you've called the load function, we haven't computed anything on the dataframes yet. Let's go ahead and count the number of rows in each dataframe; this should trigger a Spark job that takes time to complete.

Run the two cells below, one by one. Once they have finished running, answer the questions below:

**While the jobs run, pay attention to the jobs, stages and tasks that are launched, as well as the number of tasks that can run concurrently.**

**Note**: One of the cells below takes ~ **10 - 20 minutes to complete**.

In [22]:
df_2018_1.count()

In [23]:
df_2018_2.count()

**Q1.3** How many Jobs/Stages/Tasks were launched for df_2018_1? How long did it take to run?

In [25]:
# Write your answers here
# 1 job (job 3) ran with 2 stages. The stages had different numbers of tasks: the first had 1152 tasks and the second had one. 8 tasks ran at a time. It took 14 minutes.

**Q1.4** How many Jobs/Stages/Tasks were launched for df_2018_2? How long did it take to run?

In [27]:
# Write your answers here

# 1 job ran with 2 stages. The stages had different numbers of tasks: the first had 11 tasks and the second had one. It took about a second.

**Q1.5** There were the same number of records in each dataframe. Why is there a discrepancy in the time taken to run the operations?

In [29]:
# Write your answers here
# Switching from file to file takes more time than reading through the data within a single file. Therefore, since the second path had fewer files overall and more data per file, it was significantly faster. Each task was a file, so fewer files meant fewer tasks, which meant less time.
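
A back-of-the-envelope calculation makes the gap concrete. The sketch below only uses figures reported above (8 concurrent tasks, 1152 and 11 first-stage tasks, 14 minutes for the slow run); the helper name `waves` is hypothetical, and the model ignores everything except how many rounds of tasks the scheduler has to run.

```python
import math

CORES = 8  # concurrent tasks on this cluster, as stated in the text above

def waves(num_tasks, cores=CORES):
    """Scheduling rounds needed when at most `cores` tasks run at once."""
    return math.ceil(num_tasks / cores)

many_small_files = waves(1152)  # first stage of df_2018_1.count()
few_large_files = waves(11)     # first stage of df_2018_2.count()

# Average wall-clock seconds per round for the slow run (14 minutes reported above).
seconds_per_wave = 14 * 60 / many_small_files

print(many_small_files, few_large_files, round(seconds_per_wave, 1))
```

The slow layout needs 144 rounds of tasks against 2 for the fast one. Each round in the slow run averaged several seconds even though every file holds only a few KB, which suggests (but does not prove) that per-file and per-task overhead, not the volume of data, dominated the runtime.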

Let's load the `cities` and `retailers` dimension tables as well. Run the following cells to load the files into individual dataframes and display them.

In [31]:
citiesPath = "dbfs:/mnt/training/global-sales/cities/all.parquet/"
retailersPath = "dbfs:/mnt/training/global-sales/retailers/all.parquet/"


In [32]:
citiesDF = spark.read.parquet(citiesPath).filter(col('state_abv').isNotNull()) # Remove any city without a state abbreviation.
retailersDF = spark.read.parquet(retailersPath)

In [33]:
display(citiesDF)

city_id,city,state,state_abv,country
2074005445,Montpelier,Vermont,VT,USA
2055198208,Bismarck,North Dakota,ND,USA
1717498102,Lincoln,Nebraska,NE,USA
1217211842,Nashville,Tennessee,TN,USA
2117800149,Boise,Idaho,ID,USA
781290085,Honolulu,Hawaii,HI,USA
287177635,Sacramento,California,CA,USA
485114748,Baton Rouge,Louisiana,LA,USA
1690909999,Springfield,Illinois,IL,USA
455476705,Salem,Oregon,OR,USA


In [34]:
display(retailersDF)

retailer_id,retailer,us_sales,other_sales,all_sales,us_vs_world
2001148981,Costco,79694000,31836000,111530000,0.7145521384380884
1953761884,Home Depot,74203000,8992000,83195000,0.891916581525332
1076023740,Aldi,11728000,41794000,53522000,0.2191248458577781
683159064,Safeway,36330000,0,36330000,1.0
1654681099,Delhaize America,17069000,11348000,28417000,0.6006615758172925
1126742091,H-E-B Grocery,19819000,1347000,21166000,0.93636020032127
1245928212,Gap,13071000,3885000,16956000,0.7708775654635527
44686722,Ace Hardware,14299000,46000,14345000,0.9967933077727432
202319369,J.C. Penney,12184000,73000,12257000,0.9940442196295994
1647858807,Verizon Wireless,10959000,0,10959000,1.0


## 2. Spark User-Defined Functions (5 points)

Let's say we want to look up the zip codes for each of the cities in our dataset. We'll use an external web-based service, [Zippopotam.us](http://www.zippopotam.us/), to automatically fetch the zip code for a given city and state. The `fetch` function defined below contacts the Zippopotam.us service over the web and looks up the zip code for a city/state combination.

In [37]:
# The following function is registered with Spark below as a User-Defined Function (UDF)
def fetch(city, state):
    # Import the URL-opening module under one name on both Python 3 and Python 2
    try:
        import urllib.request as urllib2
    except ImportError:
        import urllib2

    # Build the lookup URL, escaping spaces in the city name
    url = "http://api.zippopotam.us/us/{}/{}".format(state, city.replace(" ", "%20"))
    body = urllib2.urlopen(url).read().decode("utf-8")
    # Extract the value that follows the first "post code" key (14 = length of the search string)
    posA = body.index("\"post code\": \"") + 14
    posB = body.index("\"", posA)
    return body[posA:posB]

# Register the function with Spark so that it can run on all the machines in the cluster
fetchUDF = spark.udf.register("fetch", fetch)

In [38]:
# Test the function below, for any city/state
fetch('Chicago','IL')
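
`fetch` pulls the zip code out of the response by searching the raw text for the `"post code"` key. One alternative is to parse the response with Python's `json` module and walk the resulting structure. The sketch below does that on a hard-coded string so that it runs without network access. The helper name `first_post_code` is hypothetical, and `sample` is an abbreviated, illustrative payload whose layout is an assumption, not a captured response from the service.

```python
import json

def first_post_code(payload):
    """Return the first "post code" value found in a depth-first walk of a JSON document."""
    def search(node):
        if isinstance(node, dict):
            if "post code" in node:
                return node["post code"]
            children = node.values()
        elif isinstance(node, list):
            children = node
        else:
            return None
        for child in children:
            found = search(child)
            if found is not None:
                return found
        return None

    result = search(json.loads(payload))
    if result is None:
        # Same exception type that str.index raises in fetch when the key is missing
        raise ValueError('no "post code" field in response')
    return result

# Illustrative payload only; the field layout is assumed.
sample = (
    '{"state abbreviation": "IL", "places": ['
    '{"place name": "Chicago", "post code": "60601"}, '
    '{"place name": "Chicago", "post code": "60602"}]}'
)
print(first_post_code(sample))
```

Parsing the JSON avoids depending on the exact spacing around the colon in the response text, at the cost of a slightly longer function.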

**Q2.1** Add a `zip_code` column to the `citiesDF` dataframe, constructed from the `city` and `state_abv` columns, using the UDF `fetchUDF`.

In [40]:
# Write your answer below
citiesDF = citiesDF.withColumn("zip_code", fetchUDF("city", "state_abv"))

In [41]:
# Run the cell to show the cities dataframe and verify that the zipcode is added to each row
display(citiesDF)

city_id,city,state,state_abv,country,zip_code
2074005445,Montpelier,Vermont,VT,USA,5601
2055198208,Bismarck,North Dakota,ND,USA,58501
1717498102,Lincoln,Nebraska,NE,USA,68501
1217211842,Nashville,Tennessee,TN,USA,37201
2117800149,Boise,Idaho,ID,USA,83701
781290085,Honolulu,Hawaii,HI,USA,96801
287177635,Sacramento,California,CA,USA,94203
485114748,Baton Rouge,Louisiana,LA,USA,70801
1690909999,Springfield,Illinois,IL,USA,62701
455476705,Salem,Oregon,OR,USA,97301


## 3. Joining Tables in Spark (20 points)

**Q3.1** Join `citiesDF` and `df_2018_2` on `city_id` to create a dataframe called `txn_city`.

In [44]:
# Answer below
# Full outer join of the cities dimension table with the 2018 transactions on city_id
txn_city = citiesDF.join(df_2018_2, 'city_id', 'outer')


In [45]:
display(txn_city)

city_id,city,state,state_abv,country,zip_code,transacted_at,trx_id,retailer_id,description,amount,year,month
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-10T23:00:00.000+0000,1228095611,1273066548,7-Eleven ppd id: 107645 Jackson 12-12,1449.22,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-26T11:00:00.000+0000,2126990310,1381060274,Price Chopper Supermarkets 12-28,1924.96,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-29T11:00:00.000+0000,917106599,2077350195,Walgreen ccd id: 528574,55.21,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-14T11:00:00.000+0000,1643296334,2139149619,unkn Jackson,34.97,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-25T11:00:00.000+0000,360551390,486576507,iTunes ccd id: 795080 12-26,505.15,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-10T17:00:00.000+0000,1217711318,847200066,unkn ppd id: 516221 Jackson,34.93,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-12T11:00:00.000+0000,205432942,1953761884,unkn ccd id: 761526,13.44,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-14T11:00:00.000+0000,598428307,683159064,Safeway ccd id: 118484,1974.51,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-10T13:00:00.000+0000,1962687919,1334799521,unkn,7.16,2018,12
559832710,Jackson,Mississippi,MS,USA,39200,2018-12-09T23:00:00.000+0000,561224669,1903529855,Starbucks arc id: 558456 Jackson 12-09,42.85,2018,12


**Q3.2** How long did the join operation take? (Remember that the join is actually executed when you do something with the dataframe, like `display()`-ing it)

In [47]:
# Write your answer as a comment below
# It took ~1.74 minutes to run the above cell and do the join operation 

**Q3.3** Let's try to optimize the join. Spark provides a [`broadcast`](https://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables) operation, which loads one of the tables into the memory of each cluster node. Which of the two dataframes should be broadcast?

In [49]:
# Write your answer here as a comment (either citiesDF or df_2018_2)

# The smaller dataset, citiesDF, should be broadcast.

**Q3.4** Perform the join again using a broadcast variable. How long did this operation take?

In [51]:
# Join command here

# Mark citiesDF, the smaller table, for broadcast
broadcastVar = broadcast(citiesDF)
print(broadcastVar)

txn_city = df_2018_2.join(broadcastVar, 'city_id', 'outer')
display(txn_city)
# Write out how long the operation took below as a comment:
# It took ~43 seconds.

city_id,transacted_at,trx_id,retailer_id,description,amount,year,month,city,state,state_abv,country,zip_code
559832710,2018-12-10T23:00:00.000+0000,1228095611,1273066548,7-Eleven ppd id: 107645 Jackson 12-12,1449.22,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-26T11:00:00.000+0000,2126990310,1381060274,Price Chopper Supermarkets 12-28,1924.96,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-29T11:00:00.000+0000,917106599,2077350195,Walgreen ccd id: 528574,55.21,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-14T11:00:00.000+0000,1643296334,2139149619,unkn Jackson,34.97,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-25T11:00:00.000+0000,360551390,486576507,iTunes ccd id: 795080 12-26,505.15,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-10T17:00:00.000+0000,1217711318,847200066,unkn ppd id: 516221 Jackson,34.93,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-12T11:00:00.000+0000,205432942,1953761884,unkn ccd id: 761526,13.44,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-14T11:00:00.000+0000,598428307,683159064,Safeway ccd id: 118484,1974.51,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-10T13:00:00.000+0000,1962687919,1334799521,unkn,7.16,2018,12,Jackson,Mississippi,MS,USA,39200
559832710,2018-12-09T23:00:00.000+0000,561224669,1903529855,Starbucks arc id: 558456 Jackson 12-09,42.85,2018,12,Jackson,Mississippi,MS,USA,39200
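
To see why broadcasting the smaller table can help, here is a toy model of a broadcast hash join in plain Python. The small table becomes an in-memory lookup (the part that would be copied to every node), and the large table is streamed past it row by row without being reshuffled. This models the idea only and is not Spark's implementation. The function name `broadcast_hash_join` is hypothetical, the rows are a few values copied from the tables above, and the toy performs an inner join.

```python
# Small dimension table: (city_id, city, state_abv), rows taken from the outputs above.
cities = [
    (559832710, "Jackson", "MS"),
    (287177635, "Sacramento", "CA"),
]

# Large fact table, streamed row by row: (trx_id, city_id, amount).
transactions = [
    (1228095611, 559832710, 1449.22),
    (2126990310, 559832710, 1924.96),
    (917106599, 559832710, 55.21),
]

def broadcast_hash_join(small_rows, large_rows):
    """Inner-join large_rows to small_rows on city_id using an in-memory lookup."""
    # Build the lookup once; this is the piece that would be broadcast.
    lookup = {row[0]: row[1:] for row in small_rows}
    for trx_id, city_id, amount in large_rows:
        match = lookup.get(city_id)
        if match is not None:  # keep only rows with a matching city
            yield (city_id, trx_id, amount) + match

joined = list(broadcast_hash_join(cities, transactions))
for row in joined:
    print(row)
```

Whether Spark actually honors a broadcast hint can depend on the join type, so it is worth inspecting the physical plan with `txn_city.explain()` before drawing conclusions from the timings.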


**Q3.5** Let's perform one last join using our new dataframe `txn_city` to make sure that we have the `retailer` name associated with each `retailer_id`. Store the final table in `finalDF`.

In [53]:
# Write your solution below
# Mark retailersDF, the smaller table, for broadcast
broadcastVar2 = broadcast(retailersDF)

finalDF = txn_city.join(broadcastVar2, 'retailer_id', 'outer')
display(finalDF)
# How long did the final join operation take? Mention if you used a broadcast hint or not.
# The final join operation took ~1.67 minutes with a broadcast hint. However, it took the same amount of time regardless of which dataset was broadcast.

retailer_id,city_id,transacted_at,trx_id,description,amount,year,month,city,state,state_abv,country,zip_code,retailer,us_sales,other_sales,all_sales,us_vs_world
1157343460,559832710,2018-12-12T11:00:00.000+0000,963416706,Burlington Coat Factory arc id: 692018,2779.97,2018,12,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-02-28T19:00:00.000+0000,965597811,Burlington Coat Factory ccd id: 1001628,39.97,2018,2,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-01-13T21:00:00.000+0000,207696001,Burlington Coat Factory ccd id: 432804,2300.49,2018,1,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-04-24T20:00:00.000+0000,1437618268,Burlington Coat Factory,5.22,2018,4,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-02-14T18:00:00.000+0000,2137189177,Burlington Coat Factory ppd id: 719062 Jackson,26.63,2018,2,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-09-10T11:00:00.000+0000,1480221214,unkn ppd id: 383317,19.66,2018,9,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-08-14T16:00:00.000+0000,1440658531,Burlington Coat Factory,9.66,2018,8,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-06-04T15:00:00.000+0000,1530940384,Burlington Coat Factory arc id: 784629 Jackson 06-04,35.15,2018,6,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-08-03T07:00:00.000+0000,371539866,Burlington Coat Factory arc id: 457255,3.13,2018,8,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482
1157343460,559832710,2018-04-01T02:00:00.000+0000,795563242,unkn ccd id: 503092,6.58,2018,4,Jackson,Mississippi,MS,USA,39200.0,Burlington Coat Factory,4707000,54000,4761000,0.9886578449905482


## 4. Data Analysis and Visualization (5 points)

**Q4.1** Find the retailer with the most sales in 2018. Display the information as a pie chart (once a dataframe is displayed, you can click on the output UI to show it as a chart).

In [56]:
# Your answer here
# Sum every numeric column of the retailers dimension table per retailer
retailer_sum = retailersDF.groupBy('retailer').sum()
display(retailer_sum)
# Walmart has the most sales in 2018.

retailer,sum(retailer_id),sum(us_sales),sum(other_sales),sum(all_sales),sum(us_vs_world)
Alimentation Couche-Tard,1468277998,5167000,3816000,8983000,0.5751975954580875
GameStop,83040202,6225000,2458000,8683000,0.7169181158585742
Michael's,771066397,4277000,461000,4738000,0.902701561840439
Nordstrom,865681996,13259000,21000,13280000,0.9984186746987952
QVC,1678836396,5972000,2746000,8718000,0.6850194998852948
SUPERVALU,2092104004,11499000,0,11499000,1.0
Walgreen,2077350195,72671000,2414000,75085000,0.9678497702603716
Verizon Wireless,1647858807,10959000,0,10959000,1.0
Dick's Sporting Goods,1295306792,6811000,0,6811000,1.0
Gap,1245928212,13071000,3885000,16956000,0.7708775654635527
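
The cell above sums the columns of the `retailers` dimension table. Another reading of the question is to total the 2018 transaction amounts per retailer from `finalDF`. The sketch below shows the shape of that aggregation in plain Python on four rows copied from the outputs above. The helper name `total_by_retailer` is hypothetical, and because this is a tiny sample and not the full dataset, the resulting ranking is illustrative only.

```python
from collections import defaultdict

# (retailer, amount) pairs copied from rows shown in the outputs above.
sample_rows = [
    ("Burlington Coat Factory", 2779.97),
    ("Burlington Coat Factory", 39.97),
    ("Safeway", 1974.51),
    ("Home Depot", 13.44),
]

def total_by_retailer(rows):
    """Sum amounts per retailer and return (retailer, total) pairs, largest total first."""
    totals = defaultdict(float)
    for retailer, amount in rows:
        totals[retailer] += amount
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)

ranking = total_by_retailer(sample_rows)
top_retailer, top_total = ranking[0]
print(top_retailer, round(top_total, 2))
```

In PySpark, with the imports from the start of this notebook, the same shape would be along the lines of `finalDF.groupBy('retailer').agg(sum('amount').alias('total')).orderBy(desc('total'))`.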


## Submitting your Solutions
To submit your solution, make sure that you've run all the cells in the notebook (i.e. all the cell outputs are visible). Then click on **File** -> **Export** -> **IPython Notebook** and upload the resulting file to Canvas.

## Wrap-Up
This homework is just an introduction to big-data processing using Spark. We've covered only a few topics, such as *partitioning*, *broadcasting* and other techniques used to speed up data processing by modifying the data layout or the order of operations. The main takeaway is that these things matter when working with large distributed datasets on clusters of computers. Optimizing them can improve performance by orders of magnitude and can be the difference between seconds and hours for the same analyses.