# Parallel Processing Keystones

### Map Reduce and Shuffling

### Introduction

In [1]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

### Map Reduce in PySpark

First, we'll create our list of movies in Python.

In [3]:
movies = ['Shazam!', 'Minari', 'Captain Marvel', 
          'Pulp Fiction', 'Casablanca', 'Michael Clayton',
          'Sicario']

Next, we turn the list into an RDD.

In [4]:
rdd = sc.parallelize(movies)

In [7]:
rdd.filter(lambda movie: movie == 'Michael Clayton').collect()

['Michael Clayton']

In [9]:
rdd.getNumPartitions()

2

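* If we use a `filter` to look for the movie 'Michael Clayton' across the entire dataset, Spark runs the filter as one task per partition, in parallel across the cluster. With the data split into four partitions, the filter task runs four times; our local RDD above has two partitions, so it runs twice.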

<img src="./map_red_cluster.jpg" width="100%">
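To make the per-partition idea concrete, here is a minimal plain-Python sketch (no Spark required) that mimics it. The four-way split and the helper name `filter_partition` are assumptions made for illustration; Spark decides the real layout itself, and on a cluster the tasks would run on separate executors rather than in local threads.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical four-way split of the movies list (illustrative only).
partitions = [
    ['Shazam!', 'Minari'],
    ['Captain Marvel', 'Pulp Fiction'],
    ['Casablanca', 'Michael Clayton'],
    ['Sicario'],
]

def filter_partition(partition):
    """One 'task': apply the filter to a single partition."""
    return [movie for movie in partition if movie == 'Michael Clayton']

# One task per partition, run concurrently (threads stand in for executors).
with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    per_partition_results = list(pool.map(filter_partition, partitions))

# Like collect(): gather the per-partition results back into one list.
matches = [movie for result in per_partition_results for movie in result]
print(len(per_partition_results), matches)
```

Each partition is filtered independently, so no partition needs to see another partition's data. That is why a `filter` parallelizes cleanly.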

In [5]:
movie_name = 'Michael Clayton'


### Shuffling

* Shuffling occurs when an operation requires us to send our data across partitions to successfully perform a query.  

> Why we care: Sending data across nodes is often time intensive.

> <img src="./network_slow.jpg" width="60%">

#### An example

Say we have the following movies across the following partitions. 

* Partition 1
    * Shazam!
    * Minari
* Partition 2
    * Captain Marvel
    * Pulp Fiction
* Partition 3
    * Casablanca
    * Michael Clayton
* Partition 4
    * Sicario

Now suppose we want to group the movies together by their first letter.

This grouping will require sending some movies from one worker node to another so that they can reside together.
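The movement described above can be sketched in plain Python (no Spark required). The rule used here to pick a destination partition, `ord(key) % num_partitions`, is a stand-in chosen for illustration and is not Spark's actual partitioner; the point is only that records sharing a key must end up in the same place.

```python
from collections import defaultdict

# The partition layout from the example above (0-indexed here).
partitions = [
    ['Shazam!', 'Minari'],
    ['Captain Marvel', 'Pulp Fiction'],
    ['Casablanca', 'Michael Clayton'],
    ['Sicario'],
]

def target_partition(key, num_partitions):
    # Hypothetical placement rule; Spark uses its own partitioner.
    return ord(key) % num_partitions

# shuffled[target][first_letter] -> movies that landed in that partition
shuffled = defaultdict(lambda: defaultdict(list))
moved = []  # movies that had to leave their original partition

for source, movies_in_partition in enumerate(partitions):
    for movie in movies_in_partition:
        key = movie[0]
        target = target_partition(key, len(partitions))
        shuffled[target][key].append(movie)
        if target != source:
            moved.append(movie)

for target in sorted(shuffled):
    print(target, dict(shuffled[target]))
print("moved:", moved)
```

Under this placement rule, six of the seven movies change partition. Every one of those moves would be network traffic on a real cluster, which is the cost of a shuffle.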

In [10]:
# Group by first letter, then turn each group's iterable into a list
grouped_rdd = rdd.groupBy(lambda movie: movie[0]).map(lambda group: (group[0], list(group[1])))
grouped_rdd.collect()

[('M', ['Minari', 'Michael Clayton']),
 ('S', ['Shazam!', 'Sicario']),
 ('C', ['Captain Marvel', 'Casablanca']),
 ('P', ['Pulp Fiction'])]

### Looking under the hood

A great way to understand what PySpark is doing is to look at the Spark UI.

In [48]:
sc

Displaying `sc` in a notebook shows a summary of the context, including a Spark UI link. If we click on that link, we'll see something like the following.

> <img src="./completed_jobs.png" width="60%">

#### Understanding Map Partitions

* `map`
    * executes our function once for each record.
        * So with 2,000 records, our function is called 2,000 times.
* `mapPartitions`
    * calls our function only once per partition, passing it an iterator over that partition's records.
    * So if those 2,000 records are divided into 20 partitions, the function is called only 20 times. This reduces per-call overhead, which matters most when the function has expensive setup work.
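The call counts above can be checked with a small plain-Python sketch (no Spark required). The even split into 20 partitions and the function names `per_record` and `per_partition` are assumptions for illustration.

```python
# 2,000 records split evenly into 20 partitions of 100 records each.
records = list(range(2000))
partitions = [records[i:i + 100] for i in range(0, len(records), 100)]

calls = {'map': 0, 'map_partitions': 0}

def per_record(record):
    """map-style function: receives a single record."""
    calls['map'] += 1
    return record * 2

def per_partition(partition):
    """mapPartitions-style function: receives an iterator over a partition."""
    calls['map_partitions'] += 1
    # Any expensive setup would happen once here, not once per record.
    for record in partition:
        yield record * 2

# map-style: the function is called once per record.
mapped = [per_record(r) for p in partitions for r in p]

# mapPartitions-style: the function is called once per partition.
mapped_by_partition = [out for p in partitions for out in per_partition(iter(p))]

print(calls)
print(mapped == mapped_by_partition)
```

Both approaches produce the same results; only the number of function calls differs. In PySpark, the corresponding calls would be `rdd.map(per_record)` and `rdd.mapPartitions(per_partition)`.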

In [12]:
sc.stop()

### Resources

[Shuffling Documentation](https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations)

[5 hour pyspark tutorial](https://www.youtube.com/watch?v=GFC2gOL1p9k&t=2743s)

[Map vs Map Partition](https://sparkbyexamples.com/spark/spark-map-vs-mappartitions-transformation/)

[Group by key](https://backtobazics.com/big-data/spark/apache-spark-groupbykey-example/)

[Pyspark Google Colab](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)