In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql import SparkSession
from tests import Lab02_Tests

In [3]:
tests = Lab02_Tests()

## Reacquainting yourself with the Spark UI
In Lab 1_1, we briefly explored the Spark UI to get a feel for where basic information about our Spark configuration is held. In this lab, we'll make heavier use of the Spark UI to debug and optimize Spark jobs. Recall that to open the Spark UI, you visit http://localhost:4040/jobs.

### The /jobs page in detail
Here we see the event timeline which is a rolling timeseries for running jobs. Below this, we see a list of all jobs, when they were submitted, how long a given job took, and whether or not tasks succeeded:

![jobs](assets/jobs.png)


Clicking on a job takes you to a more detailed summary of what stages executed for that particular job, as well as a visualization of the DAG Spark assembled for executing these stages:

![stages](assets/stages.png)

We can use this information to help us troubleshoot jobs that may be running non-optimally.

### Observing a long running Spark job
Let's create a long-running Spark job. Run the following cell (it'll take around 15 seconds).

In [None]:
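# Reuse the notebook's SparkContext if one exists, otherwise create it
sc = SparkContext.getOrCreate()
slow_rdd = sc.parallelize(range(1, 10000000))
squared_rdd = slow_rdd.map(lambda x: x**2)
squared_rdd.collect()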

Now click on the job in the Spark UI, then click on the stage and expand the Event Timeline. You'll see something like this:

![timeline](assets/timeline.png)

The event timeline at this level represents a step-by-step execution of the job. In this case we see that most of our time is spent by the Executor computing our result (about 12 seconds). Let's try to speed this up.

## Improving Performance
### take()
One issue we should immediately note is that we are calling collect() on a large RDD. Recall that collect() attempts to return the _entire_ RDD to the driver program. If the RDD is large enough, this could take a long time, or fail outright. For this reason, use caution when applying the collect() action. A better alternative is the take() action, which returns a list of the first `n` elements of the RDD.

In [None]:
fast_rdd = sc.parallelize(range(1, 10000000))
squared_rdd = fast_rdd.map(lambda x: x**2)
squared_rdd.take(100)

This job is much faster, taking milliseconds instead of seconds, because take() only computes as many partitions as it needs to return `n` elements. Note, however, that take() returns elements in the RDD's partition order, not in sorted order. The result looks ordered here only because the input range was already ordered, so don't rely on take() when you need a particular ordering.
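To see why take() can finish so quickly, here is a minimal plain-Python sketch that models an RDD as a list of lazily evaluated partitions. It is a simplified illustration, not Spark's actual implementation, and the names `make_partition` and `take_first` are hypothetical helpers introduced only for this example.

```python
from itertools import chain, islice


def make_partition(index, values, computed):
    """Build a lazy partition that records its index once it starts computing."""
    def generate():
        computed.append(index)      # runs only when the first element is requested
        for value in values:
            yield value ** 2        # same transformation as the map() above
    return generate()


def take_first(partitions, n):
    """Return the first n elements, stopping as soon as enough have been read."""
    return list(islice(chain.from_iterable(partitions), n))


computed = []                       # indexes of partitions that were evaluated
partitions = [
    make_partition(0, [1, 2, 3, 4], computed),
    make_partition(1, [5, 6, 7, 8], computed),
    make_partition(2, [9, 10], computed),
]

first_three = take_first(partitions, 3)
print(first_three)   # [1, 4, 9]
print(computed)      # [0]
```

Only the first partition is ever evaluated in this model, whereas a collect() would have to evaluate all three.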

### takeOrdered()
Suppose we do need a guarantee on the order. takeOrdered() returns the first `n` elements in ascending order, or in the order defined by an optional `key` function, and it does not require the RDD to be sorted beforehand. To demonstrate this, let's revisit Lab 1_1. Write a Spark job that determines the top 10 words in Shakespeare's vocabulary. Implement this using sortBy() and collect() first, and then compare the performance using takeOrdered(). There's a simple unit test that takes a list of tuples as input to make sure you have found the top ten correctly.

In [None]:
tests.part_a(hash(str('RDD_HERE')))
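For reference, the kind of result takeOrdered() produces can be sketched in plain Python with `heapq.nsmallest`: keep the `n` best elements of each partition, then merge those short lists. This is a simplified model rather than Spark's code; `take_ordered` is a hypothetical helper, and the word counts are made-up toy numbers, not real Shakespeare statistics.

```python
import heapq
from functools import reduce


def take_ordered(partitions, n, key=None):
    """Return the n smallest elements (by key) across all partitions."""
    # Each partition only keeps its own n best candidates...
    per_partition = [heapq.nsmallest(n, part, key=key) for part in partitions]
    # ...and the short candidate lists are merged pairwise.
    return reduce(lambda a, b: heapq.nsmallest(n, a + b, key=key), per_partition)


# Toy (word, count) pairs split across two partitions (illustrative values only)
word_counts = [
    [("the", 27), ("and", 26), ("thou", 5)],
    [("to", 19), ("of", 18), ("hath", 3)],
]

# Negating the count turns "smallest first" into "largest count first"
top_three = take_ordered(word_counts, 3, key=lambda pair: -pair[1])
print(top_three)   # [('the', 27), ('and', 26), ('to', 19)]
```

Because at most `n` elements per partition are ever kept, far less data needs to move than with a full sort followed by collect().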

### Serializing files
File I/O is a major bottleneck when working with Apache Spark. A common way to reduce it is to work with serialized binary files, as opposed to human-readable text files, within a pipeline. In this section we'll create a Parquet file, and then compare the performance of reading a Parquet file versus reading a plain text file.

First, complete the following steps:
1. Create a SparkSession object
2. Read in the long_file.txt into an RDD
3. Apply the flatMap() transform to split into individual words
4. Filter out non-alpha characters
5. Convert the resulting RDD into a DataFrame with a single column
6. Run the test case to verify the DataFrame contains the expected number of rows

In [8]:
spark = SparkSession(sc)
rdd = sc.textFile('../data/long_file.txt').flatMap(lambda line: line.split()).filter(str.isalpha)
schema = Row('word')
df = rdd.map(schema).toDF(schema)

In [5]:
tests.part_b(df.count())

Take a look at the processing time for the file. It takes more than a second to process about 64 KB of data. Let's see if a serialized file gives us any performance improvement. First, we'll write the DataFrame to a Parquet file.

In [3]:
df.write.parquet('../data/outputs/out.parquet')

Now we'll read it back in and check the performance.

In [10]:
df2 = spark.read.load('../data/outputs/out.parquet')

From a second or more down to milliseconds. Note, however, that the conversion process is expensive: there's an upfront cost to serialize the raw text file, after which reading the serialized file gives us significant performance gains.

## Reuse
Spark offers the capability to optimize jobs by caching intermediate results, which we can then use in later transformations. This means that instead of creating a complex graph with redundant transformations, we can perform a single transform, cache it, and then reuse it later in the graph. 

### persist()

The persist() method allows us to persist RDDs and DataFrames to memory, disk, or both. The following summarizes the behavior of each storage level:
* __MEMORY_ONLY__: RDD is stored as a deserialized object directly in the JVM. If the RDD is bigger than the available memory, the partitions that do not fit are not cached and are recomputed when needed.
* __MEMORY_AND_DISK__: RDD is stored as a deserialized object directly in the JVM. If the RDD is bigger than the available memory, the partitions that do not fit are spilled to disk.
* __DISK_ONLY__: RDD is stored directly on disk.
* __MEMORY_ONLY_SER__: RDD is stored as a serialized object directly in the JVM. If the RDD is bigger than the available memory, the partitions that do not fit are not cached and are recomputed when needed.
* __MEMORY_AND_DISK_SER__: RDD is stored as a serialized object directly in the JVM. If the RDD is bigger than the available memory, the partitions that do not fit are spilled to disk.


To persist an RDD or DataFrame we first import the StorageLevel class from PySpark

In [15]:
from pyspark import StorageLevel

Then to persist, we call the persist() method and pass in the desired StorageLevel

In [16]:
df2.persist(StorageLevel.MEMORY_ONLY)

DataFrame[word: string]

Then to reclaim that memory, we can either call unpersist(), or let Spark evict the cached data on its own. Spark's cache implements a Least Recently Used (LRU) eviction strategy. This means that when memory runs low, the cached partitions that were accessed least recently are evicted first.

In [17]:
df2.unpersist()

DataFrame[word: string]
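The LRU idea can be sketched in a few lines of plain Python. This is a toy model built on `collections.OrderedDict`, not Spark's block manager; the `LRUCache` class and the partition names are hypothetical and exist only for this illustration.

```python
from collections import OrderedDict


class LRUCache:
    """Toy cache that evicts the least recently used entry when full."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()   # oldest access first, newest last

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # reading counts as a "use"
        return self._entries[key]

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self.capacity:
            self._entries.popitem(last=False)   # drop the least recently used

    def keys(self):
        return list(self._entries)


cache = LRUCache(capacity=2)
cache.put("partition_0", [1, 4, 9])
cache.put("partition_1", [16, 25])
cache.get("partition_0")            # partition_0 is now the most recently used
cache.put("partition_2", [36])      # cache is full, so partition_1 is evicted
print(cache.keys())                 # ['partition_0', 'partition_2']
```

Note that `partition_1` is evicted even though it was inserted after `partition_0`: what matters is when an entry was last used, not when it was created.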

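Alternatively, we can call the cache() method, which is shorthand for persist() with the default storage level. For an RDD the default is StorageLevel.MEMORY_ONLY; for a DataFrame the default is a memory-and-disk level, so partitions that do not fit in memory are spilled to disk.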

In [18]:
df2.cache()

DataFrame[word: string]

Finally, we can check whether an RDD or DataFrame is cached in memory by reading the storageLevel.useMemory property

In [23]:
df2.storageLevel.useMemory

True

In [24]:
df2.unpersist()

DataFrame[word: string]

In [25]:
df2.storageLevel.useMemory

False

## Variable Sharing
Our last optimization concept is the idea of variable sharing. Consider a scenario where we need a copy of the same variable across all executors. Spark supports this requirement in two ways: broadcast variables and accumulators.

### Broadcast Variable Sharing
Broadcast variables are read-only variables that are cached on each executor once, instead of being shipped with every task. This means less network overhead, which in turn means jobs run more efficiently. Keep in mind that since these variables live in the executor's cache, a large broadcast value competes with cached data for memory and may be evicted under the LRU strategy. Thus, we favor small broadcast variables over large ones. Let's look at an example.

In [26]:
spark_versions = {
    'Python':'PySpark', 
    'Scala':'Spark', 
    'R':'SparkR'
}

broadcast_versions = spark.sparkContext.broadcast(spark_versions)

In this example, we're creating a dictionary that maps each programming language to the name of its Spark API, and then broadcasting it to the executors. We can read the broadcast dictionary through the `value` property.

In [29]:
broadcast_versions.value['Python']

'PySpark'

### Accumulators
Accumulators are variables that tasks running on the executors can only add to; the tasks cannot read the current value. Each task's updates are sent back to the driver program and merged there, which avoids shuffling data between workers. Unlike broadcast variables, the `value` property is only accessible by the driver program. This is particularly useful for creating counters for validating the success or failure of Spark jobs. Here's an example of what this might look like.

In [31]:
accum = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize(range(1, 100, 2))
rdd.foreach(lambda x: accum.add(1))
accum.value

50

In this example, we create an accumulator initialized to 0. Next, we create an RDD, and for each element in the RDD we increment the accumulator by 1. Finally, we read the value of the counter on the driver and see that the accumulator was incremented 50 times, once per element. Here the result matches what count() would return, but an accumulator can be updated as a side effect of work a job is already doing, so we can extend this pattern to track bad or inconsistent records, dropped records, even duplicates.
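As a rough illustration of the bad-record idea, the following plain-Python sketch models each partition as one task that keeps a local tally, with the driver merging the tallies afterwards. It is a simplified model of how an accumulator behaves, not Spark code; `process_partition` and the sample records are hypothetical.

```python
def process_partition(records):
    """Simulate one task: parse its records and count the ones that fail."""
    parsed, bad = [], 0
    for record in records:
        try:
            parsed.append(int(record))
        except ValueError:
            bad += 1                # the task can only add to its local tally
    return parsed, bad


# Three partitions of raw string records; "x" and "" are malformed
partitions = [["1", "2", "x"], ["4", "", "6"], ["7", "8", "9"]]

bad_records = 0                     # plays the role of the accumulator's value
parsed_values = []
for partition in partitions:
    parsed, bad = process_partition(partition)
    parsed_values.extend(parsed)
    bad_records += bad              # the driver merges each task's partial count

print(bad_records)                  # 2
print(parsed_values)                # [1, 2, 4, 6, 7, 8, 9]
```

The count of bad records falls out of the same pass that parses the data, so no second pass over the records is needed.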