## Introduction to PySpark on an EMR Cluster

[This notebook is meant to be run on an AWS EMR Cluster with Spark installed; follow the instructions in `emr_cheatsheet.md` to set up such a cluster]

Let's first take a look at the Spark version that we have available within our EMR cluster:

In [1]:
sc.version

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1714403316049_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


'3.0.1-amzn-0'

Suppose we have a standard Python list:

In [2]:
lst = [i for i in range(100)]

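Let's create an RDD from this list to demonstrate some basic PySpark operations. By default, the number of partitions follows `spark.default.parallelism`, which on a cluster is typically derived from the number of executor cores rather than the number of nodes: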

In [3]:
lst_p = sc.parallelize(lst)

In [4]:
lst_p.getNumPartitions()

4

In [5]:
lst_p

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

Once we have an RDD, we can perform "transformations" on it (such as `map` and `filter`):

In [6]:
filtered = lst_p.filter(lambda x: x < 10) # transformation 1
filtered

PythonRDD[1] at RDD at PythonRDD.scala:53

In [7]:
mapped = filtered.map(lambda x: x * 10) # transformation 2
mapped

PythonRDD[2] at RDD at PythonRDD.scala:53

Recall that transformations are not actually performed until we trigger an "action," such as one of the following:

In [8]:
mapped.reduce(lambda a, b: a + b)

450

In [9]:
mapped.count() # also built-in reductions like count and sum

10

In [10]:
mapped.collect() # action to bring all data to the driver as a list

[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

In [11]:
mapped.take(3) # action to bring the first n elements (here 3) to the driver as a list

[0, 10, 20]
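
The lazy behavior described above can be mimicked without a cluster. The following is a plain-Python analogy using generators, not a description of how Spark is implemented; `counter` and `times_ten` are hypothetical names introduced only for this sketch. Building the pipeline does no work, and only the final `sum` (standing in for an action) forces the computation:

```python
# Plain-Python analogy for lazy transformations (no Spark required).
counter = {"calls": 0}  # hypothetical counter: how many elements were processed

def times_ten(x):
    counter["calls"] += 1
    return x * 10

data = range(100)

# "Transformations": these lines only describe the pipeline.
filtered = (x for x in data if x < 10)
mapped = (times_ten(x) for x in filtered)
calls_before_action = counter["calls"]  # nothing has been computed yet

# "Action": consuming the pipeline triggers the work.
total = sum(mapped)
calls_after_action = counter["calls"]

print(calls_before_action, calls_after_action, total)
```

The counter stays at zero until `sum` runs, which is the same pattern as `filter` and `map` doing nothing until `reduce`, `count`, `collect`, or `take` is called.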

If we plan to perform repeated transformations/actions on an RDD, we can persist it in memory so that later actions reuse the stored partitions instead of recomputing them:

In [13]:
lst_p.persist()

# check if it has been persisted/cached
lst_p.is_cached

True

In [14]:
lst_p.filter(lambda x: x < 10) \
     .map(lambda x: x * 10) \
     .reduce(lambda a, b: a + b)

450
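
To see why persisting can help, here is a rough plain-Python analogy (again, not Spark itself). The names `slow_square` and `lineage` are made up for this sketch. Each call to `lineage()` rebuilds the pipeline from scratch, much as an unpersisted RDD is recomputed for every action:

```python
# Plain-Python analogy for recomputation versus caching (no Spark required).
counter = {"calls": 0}  # hypothetical counter of per-element evaluations

def slow_square(x):
    counter["calls"] += 1
    return x * x

def lineage():
    # A fresh generator each time, like recomputing an unpersisted RDD.
    return (slow_square(x) for x in range(100))

# Two "actions" without caching: every element is evaluated twice.
total = sum(lineage())
largest = max(lineage())
calls_without_cache = counter["calls"]

# Materialize once, then run both "actions" on the stored result.
counter["calls"] = 0
cached = list(lineage())
total_cached = sum(cached)
largest_cached = max(cached)
calls_with_cache = counter["calls"]

print(calls_without_cache, calls_with_cache)
```

One difference worth keeping in mind: the list here is built eagerly, whereas Spark fills the cache lazily, on the first action that runs after `persist()`.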

## A Simple Application

Reduction by key after mapping/filtering is a particularly common workflow. Let's look at a simple application that performs such a transformation.

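Assume that we have students and grades for two assignments, stored in a list of tuples, and we want the average of each student's grades above 90. The pipeline below keeps only grades strictly greater than 90, sums them per student, and divides by 2. Note that the fixed divisor is only correct when both of a student's grades pass the filter, as Sue's do here: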

In [18]:
student_grades = [('John', 90), ('Sue', 95), ('John', 89), ('Sue', 92)]

sg_p = sc.parallelize(student_grades) \
         .filter(lambda x: x[1] > 90) \
         .reduceByKey(lambda a, b: (a + b)) \
         .map(lambda x: (x[0], x[1] / 2)) \
         .collect()
sg_p

[('Sue', 93.5)]
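
The pipeline above divides each sum by 2 regardless of how many grades passed the filter. One common way around this is to carry a `(sum, count)` pair per key and divide at the end. The sketch below emulates that idea in plain Python so it can be checked without a cluster; `reduce_by_key` and `average_above` are hypothetical helpers written for this example, not PySpark functions:

```python
# Plain-Python emulation of a (sum, count) reduction by key (no Spark required).

def reduce_by_key(pairs, func):
    """Hypothetical helper: merge values that share a key using func."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out

def average_above(grades, threshold=90):
    """Average, per student, of the grades strictly above threshold."""
    passing = [(name, g) for name, g in grades if g > threshold]
    # Pair each grade with a count of 1 so sums and counts travel together.
    with_counts = [(name, (g, 1)) for name, g in passing]
    sums = reduce_by_key(with_counts, lambda a, b: (a[0] + b[0], a[1] + b[1]))
    return {name: total / n for name, (total, n) in sums.items()}

student_grades = [('John', 90), ('Sue', 95), ('John', 89), ('Sue', 92)]
result = average_above(student_grades)

# Ann (made-up data) has only one grade above 90; dividing by 2 would give 47.5.
result_uneven = average_above(student_grades + [('Ann', 95), ('Ann', 80)])

print(result, result_uneven)
```

On the cluster, the same idea could be expressed with the RDD methods `mapValues(lambda g: (g, 1))`, `reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))`, and a final `mapValues(lambda s: s[0] / s[1])`.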

## Activity 1

Suppose you wanted to parallelize a portion of the Monte Carlo Pi estimation approach we covered earlier in the course using PySpark RDDs and had the following code written thus far:

In [None]:
import numpy as np

n_sims = 100000
rand = np.random.uniform(low=-1, high=1, size=(n_sims, 2))

How would you parallelize the NumPy array `rand` as an RDD? Using this parallelized RDD, how might you estimate pi?

## Activity 2

1. Describe in words what is happening in each line of the PySpark code below. At which line will the code actually be executed? Why? You're encouraged to run the code in an EMR cluster and test it out!

2. Consider the scalability of this code; if your number of tokens increases (e.g., into the millions), would you want to continue using the `collect` method? Why? What problems might you run into on your EMR cluster?

In [None]:
import re
tokens = ['cat', 'dog', 'cat', 'rat', 'bat', 'frog', 'cat', 'dog']

tokens_p = sc.parallelize(tokens).persist() \
             .map(lambda t: (t, 1)) \
             .filter(lambda t: re.search(r'[at]+', t[0])) \
             .reduceByKey(lambda a, b: a + b) \
             .collect()