# PySpark Cluster Lab

### Introduction

In this lesson, we'll practice connecting to a PySpark cluster and partitioning our dataset.  We'll do so by working with a dataset of Netflix titles.  Let's get started.

### Getting Set Up (On Google Colab)

* Begin by installing some pip packages and the Java Development Kit.

In [None]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet 
!apt install -y openjdk-8-jdk-headless &> /dev/null

* Then set the `JAVA_HOME` environment variable.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

* Then create a SparkContext, setting the Spark UI port to `4050`.

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.ui.port', '4050').setAppName("netflix").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

* Then we need to install ngrok, which will allow us to expose our local Spark UI on the web.

In [1]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('./ngrok http 4050 &')

* And finally, we get a link to our Spark UI.

In [None]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
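
The one-liner above asks ngrok's local API (on port `4040`) for its running tunnels and prints the first public URL. As a minimal sketch, the same parsing step can be written as a Python function. The helper name `first_public_url` and the sample response below are made up for illustration; only the `tunnels` and `public_url` keys come from the command above.

```python
import json

def first_public_url(tunnels_json):
    """Return the public URL of the first tunnel in an ngrok API response body."""
    payload = json.loads(tunnels_json)
    tunnels = payload.get("tunnels", [])
    if not tunnels:
        # ngrok may need a moment to start before any tunnel is listed
        raise ValueError("no tunnels are running yet; retry in a moment")
    return tunnels[0]["public_url"]

# Hypothetical response body, trimmed to the fields used here.
sample_response = '{"tunnels": [{"public_url": "https://example.ngrok.io", "proto": "https"}]}'
print(first_public_url(sample_response))
```

In the notebook, the response body could be fetched from `http://localhost:4040/api/tunnels` (for example with `urllib.request.urlopen`) and passed to this function.
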

### Viewing our Cluster

Now we can get a link to an interface for viewing our cluster, either by referencing our SparkContext or by using the link output above.

In [6]:
sc

If we click on the blue `Spark UI` link above, it will take us to the Spark UI dashboard.  From there, go to the toolbar at the top and click on the `Executors` tab.  We'll see something like the following.

<img src="./executors.png" width="100%">

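So we can see that under `Cores`, it says 4.  The number you see depends on the master setting: with `setMaster("local[2]")`, as in the setup above, Spark runs two worker threads, so you may see 2 instead.  And notice that our executor ID says `driver`, which makes sense: when everything is local, we do not have worker nodes that are separate from our driver node.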
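
To make the link between the master string and the core count concrete, here is a small sketch in plain Python. It assumes the documented meaning of local master strings: `local` is one thread, `local[K]` is K threads, and `local[*]` is one thread per logical core. The helper `local_thread_count` is hypothetical, not a Spark API, and it ignores other forms such as `local[K,F]`.

```python
import os
import re

def local_thread_count(master):
    """Estimate how many worker threads a simple local master string asks for."""
    if master == "local":
        return 1
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master string: {master!r}")
    value = match.group(1)
    # "*" means one thread per logical core on this machine
    return os.cpu_count() if value == "*" else int(value)

print(local_thread_count("local[2]"))  # 2
print(local_thread_count("local[4]"))  # 4
```
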

### Reading some data

Ok, now let's load some data into our Spark cluster.  To do so, we can first read in our Netflix data using a separate library, pandas.

In [2]:
import pandas as pd
url = "https://raw.githubusercontent.com/jigsawlabs-student/pyspark-cluster-lab/main/netflix_titles.csv"
df = pd.read_csv(url)
movies = df.to_dict('records')

So above, `movies` is a list of Python dictionaries, one per title.  Missing values, like the `director` below, show up as `nan`.

In [8]:
movies[:1]

[{'show_id': 's1',
  'type': 'TV Show',
  'title': '3%',
  'director': nan,
  'cast': 'João Miguel, Bianca Comparato, Michel Gomes, Rodolfo Valente, Vaneza Oliveira, Rafael Lozano, Viviane Porto, Mel Fronckowiak, Sergio Mamberti, Zezé Motta, Celso Frateschi',
  'country': 'Brazil',
  'date_added': 'August 14, 2020',
  'release_year': 2020,
  'rating': 'TV-MA',
  'duration': '4 Seasons',
  'listed_in': 'International TV Shows, TV Dramas, TV Sci-Fi & Fantasy',
  'description': 'In a future where the elite inhabit an island paradise far from the crowded slums, you get one chance to join the 3% saved from squalor.'}]

Ok, from here, let's distribute our data across the executors of our cluster.  We can do so by passing our data into the `parallelize` method of our SparkContext.

In [10]:
movies_rdd = None
movies_rdd

# ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

And from there, let's see the number of partitions that our data is broken into.
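
To build some intuition for what a partition is, here is a plain-Python model of splitting a list into roughly even, contiguous slices. This is only an illustration, not Spark's actual implementation: the helper `split_into_partitions` and the sample records are made up. In Spark itself, `parallelize` accepts an optional `numSlices` argument, and when it is omitted Spark falls back to a default that in local mode typically matches the number of cores.

```python
def split_into_partitions(records, num_partitions):
    """Split a list into num_partitions contiguous slices of near-equal size."""
    n = len(records)
    return [
        records[(i * n) // num_partitions : ((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]

records = list(range(10))            # stand-in for our list of movie dictionaries
partitions = split_into_partitions(records, 4)

print(len(partitions))               # 4
print([len(p) for p in partitions])  # [2, 3, 2, 3]
```
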

In [47]:

# 4

We'll learn more later about how to use the Spark UI to see what Spark is doing, but one way is to look at the memory consumed.  We can view this by going back to the `Executors` tab and looking at the `Storage Memory` column.  Notice that none of the memory was consumed, even though we directed Spark to create an RDD.

> <img src="./executors_mem.png" width="100%">
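
One way to make sense of this is that Spark tends to defer work.  Its transformations (such as `filter` and `map`) are lazy: they describe a computation, and nothing runs until an action (such as `collect`) asks for a result.  As a rough analogy in plain Python, not Spark itself, the built-in `map` is lazy in the same spirit.  The records and the `title_of` helper below are made up for illustration.

```python
calls = []  # records every time the function below actually runs

def title_of(record):
    calls.append(record["title"])
    return record["title"].upper()

records = [{"title": "first"}, {"title": "second"}]  # made-up records

lazy_titles = map(title_of, records)  # describes the work; runs nothing yet
calls_before = len(calls)

titles = list(lazy_titles)            # asking for the result triggers the work
calls_after = len(calls)

print(calls_before, calls_after)      # 0 2
```
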

### Performing Operations

Ok, next use the `filter` method to select all of the titles from the country `'Brazil'`.  We'll call the `collect` method for you in the next line, so that you can see the results.
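
As a hint, an RDD's `filter` method takes a function that is called on each element and returns `True` for the elements to keep.  Here is a sketch of such a function, tried out with Python's built-in `filter` on a few made-up records.  The helper name `from_country` and the sample records are assumptions for illustration; the same predicate could be passed to `movies_rdd.filter(...)`.

```python
def from_country(country):
    """Build a predicate that is True for records whose 'country' equals `country`."""
    def predicate(record):
        # A missing country (nan) never equals a string, so it is filtered out
        return record.get("country") == country
    return predicate

# Made-up records with the same shape as our movie dictionaries
sample_titles = [
    {"title": "3%", "country": "Brazil"},
    {"title": "Hypothetical Show", "country": "Mexico"},
    {"title": "Missing Country", "country": float("nan")},
]

brazil_titles = list(filter(from_country("Brazil"), sample_titles))
print([t["title"] for t in brazil_titles])  # ['3%']
```
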

In [49]:
brazil_movies_rdd = None

In [51]:
brazil_movies_rdd.collect()[:1]

# [{'show_id': 's1',
#   'type': 'TV Show',
#   'title': '3%',
#   'director': nan,
#   'cast': 'João Miguel, Bianca Comparato, Michel Gomes, Rodolfo Valente, Vaneza Oliveira, Rafael Lozano, Viviane Porto, Mel Fronckowiak, Sergio Mamberti, Zezé Motta, Celso Frateschi',
#   'country': 'Brazil',
#   'date_added': 'August 14, 2020',
#   'release_year': 2020,
#   'rating': 'TV-MA',
#   'duration': '4 Seasons',
#   'listed_in': 'International TV Shows, TV Dramas, TV Sci-Fi & Fantasy',
#   'description': 'In a future where the elite inhabit an island paradise far from the crowded slums, you get one chance to join the 3% saved from squalor.'}]

So now if we go back to the `Executors` tab, we can see that some of our memory has been consumed.  This suggests that Spark only put memory to use once we asked for a result by calling `collect`, not when we first defined the RDD.

Ok, let's do another query.  This time, use Spark to find the ratings of US movies.  
> Once again, we'll call `collect` for you in the next line.
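
This query chains two steps: first keep only the US titles, then pull out each title's rating.  Here is a plain-Python sketch of that filter-then-map pattern on made-up records.  It assumes US titles are labeled `'United States'` in the `country` field, which is worth checking against the data.  With an RDD, the same shape would be a call to `filter` followed by a call to `map`.

```python
# Made-up records with the same fields as our movie dictionaries
sample_titles = [
    {"title": "A", "country": "United States", "rating": "PG-13"},
    {"title": "B", "country": "Brazil", "rating": "TV-MA"},
    {"title": "C", "country": "United States", "rating": "R"},
]

# Step 1: keep only the records from the United States
us_titles = filter(lambda t: t["country"] == "United States", sample_titles)

# Step 2: transform each remaining record into just its rating
us_ratings_sample = list(map(lambda t: t["rating"], us_titles))

print(us_ratings_sample)  # ['PG-13', 'R']
```
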

In [52]:
us_ratings = None

In [53]:
us_ratings.collect()[:2]
# ['PG-13', 'PG-13']

And again, we'll take another look at the storage memory consumed.  This time, we can see that additional memory was consumed.

> <img src='./filter_map_mem.png' width='60%'>

> The important part is developing a sense for when Spark is storing our data in memory.  It appears that Spark uses storage memory when we perform queries, not when we merely define an RDD.  Note that Spark does not keep an RDD's results cached for reuse unless we ask it to with `cache` or `persist`.

Finally, it's a good idea to shut down the context when we are done working with it.  We can do so with the following:

In [54]:
sc.stop()

> Notice that if we now try to go to the Spark UI, say by refreshing the page, we cannot.

<img src="./site_not_reach.png" width="60%">

### Summary

In this lab, we practiced connecting to a cluster, creating a distributed dataset that we can query in parallel, and using the Spark UI to get a better understanding of the operations in our cluster.  By monitoring the memory consumption in our cluster, we saw that Spark consumes storage memory once we run our operations, rather than when we first create an RDD.