# Map Reduce Lab

### Introduction

In this lesson, we'll practice working with PySpark by looking at sales at different branches of a grocery store chain.  Let's get started.

### Creating our Spark Context

Let's begin by creating our Spark context.  As we know, the Spark context is created in our driver program and is what connects us to the cluster.  We create our context in two steps.  The first is to set up our configuration.

> Create a configuration that has an application name of `groceryStores` and that connects to a local master running on two cores.

In [4]:
from pyspark import SparkContext, SparkConf

In [5]:
conf = SparkConf().setAppName("groceryStores").setMaster("local[2]")

> And from there create the Spark Context.

In [6]:
sc = SparkContext.getOrCreate(conf=conf)

> Now if we look at the app name and the master, we can check that our application and cluster were set up properly.

In [7]:
sc.appName
# 'groceryStores'

'groceryStores'

In [8]:
sc.master

# 'local[2]'

'local[2]'

### Creating our RDD

Now to create our RDD, we'll first load some data into a pandas DataFrame, and then convert this DataFrame to a list of dictionaries.

In [1]:
import pandas as pd
url = "https://raw.githubusercontent.com/jigsawlabs-student/pyspark-map-reduce-lab/main/supermarket_sales.csv"
df = pd.read_csv(url)
sales = df.to_dict('records')

In [19]:
sales[:1]

[{'Invoice ID': '750-67-8428',
  'Branch': 'A',
  'City': 'Yangon',
  'Customer type': 'Member',
  'Gender': 'Female',
  'Product line': 'Health and beauty',
  'Unit price': 74.69,
  'Quantity': 7,
  'Tax 5%': 26.1415,
  'Total': 548.9715,
  'Date': '1/5/2019',
  'Time': '13:08',
  'Payment': 'Ewallet',
  'cogs': 522.83,
  'gross margin percentage': 4.761904762,
  'gross income': 26.1415,
  'Rating': 9.1}]

Ok, so currently `sales` is a list of dictionaries.  Turn this list of dictionaries into an RDD by calling the `parallelize` method on the Spark context and passing in the list.

In [39]:
sales_rdd = sc.parallelize(sales)

In [40]:
sales_rdd
# ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD

ParallelCollectionRDD[11] at readRDDFromFile at PythonRDD.scala:274

> We can see that we have 1000 records.

In [41]:
sales_rdd.count()

1000

Now let's see the number of partitions for our RDD.

In [42]:
sales_rdd.getNumPartitions()
# 2

2
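
To make the idea of a partition concrete, here is a minimal pure-Python sketch of splitting a list of records into two chunks. It is only an illustration: the helper `split_into_partitions` and the `toy_sales` records are hypothetical, and the way Spark actually slices a collection may differ from this contiguous split.

```python
def split_into_partitions(records, num_partitions):
    """Split records into num_partitions contiguous, near-equal chunks."""
    n = len(records)
    return [
        records[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


# Hypothetical stand-in for the 1000 sales records loaded above.
toy_sales = [{"Invoice ID": i, "Branch": "ABC"[i % 3]} for i in range(1000)]

partitions = split_into_partitions(toy_sales, 2)
partition_sizes = [len(partition) for partition in partitions]
print(partition_sizes)  # [500, 500]
```

Each chunk plays the role of one partition, which is the unit of work that a single task processes.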

Ok, so now let's filter for only the sales from branch B.  Below, you only need to use the `filter` method; we call the `collect` method for you in the following cell.

In [43]:
branch_b = sales_rdd.filter(lambda row: row['Branch'] == 'B')

In [37]:
branch_b.collect()[0]
# {'Invoice ID': '692-92-5582',
#  'Branch': 'B',
#  'City': 'Mandalay',
#  'Customer type': 'Member',
#  'Gender': 'Female',
#  'Product line': 'Food and beverages',
#  'Unit price': 54.84,
#  'Quantity': 3,
#  'Tax 5%': 8.226,
#  'Total': 172.746,
#  'Date': '2/20/2019',
#  'Time': '13:27',
#  'Payment': 'Credit card',
#  'cogs': 164.52,
#  'gross margin percentage': 4.761904762,
#  'gross income': 8.226,
#  'Rating': 5.9}

{'Invoice ID': '692-92-5582',
 'Branch': 'B',
 'City': 'Mandalay',
 'Customer type': 'Member',
 'Gender': 'Female',
 'Product line': 'Food and beverages',
 'Unit price': 54.84,
 'Quantity': 3,
 'Tax 5%': 8.226,
 'Total': 172.746,
 'Date': '2/20/2019',
 'Time': '13:27',
 'Payment': 'Credit card',
 'cogs': 164.52,
 'gross margin percentage': 4.761904762,
 'gross income': 8.226,
 'Rating': 5.9}

Ok, so from here, let's take a look at the Spark UI.  We can get to the Spark UI by evaluating the Spark context in a cell, and from there clicking on the link to the Spark UI.

> We can also go directly to http://localhost:4040, which is the default port for the Spark UI.

In [38]:
sc

When we open the Spark UI, we should see a number of jobs listed.  As we know, the job at the top is the most recent job that was called.  Click on the most recent job and view the DAG.

<img src="./dag_parallel.png" width="20%">

From there, click on the stage that contains `parallelize` to dig a bit deeper into this.

<img src="./dag_collect.png" width="40%">

So here it looks like Spark first created a partitioned dataset, our RDD.  The label says `readRDDFromFile`, but this is the step where Spark reads from our list of dictionaries.  Then we can see a PythonRDD, which corresponds to the Python-side work that produced the list of dictionaries returned to us when we called `collect()`.

> So above, Spark applied the filter function across the two partitions of the dataset, and then returned the results to us as a Python list of dictionaries.

We can see that this was performed in parallel, if we scroll down and view the event timeline.

<img src="./green_parallel.png" width="80%">

So notice that here we see two lines, each of which represents a task run on a separate core.  Also, notice that these tasks overlap with one another, indicating that they were run in parallel.
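
As a rough analogy for what the timeline shows, the sketch below runs the same filter on two partitions at once, using two worker threads to mirror `local[2]`, and then concatenates the per-partition results the way `collect()` hands back one list. This is plain Python rather than Spark, and `toy_sales`, `partitions`, and `filter_partition` are hypothetical names chosen for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical records, already split into two partitions.
toy_sales = [
    {"Invoice ID": 1, "Branch": "A"},
    {"Invoice ID": 2, "Branch": "B"},
    {"Invoice ID": 3, "Branch": "C"},
    {"Invoice ID": 4, "Branch": "B"},
]
partitions = [toy_sales[:2], toy_sales[2:]]


def filter_partition(partition):
    """One task: apply the branch filter to a single partition."""
    return [row for row in partition if row["Branch"] == "B"]


# Two workers, one task per partition, so the tasks can overlap in time.
with ThreadPoolExecutor(max_workers=2) as pool:
    filtered_partitions = list(pool.map(filter_partition, partitions))

# Gathering the results back into one list, as collect() does on the driver.
branch_b = [row for partition in filtered_partitions for row in partition]
print(branch_b)
```

Because a filter only needs the rows in its own partition, no task has to wait on data held by the other one.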

> If we look at the color coding, we can see a short blue segment for the scheduling of each task, and a longer green segment for the compute time.

### Seeing our Shuffle

Ok, now let's run a procedure that will result in a shuffle.  For us, this is our group by.  Begin with `sales_rdd`, group the sales by branch, and count the number of sales in each branch.

> To see the results, end with a call to `collect()`.

In [49]:
sales_rdd.groupBy(lambda sale: sale['Branch']).map(lambda branch_sales: (branch_sales[0], len(branch_sales[1]))).collect()

# [('C', 328), ('A', 340), ('B', 332)]

[('C', 328), ('A', 340), ('B', 332)]

After getting the correct result, take a look at the Spark UI to see what occurred in the job.  For our most recent job, we should see something like the following:

<img src="./dag_viz.png" width="40%">

And then below we see the following.

<img src="./completed_stage.png" width="90%">

So the first stage was the `groupBy`, which ran in parallel.  Then, in the following stage, `partitionBy` and `mapPartitions` ran; this was our count function.  Let's take a closer look at the first stage, the one that contains `parallelize`.

<img src="./groupby_stage.png" width="60%">

And on the same panel we can see the event timeline.

<img src="./shuffle_event.png" width="100%">

So looking at the event timeline, we can see that the compute step first organizes the records by branch, and then, in yellow, the shuffle occurs to repartition the records by key.
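
The two stages can be imitated in plain Python to see why a group by forces data to move. In the sketch below, each input partition first buckets its records by the target partition of their key, and a second pass then merges the buckets for each target and counts. All names are hypothetical, and `partition_for` is a simple deterministic stand-in rather than the hash function Spark uses.

```python
from collections import defaultdict


def partition_for(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (an assumption)."""
    return sum(key.encode("utf-8")) % num_partitions


# Hypothetical input: two partitions with branches mixed together.
input_partitions = [
    [{"Branch": "A"}, {"Branch": "B"}, {"Branch": "C"}],
    [{"Branch": "B"}, {"Branch": "A"}, {"Branch": "A"}],
]
num_output_partitions = 2

# Stage 1 (map side): each input partition writes its records into
# buckets, one bucket per output partition.
shuffle_output = []
for partition in input_partitions:
    buckets = defaultdict(list)
    for sale in partition:
        key = sale["Branch"]
        buckets[partition_for(key, num_output_partitions)].append((key, sale))
    shuffle_output.append(buckets)

# Stage 2 (reduce side): each output partition pulls its bucket from
# every input partition, groups the records by key, and counts them.
counts = {}
for target in range(num_output_partitions):
    grouped = defaultdict(list)
    for buckets in shuffle_output:
        for key, sale in buckets.get(target, []):
            grouped[key].append(sale)
    for key, sales_for_key in grouped.items():
        counts[key] = len(sales_for_key)

print(sorted(counts.items()))  # [('A', 3), ('B', 2), ('C', 1)]
```

The second pass reads from every input partition, which is the step that corresponds to the shuffle: all records for a given branch have to end up in the same place before they can be counted.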

### Summary

In this lab, we practiced working with PySpark, and using the Spark UI to see the operations that occur, even when we call relatively simple methods.  We saw that by partitioning our data, methods like `filter` are performed in parallel.  We confirmed this by looking at the tasks performed in a stage, and seeing that they ran at the same time.

Then, we performed a `groupBy`, which as we know results in shuffling.  This means that our data needs to be repartitioned, and often transferred between worker nodes, to perform the operation.  We saw the shuffling occur by taking a closer look at the stage that performed the `groupBy` and looking at the event timeline.

<img src="./shuffle_event.png" width="100%">

### Resources

[From Google Cloud to Pyspark](https://medium.com/@kashif.sohail/read-files-from-google-cloud-storage-bucket-using-local-pyspark-and-jupyter-notebooks-f8bd43f4b42e)