# Working with PySpark

### Introduction

As we know, Spark allows us to store data in memory across a distributed cluster of nodes.  In that cluster we have the driver node, as well as worker nodes whose executors carry out tasks on the various partitions of data.

<img src="./spark_cluster_partition.jpg" width="80%">

We can see in the diagram above that the SparkContext is our way of controlling the driver, and we can use that SparkContext to read in a dataset like the table of movies shown.  That table of movies is *distributed* across the nodes, a distributed dataset if you will, and because Spark designs the dataset to be recoverable when a given node fails, Spark calls this dataset a Resilient Distributed Dataset, or RDD.

> So above, the table of movies, distributed across the various nodes, is an RDD.

In this lesson, we'll learn how to create our SparkContext, the entry point to our cluster, as well as our distributed dataset, the RDD.

### Building our Spark Context

So as we saw above, our SparkContext is our way of connecting with our Spark cluster.  We create our SparkContext in a few steps.

> First, we install the Python PySpark package with the following.

In [4]:
# !pip install pyspark

Now it's time to create the Spark context.

In [19]:
from pyspark import SparkContext, SparkConf

In [98]:
conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

So above, we create the SparkContext by specifying the information needed to connect to our Spark cluster.  Let's walk through the steps for doing so.

First, we set up some configuration for the SparkContext.

In [99]:
conf = SparkConf().setAppName("films").setMaster("local[2]")

Now above, we call `SparkConf().setAppName("films")` to set our application name, which is just a name that we choose.  Then we use the `setMaster("local[2]")` method to specify the location of the cluster we want to connect to.  Generally, we'll pass in the URL of the cluster's master, but above we are running Spark on our local computer, so we just use the string `"local"`.  The number in square brackets indicates how many worker threads we want running in parallel.

> So above we are specifying two worker threads, which lets two tasks run in parallel on our local Spark cluster, typically on two cores of our computer.  If we want to use as many threads as there are cores available on our computer, we can use `"local[*]"`.

Now that we've created the configuration for our SparkContext, we call the `SparkContext.getOrCreate` method, which returns the existing context if one is already running and otherwise creates a new one from our configuration.
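
To make the `local` master strings concrete, here is a small sketch of how they map to a number of worker threads. The helper `local_thread_count` is hypothetical and not part of PySpark. It only mirrors the documented meaning of `"local"`, `"local[K]"`, and `"local[*]"`, and Spark does this parsing for us.

```python
import os
import re


def local_thread_count(master: str) -> int:
    """Return the number of worker threads implied by a local master string.

    Hypothetical helper for illustration only; Spark parses the master
    string itself, so we never need to call anything like this.
    """
    if master == "local":
        return 1  # plain "local" means a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a local master string: {master!r}")
    value = match.group(1)
    if value == "*":
        # "local[*]" uses as many threads as there are logical cores
        return os.cpu_count() or 1
    return int(value)


print(local_thread_count("local[2]"))  # 2
print(local_thread_count("local"))     # 1
```

A cluster URL such as `spark://host:7077` is not a local master string, so this sketch rejects it with a `ValueError`.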

In [27]:
sc = SparkContext.getOrCreate(conf=conf)

That's it, our SparkContext is all set up.

In [31]:
sc

### Creating an RDD

Once we have created our SparkContext, we can now read in our data from an external resource.

<img src="./spark_cluster_partition.jpg" width="70%">

Now that we have created our context, we can use it to create a resilient distributed dataset.  An RDD can hold really any data that can be split up among multiple processes.  To start, let's define some sample data, which can just be a list of dictionaries representing the four movies below.

In [87]:
movies = [{'id': '287947',
  'title': 'Shazam!',
  'poster': 'https://image.tmdb.org/t/p/w500/xnopI5Xtky18MPhK40cZAGAOVeV.jpg',
  'overview': 'A boy is given the ability to become an adult superhero in times of need with a single magic word.',
  'release_date': 1553299200,
  'genres': ['Action', 'Comedy', 'Fantasy']},
 {'id': '299537',
  'title': 'Captain Marvel',
  'poster': 'https://image.tmdb.org/t/p/w500/AtsgWhDnHTq68L0lLsUrCnM7TjG.jpg',
  'overview': 'The story follows Carol Danvers as she becomes one of the universe’s most powerful heroes when Earth is caught in the middle of a galactic war between two alien races. Set in the 1990s, Captain Marvel is an all-new adventure from a previously unseen period in the history of the Marvel Cinematic Universe.',
  'release_date': 1551830400,
  'genres': ['Action', 'Adventure', 'Science Fiction']},
 {'id': '522681',
  'title': 'Escape Room',
  'poster': 'https://image.tmdb.org/t/p/w500/8Ls1tZ6qjGzfGHjBB7ihOnf7f0b.jpg',
  'overview': 'Six strangers find themselves in circumstances beyond their control, and must use their wits to survive.',
  'release_date': 1546473600,
  'genres': ['Thriller', 'Action', 'Horror', 'Science Fiction']},
 {'id': '166428',
  'title': 'How to Train Your Dragon: The Hidden World',
  'poster': 'https://image.tmdb.org/t/p/w500/xvx4Yhf0DVH8G4LzNISpMfFBDy2.jpg',
  'overview': 'As Hiccup fulfills his dream of creating a peaceful dragon utopia, Toothless’ discovery of an untamed, elusive mate draws the Night Fury away. When danger mounts at home and Hiccup’s reign as village chief is tested, both dragon and rider must make impossible decisions to save their kind.',
  'release_date': 1546473600,
  'genres': ['Animation', 'Family', 'Adventure']}]

Now we start with this data as a simple list of dictionaries in Python, but we can convert it into an RDD with the following:

In [96]:
movies_rdd = sc.parallelize(movies)
movies_rdd

ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:274

In [97]:
movies_rdd.getNumPartitions()

2

And we can attempt to split this data across four partitions by adding a second argument where we specify the number of partitions.

In [102]:
movies_rdd = sc.parallelize(movies, 4)
movies_rdd

ParallelCollectionRDD[17] at readRDDFromFile at PythonRDD.scala:274

> We can check whether this did separate the data into four partitions with the following:

In [103]:
movies_rdd.getNumPartitions()

4

The importance of this is that we can now perform operations on different partitions of the dataset simultaneously.  So if we want to find a matching film record, Spark can search the four partitions in parallel, with as many running at once as we have worker threads (two, with `local[2]`).
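
The idea of searching partitions side by side can be sketched in plain Python, without Spark. The helpers `split_into_partitions` and `find_in_partition` are hypothetical, the threads only stand in for Spark's workers, and Spark's actual placement of records in partitions may differ from this simple contiguous split.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(records, num_partitions):
    """Split a list into contiguous, roughly equal slices (hypothetical helper)."""
    n = len(records)
    return [
        records[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


def find_in_partition(partition, title):
    """Scan one partition and return the records whose title matches."""
    return [movie for movie in partition if movie["title"] == title]


# Trimmed-down stand-in for the movies list above
movies = [
    {"id": "287947", "title": "Shazam!"},
    {"id": "299537", "title": "Captain Marvel"},
    {"id": "522681", "title": "Escape Room"},
    {"id": "166428", "title": "How to Train Your Dragon: The Hidden World"},
]

partitions = split_into_partitions(movies, 4)

# Search every partition at the same time, one worker per partition
with ThreadPoolExecutor(max_workers=4) as pool:
    per_partition = list(
        pool.map(lambda part: find_in_partition(part, "Escape Room"), partitions)
    )

# Flatten the per-partition results into a single list
matches = [movie for result in per_partition for movie in result]
print(len(partitions))  # 4
print(matches)          # [{'id': '522681', 'title': 'Escape Room'}]
```

Each worker only looks at its own slice, so no worker needs to see the whole dataset to answer the query.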

### Summary

In the last lesson, we saw how RDDs are distributed datasets.  We also saw how we connect to the driver of our cluster, by creating a SparkContext with something like the following:

In [3]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

By specifying `setMaster("local[2]")` above, we told Spark to distribute our tasks across two worker threads, typically one per core.  And now with our SparkContext set up, we can turn a collection in Python into an RDD.

In [4]:
movies = ['Shazam!', 'Captain Marvel', 'Escape Room', 'How to Train Your Dragon: The Hidden World']

In [5]:
movies_rdd = sc.parallelize(movies)

This is the first attribute of an RDD: our data is now partitioned, and when we eventually operate on this data, we can operate on different parts of it simultaneously.  We can see the number of partitions with the following:

In [6]:
movies_rdd.getNumPartitions()

2

# Resilient RDDs

### Introduction

In the last lesson, we saw that our RDDs are distributed, meaning that our dataset is spread across a number of partitions.  In this lesson, we'll learn how RDDs achieve resiliency.

### Building Fault Tolerance In Memory

So as we know, the principal feature of Spark is that it provides in-memory storage.  Now in-memory storage comes with some challenges, mainly that even though we are not saving every update to disk, we still want our Spark cluster to be *fault tolerant*.  This means that even if one of our nodes goes down, we do not want the data on that node to be lost.

Normally, distributed databases achieve this by copying partitions of the data to multiple nodes.  This way if one node goes down, there is still a backup.

<img src="./copied_data.jpg" width="40%">

The costs of this, however, are that (1) the copying takes up a significant amount of space, and (2) it requires copying data over the cluster's network, whose bandwidth is much lower than that of RAM.

> Below, our data is moved from one node to another.  With a lot of data, and narrow bandwidth, this can be a slow process.

<img src="./network_slow.jpg" width="60%">
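
A bit of arithmetic shows why replication is costly. Every number below is a hypothetical value chosen only for illustration, not a measurement of any real cluster.

```python
# Hypothetical numbers, chosen only to make the arithmetic concrete
dataset_gb = 100            # size of the dataset held in memory
replication_factor = 3      # total copies kept, including the original
network_gb_per_s = 1.25     # roughly a 10 Gbit/s network link
memory_gb_per_s = 20        # an assumed RAM bandwidth

# Space cost: every copy has to live somewhere in the cluster
total_storage_gb = dataset_gb * replication_factor
extra_copies_gb = dataset_gb * (replication_factor - 1)

# Time cost: the extra copies must travel over the network
seconds_over_network = extra_copies_gb / network_gb_per_s
seconds_in_memory = extra_copies_gb / memory_gb_per_s

print(total_storage_gb)      # 300
print(extra_copies_gb)       # 200
print(seconds_over_network)  # 160.0
print(seconds_in_memory)     # 10.0
```

Under these assumptions, moving the copies across the network takes sixteen times longer than moving the same bytes within memory.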

In Spark things are done differently.  Instead of copying the data from one node to another, Spark keeps track of all of the steps needed to recreate our dataset.  So if a node goes down, Spark can simply reapply those steps.

We'll learn more about this in the next section.

### DAGs in Spark

Let's get started by again connecting to our cluster and then creating an RDD.

In [8]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

In [11]:
movies = ['dark knight', 'dunkirk', 'pulp fiction', 'avatar']

In [12]:
rdd = sc.parallelize(movies)

This time we'll change our data.  We can do so by using the `map` function to capitalize each word in our list of movies.

In [20]:
rdd.map(lambda movie: movie.title()).collect()

['Dark Knight', 'Dunkirk', 'Pulp Fiction', 'Avatar']

> So above, `movie` represents each title in the list, and we call `title` on each one to capitalize.  We see the results of the transformation with `collect`.

Now let's see what happens under the hood.  As we said above, to keep our data fault tolerant, instead of copying data from one node to another, Spark keeps track of all of the steps needed to recreate the dataset.

<img src="./rdd_one_to_two.jpg" width="40%">

So what the diagram above shows is that when we executed the line `rdd.map(lambda movie: movie.title()).collect()`, we actually created a second RDD.  And Spark has logged the step needed to go from the first RDD to the second one.  So if the node that has the capitalized `Avatar` goes down, Spark can just reapply the `map(lambda movie: movie.title())` function to recreate the data.

So there are two takeaways from the above.  The first is that when we apply a transformation to a dataset, we are actually creating a new RDD.  This is because RDDs are read-only, so if we apply a transformation, we create a new RDD in the process.  The second takeaway is that Spark stores the steps to go from the first RDD to the second one, and reapplies those steps to the relevant data if a node goes down.
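
Both takeaways can be modeled in a few lines of plain Python. The `ToyRDD` class below is hypothetical and much simpler than Spark's real RDD: it computes eagerly rather than lazily, and it assumes the parent's partition is still available when a child partition is lost.

```python
class ToyRDD:
    """A read-only dataset that remembers how it was derived.

    Toy model for illustration; the class and its methods are hypothetical
    and far simpler than Spark's real RDD.
    """

    def __init__(self, partitions, parent=None, step=None):
        self._partitions = [tuple(p) for p in partitions]  # tuples are read-only
        self.parent = parent  # the RDD this one was derived from
        self.step = step      # function that rebuilds one partition from the parent's

    def map(self, fn):
        """Return a new ToyRDD; the original is left untouched."""
        def step(partition):
            return tuple(fn(record) for record in partition)
        return ToyRDD([step(p) for p in self._partitions], parent=self, step=step)

    def lose_partition(self, index):
        """Simulate a node failure by dropping one partition's data."""
        self._partitions[index] = None

    def recover_partition(self, index):
        """Rebuild a lost partition by replaying the logged step on the parent."""
        self._partitions[index] = self.step(self.parent._partitions[index])

    def collect(self):
        """Gather the records from every partition into one list."""
        return [record for p in self._partitions for record in p]


rdd = ToyRDD([["dark knight", "dunkirk"], ["pulp fiction", "avatar"]])
titled = rdd.map(lambda movie: movie.title())

titled.lose_partition(1)      # the node holding 'Avatar' goes down
titled.recover_partition(1)   # replay the map step on the parent's partition 1

print(titled.collect())  # ['Dark Knight', 'Dunkirk', 'Pulp Fiction', 'Avatar']
print(rdd.collect())     # ['dark knight', 'dunkirk', 'pulp fiction', 'avatar']
```

Only the lost partition is recomputed, and the original dataset is never modified along the way.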

### Only Coarse Transformations

Because keeping track of every tiny change that happens to a dataset would be expensive, Spark does not give us functions to apply changes just to specific records.  Rather, when we apply changes, we must apply them to the entire dataset.  For example, above, we capitalized every record with the `map` function.

In [21]:
rdd.map(lambda movie: movie.title()).collect()

['Dark Knight', 'Dunkirk', 'Pulp Fiction', 'Avatar']

Or, with our RDDs, we can also go through every record, and only select those that begin with the letter `d`.

In [31]:
rdd.filter(lambda movie: movie[0] == 'd').collect()

['dark knight', 'dunkirk']

And of course we can chain our two methods together.

In [37]:
rdd.map(lambda movie: movie.title()).filter(lambda movie: movie.startswith('D')).collect()

['Dark Knight', 'Dunkirk']

> So above we first capitalized each of our movies, and then selected those that begin with a capital `D`.

The point from above, though, is that whether we use `map` or `filter`, each step applies to *every* record.  These types of transformations are called *coarse-grained transformations*, and they are the only kind of transformation that Spark allows on RDDs.  If we were to select individual records and then make changes to them, those would be fine-grained transformations.  Allowing only coarse-grained transformations makes it easier for Spark to log the changes made from one RDD to another, since one logged step describes the change to every record.

<img src="./filter_map.jpg" width="40%">
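
The contrast between coarse-grained and fine-grained changes can be seen in plain Python, using a tuple as a stand-in for a read-only RDD. This is only an analogy, and the `lineage` list is a hypothetical illustration of what gets logged, not Spark's format.

```python
movies = ("dark knight", "dunkirk", "pulp fiction", "avatar")  # read-only tuple

# Coarse-grained: one function applied to every record, producing new data
titled = tuple(map(str.title, movies))
starts_with_d = tuple(filter(lambda movie: movie.startswith("D"), titled))

# The whole derivation is described by two short steps
lineage = ["map(str.title)", "filter(starts with 'D')"]

print(titled)         # ('Dark Knight', 'Dunkirk', 'Pulp Fiction', 'Avatar')
print(starts_with_d)  # ('Dark Knight', 'Dunkirk')

# Fine-grained: changing a single record in place is not possible on a tuple
try:
    movies[3] = "Avatar"
except TypeError as error:
    print("in-place update rejected:", error)
```

Two logged steps are enough to rebuild the result however many records there are, whereas logging per-record edits would grow with the size of the data.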

### Summary

So we saw in this lesson that Spark achieves fault tolerance by keeping a record of the transformations needed to recreate our data.  Because RDDs are read-only, when we transform our data, we are really creating a new RDD.  And Spark keeps track of the steps necessary to go from one RDD to the next.

<img src="./rdd_one_to_two.jpg" width="40%">

To make recording these steps easier, we can only apply coarse-grained transformations to Spark RDDs, which apply to the entire dataset.  We'll learn more of these transformations in the following lesson, but to start, we saw two coarse-grained transformations: `map`, which applies the same change to every record, and `filter`, which selects the records that match a condition.

### Resources

* [Presenting RDDs](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf)

* [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

* [RDDs Simplified](https://vishnuviswanath.com/spark_rdd)

* [Databricks RDDs](https://databricks.com/glossary/what-is-rdd)

* [Databricks best practices](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/index.html)