# Abstract

**Big data** has received a lot of attention over the last few years, and for good reason. Companies like Google and Yahoo! have grown their user bases significantly, and they're collecting more information on how people interact with their products.

While software companies have gotten better at collecting massive amounts of data, their ability to analyze and make sense of it hasn't always improved. As data collection grew, existing technologies couldn't analyze such large quantities of data, so companies like Google, Facebook, Yahoo!, and LinkedIn had to build new tools and approaches that could do the job.

Engineers initially tried using bigger and more powerful computers to process the data, but they still ran into limits for many computational problems. Along the way, they developed paradigms like **MapReduce** that efficiently distribute a calculation over hundreds or thousands of computers. Hadoop is an open source project that quickly became the dominant processing toolkit for big data.

# Hadoop

Hadoop consists of a file system (Hadoop Distributed File System, or HDFS) and its own implementation of the MapReduce paradigm. MapReduce converts computations into Map and Reduce steps that Hadoop can easily distribute over many machines. We'll cover how MapReduce works later in this lesson.
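
As a rough, single-machine picture of the Map and Reduce steps, here is a minimal sketch in plain Python. It does not use Hadoop at all: the `chunks` list is a hypothetical stand-in for data spread over three machines, and the variable names are illustrative only.

```python
from functools import reduce

# Hypothetical input: each inner list stands in for the data held by one machine.
chunks = [[1, 2, 3], [4, 5], [6]]

# Map step: every "machine" squares its own numbers independently of the others.
mapped = [[n * n for n in chunk] for chunk in chunks]

# Each machine then summarizes its own chunk as a partial sum.
partial_sums = [sum(chunk) for chunk in mapped]

# Reduce step: combine the partial sums pairwise into one final result.
total = reduce(lambda a, b: a + b, partial_sums)

print(partial_sums, total)
```

Because no chunk depends on another during the map step, the work can be handed to separate machines and only the small partial results need to be combined at the end.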

Hadoop made it possible to analyze large data sets, but it relied heavily on disk storage (rather than memory) for computation. While it's inexpensive to store large amounts of data this way, it makes accessing and processing it much slower.

Hadoop wasn't a great solution for calculations requiring multiple passes over the same data or many intermediate steps, due to the need to write to and read from the disk between each step. This drawback also made Hadoop difficult to use for interactive data analysis, the main task for data scientists.

Hadoop also suffered from suboptimal support for the additional libraries many data scientists needed, such as SQL and machine learning implementations. Once the cost of RAM (computer memory) started to drop significantly, augmenting or replacing Hadoop by storing data in-memory quickly emerged as an appealing alternative.

The UC Berkeley AMP Lab spearheaded groundbreaking work to develop Spark, which uses distributed, in-memory data structures to accelerate data processing workloads by several orders of magnitude. If you're interested in learning more, you can check out some of the original papers on the [Apache Spark research page](https://spark.apache.org/research.html).


## Data Structure in Spark

The core data structure in Spark is a **resilient distributed data set (RDD)**. As the name suggests, an RDD is Spark's representation of a data set that's distributed across the RAM, or memory, of a cluster of many machines. An RDD object is essentially a collection of elements we can use to hold lists of tuples, dictionaries, lists, etc. Similar to a pandas DataFrame, we can load a data set into an RDD, and then run any of the methods accessible to that object.

## PySpark

While Spark itself is written in Scala, a language that compiles down to bytecode for the JVM, the open source community has developed a wonderful toolkit called **PySpark** that allows us to **interface with RDDs in Python**. PySpark relies on a library called **Py4J**, which lets Python interface with Java objects (in our case, RDDs).

## Data set

We'll work with a data set containing the names of all of the guests who have appeared on The Daily Show.

To begin, we'll load the data set into an RDD. We're using the TSV version of [FiveThirtyEight's](https://github.com/fivethirtyeight/data/tree/master/daily-show-guests) data set. TSV files use a tab character ("\t") as the delimiter, instead of the comma (",") that CSV files use.
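
To make the delimiter difference concrete, here is a small sketch that parses tab-separated text with Python's standard `csv` module. The two sample lines are the header and first row of the data set; reading from an in-memory string via `io.StringIO` is an assumption made so the example runs without the file.

```python
import csv
import io

# Header and first row of the data set, with fields separated by tab characters.
sample = (
    "YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List\n"
    "1999\tactor\t1/11/99\tActing\tMichael J. Fox\n"
)

# delimiter="\t" tells the reader to split on tabs instead of commas.
rows = list(csv.reader(io.StringIO(sample), delimiter="\t"))

print(rows[1])
```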

In Spark, the **SparkContext object** manages the connection to the clusters, and it coordinates processes on those clusters. Specifically, it connects to the cluster managers. The cluster managers control the executors that run the computations. Here's a diagram from the Spark documentation that will help you visualize the architecture:

<Img src ="https://github.com/rhnyewale/Apache-Spark/blob/main/Images/SparkContext.jpg?raw=true">

Before we can read any data, we need a SparkContext. We first use findspark to locate the Spark installation, then import PySpark and create the SparkContext object sc:

In [1]:
# Find path to Spark
import findspark
findspark.init()

In [2]:
# Import PySpark and initialize SparkContext object
import pyspark
sc = pyspark.SparkContext()

With the SparkContext object sc available, we run the following code to read the TSV data set into an RDD object raw_data:

In [6]:
# Read 'daily_show.tsv' into an RDD
raw_data = sc.textFile('data/daily_show.tsv')

The RDD object raw_data resembles a list of string objects, with one object for each line in the data set. We then use the take() method to print the first five elements of the RDD:

In [7]:
raw_data.take(5)

['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']

To explore the other methods an RDD object has access to, check out the [PySpark documentation](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#take). take(n) will return the first n elements of the RDD.

**Advantages of Spark over Python**

So, if an RDD resembles a Python list, why don't we just use bracket notation to access elements in the RDD?

The answer is that Spark distributes RDD objects across many partitions, and the RDD specifically handles distributed data. We can't rely on the standard implementation of a list for these reasons.

Spark offers many advantages over regular Python. For example, thanks to RDD [abstraction](https://en.wikipedia.org/wiki/Abstraction_(computer_science)), you can run Spark locally on your own computer. Spark will simulate distributing your calculations over many machines by automatically slicing your computer's memory into partitions.
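
As a loose illustration of what "slicing into partitions" means, the sketch below splits an ordinary list into contiguous pieces and processes each piece on its own. The helper `split_into_partitions` is hypothetical and is not how Spark chooses partition boundaries; it only shows why a single bracket lookup doesn't map cleanly onto data that lives in several pieces.

```python
def split_into_partitions(items, num_partitions):
    """Slice a list into roughly equal contiguous partitions (illustrative only)."""
    size = max(1, -(-len(items) // num_partitions))  # ceiling division, at least 1
    return [items[i:i + size] for i in range(0, len(items), size)]

data = list(range(10))
partitions = split_into_partitions(data, 3)

# Each partition is handled independently, then the partial results are combined.
partial_counts = [len(p) for p in partitions]
total_count = sum(partial_counts)

print(partitions, total_count)
```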

Spark's RDD implementation also lets us evaluate code "lazily," meaning we can postpone running a calculation until absolutely necessary. Earlier, Spark waited to load the TSV file into an RDD until raw_data.take(5) executed. When our code called raw_data = sc.textFile('data/daily_show.tsv'), Spark created a pointer to the file, but it didn't actually read it into raw_data until raw_data.take(5) needed that variable to run its logic.

The advantage of "lazy" evaluation is that we can build up a queue of tasks and let Spark optimize the workflow in the background. In regular Python, the interpreter can't do much workflow optimization. We'll look at more examples of lazy evaluation later on.
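
Python generators give a small-scale feel for lazy evaluation. The sketch below is only an analogy, not Spark's mechanism: `read_lines` is a hypothetical stand-in for a file, and the `log` list records when a line is actually read.

```python
log = []

def read_lines():
    """Hypothetical stand-in for a lazily loaded file."""
    for line in ["a\tb", "c\td", "e\tf"]:
        log.append(f"read {line!r}")
        yield line

# Building the pipeline runs nothing yet.
lines = read_lines()
split_lines = (line.split("\t") for line in lines)
steps_before = len(log)

# Asking for a result forces just enough work to produce it.
first = next(split_lines)
steps_after = len(log)

print(steps_before, first, steps_after)
```

Only one line is read to produce the first result; the remaining lines stay untouched until something asks for them.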


While Spark borrowed heavily from Hadoop's MapReduce pattern, it's different in many ways.

The main thing to know when working with Spark is data **pipelining**. Every operation or calculation in Spark is essentially a series of steps that we can chain together and run in succession to form a **pipeline**. Each step in the pipeline returns either a Python value (such as an integer), a Python data structure (such as a dictionary), or an RDD object. We'll begin with the **map()** function.

## Map()

The map(f) function applies the function f to every element in the RDD and returns a new RDD containing the results. We'll go through an example together so you can get a better sense of how it works.

If you look carefully, you'll see that raw_data is in a format that's hard to work with. While the elements are currently all strings, we'd like to convert each of them into a list to make the data more manageable. To do this in PySpark, we pass map() a lambda function that splits each line on the tab character:

In [8]:
daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]

One of the wonderful features of PySpark is how it can separate our logic — which we prefer to write in Python — from the actual data transformation. In the previous code cell, we wrote this lambda function in Python code like this:

In [9]:
# raw_data.map(lambda line: line.split('\t'))

Even though the function was in Python, Spark's Scala engine handled scheduling and distributing the work when the code actually ran over our RDD.<br/> This is the power of PySpark. Without learning any Scala, we get to harness the data processing performance of Spark's Scala architecture.<br/> Even better, when we ran the following code, it returned the results to us in Python-friendly notation:

In [10]:
# daily_show.take(5)
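
For comparison, the same split on an ordinary in-memory Python list could be sketched as follows. The two sample lines come from the data set; `raw_lines` and `split_rows` are illustrative names, and nothing here touches Spark.

```python
# Header and first row of the data set as plain strings.
raw_lines = [
    'YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
    '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
]

# Split each line on the tab character, producing a list of lists.
split_rows = [line.split('\t') for line in raw_lines]

print(split_rows[1])
```

The comprehension builds the whole result immediately on one machine, whereas the RDD version only describes the step and leaves the execution to Spark.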

## Transformations and Actions

There are two types of methods in Spark:
1. Transformations — map(), reduceByKey()
2. Actions — take(), reduce(), saveAsTextFile(), collect()

Transformations are lazy operations that always return a reference to an RDD object.<br/> Spark doesn't actually run the transformations, however, until an action needs to use the RDD resulting from a transformation.<br/> Any function that returns an RDD is a transformation, and any function that returns a value is an action.
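
The distinction can be modeled with a toy class. `ToyRDD` below is a hypothetical, single-machine stand-in written for this illustration; it is not Spark's API or implementation. Its `map` only records a step and returns another `ToyRDD`, while `take` runs the recorded steps and returns a plain list.

```python
class ToyRDD:
    """Hypothetical single-machine stand-in for an RDD (not Spark's API)."""

    def __init__(self, source, steps=()):
        self._source = source  # original data, never modified
        self._steps = steps    # recorded transformations, not yet run

    def map(self, f):
        # Transformation: record the step and return a new ToyRDD.
        return ToyRDD(self._source, self._steps + (f,))

    def take(self, n):
        # Action: run the recorded steps now and return a plain list.
        result = []
        for item in self._source[:n]:
            for step in self._steps:
                item = step(item)
            result.append(item)
        return result

calls = []

def shout(s):
    calls.append(s)  # track when the function is really invoked
    return s.upper()

rdd = ToyRDD(["actor", "comedian", "model"])
shouted = rdd.map(shout)           # nothing has run yet
calls_before_action = list(calls)
taken = shouted.take(2)            # the action triggers the work

print(calls_before_action, taken)
```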

**Immutability**

So, why couldn't we just split each string in place, instead of creating a new object daily_show? In Python, we could have modified the collection element-by-element in place, without returning and assigning the results to a new object.

RDD objects are immutable, meaning that we can't change their values once we've created them. In Python, list and dictionary objects are mutable (we can change their values), while tuple objects are immutable. The only way to modify a tuple object in Python is to create a new tuple object with the necessary updates.<br/> Spark uses the immutability of RDDs to improve calculation speeds.
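
The list and tuple behavior described above can be checked directly in plain Python. The values are illustrative:

```python
# Lists are mutable: we can change an element in place.
row_list = ['1999', 'actor']
row_list[1] = 'Actor'

# Tuples are immutable: assigning to an element raises TypeError.
row_tuple = ('1999', 'actor')
try:
    row_tuple[1] = 'Actor'
    mutated = True
except TypeError:
    mutated = False

# The only way to "change" a tuple is to build a new one.
updated_tuple = (row_tuple[0], 'Actor')

print(row_list, mutated, updated_tuple)
```

Creating daily_show from raw_data follows the same pattern as building `updated_tuple`: the original object stays as it was, and the result is a new object.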

### Using Map() and reduceByKey()
We want to tally the number of guests who have appeared on The Daily Show during each year. If daily_show were a list of lists, we could write the following Python code to get this result:

In [12]:
# tally = dict()
# for line in daily_show:
#   year = line[0]
#   if year in tally.keys():
#     tally[year] = tally[year] + 1
#   else:
#     tally[year] = 1

The keys in tally will be the years, and the values will be the totals for the number of lines associated with each year.

To get the same result with Spark, we'll use a map() step, then a reduceByKey() step.

In [13]:
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(tally)

PythonRDD[15] at RDD at PythonRDD.scala:53


You may have noticed that printing tally didn't return the histogram we were hoping for.<br/>
Because of lazy evaluation, PySpark delays executing the map and reduceByKey steps until we actually need them.<br/>
Before we use take() to preview the first few elements in tally, let's walk through the code we just wrote:

In [14]:
#daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)

During the map step, we used a lambda function to create a tuple consisting of the following:

* key: x[0] (the first value in the list)
* value: 1 (the integer)

Our high-level strategy was to create a tuple with the key representing the year and the value representing 1. After running the map step, Spark will maintain in memory a list of tuples resembling the following:

In [16]:
# ('YEAR', 1)
# ('1999', 1)
# ('1999', 1)
# ('1999', 1)
# ('1999', 1)

# We'd like to reduce that down to:
# ('YEAR', 1)
# ('1999', 4)

reduceByKey(f) combines tuples with the same key using the function we specify, f.
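
To see the combining logic in isolation, here is a single-machine sketch in plain Python. The helper `reduce_by_key` is a hypothetical analogue written for this illustration, not Spark's implementation, and the input is a handful of (year, 1) pairs of the kind the map step produces.

```python
def reduce_by_key(pairs, f):
    """Hypothetical single-machine analogue of reduceByKey (not Spark's code)."""
    combined = {}
    for key, value in pairs:
        # First time we see a key, keep its value; afterwards, merge with f.
        combined[key] = f(combined[key], value) if key in combined else value
    return list(combined.items())

pairs = [('YEAR', 1), ('1999', 1), ('1999', 1), ('1999', 1), ('1999', 1)]
reduced = reduce_by_key(pairs, lambda x, y: x + y)

print(reduced)
```

The function f only ever sees two values that share a key, which is why the same lambda x, y: x + y works no matter how many tuples a key has.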

To see the results of these two steps, we'll use the take command, which forces lazy code to run immediately.<br/>
Because tally is an RDD, we can't use Python's len function to find out how many elements are in the collection.<br/>
Instead, we need to use the RDD count() function.

In [17]:
tally.take(tally.count())

[('YEAR', 1),
 ('2002', 159),
 ('2003', 166),
 ('2004', 164),
 ('2007', 141),
 ('2010', 165),
 ('2011', 163),
 ('2012', 164),
 ('2013', 166),
 ('2014', 163),
 ('2015', 100),
 ('1999', 166),
 ('2000', 169),
 ('2001', 157),
 ('2005', 162),
 ('2006', 161),
 ('2008', 164),
 ('2009', 163)]
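
The tuples come back in no particular order. Once the results are an ordinary Python list, one option is to sort them locally, as in this sketch. It uses a few of the tuples from the output above; `results` and `by_year` are illustrative names.

```python
# A few of the (year, count) tuples returned above, in their original order.
results = [('YEAR', 1), ('2002', 159), ('2015', 100), ('1999', 166), ('2000', 169)]

# Tuples sort by their first element, so this orders the list by year string.
by_year = sorted(results)

print(by_year)
```

The header tuple ('YEAR', 1) sorts last because uppercase letters compare greater than digits.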

### Removing Column Headers

Unlike pandas, Spark knows nothing about column headers, and didn't set them aside. We need a way to remove the element ('YEAR', 1) from our collection. We'll need a workaround, though, because RDD objects are immutable once we create them. The only way to remove that tuple is to create a new RDD object that doesn't have it.

Spark comes with a filter(f) function that creates a new RDD by filtering an existing one for specific criteria. If we specify a function f that returns a Boolean value, True or False, the resulting RDD will consist of the elements for which the function evaluated to True.

In [18]:
def filter_year(line):
    # Keep every row except the header row, whose first field is 'YEAR'
    return line[0] != 'YEAR'

In [19]:
filtered_daily_show = daily_show.filter(filter_year)

In [20]:
filtered_daily_show

PythonRDD[19] at RDD at PythonRDD.scala:53
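
Python's built-in filter() follows the same keep-if-True idea on an ordinary list, which makes it a convenient way to sanity-check a predicate before handing it to Spark. In this sketch the rows are the first three lines of the data set, and `is_not_header` is an illustrative name.

```python
rows = [
    ['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
    ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
    ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
]

def is_not_header(row):
    # True for data rows, False for the header row.
    return row[0] != 'YEAR'

# filter() builds a new sequence and leaves the original list untouched.
without_header = list(filter(is_not_header, rows))

print(len(rows), len(without_header))
```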

To show what Spark can do, we'll chain together a series of data transformations into a pipeline and observe Spark managing everything in the background. Spark's developers specifically designed and optimized this functionality for running tasks in succession.

Before Spark came along, running many tasks in succession in Hadoop was incredibly time-consuming. Hadoop had to write intermediate results to disk, and it wasn't aware of the full pipeline. Thanks to its aggressive approach to memory use and well-designed core, Spark improves on Hadoop's turnaround time significantly.

We'll filter out rows where the occupation is blank, lowercase each profession, generate a histogram of professions, and output the first five tuples in the histogram.

In [21]:
filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x,y: x+y) \
                   .take(5)

[('actor', 596),
 ('film actress', 21),
 ('model', 9),
 ('stand-up comedian', 44),
 ('actress', 271)]
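
The same four steps can be traced on a tiny in-memory sample with plain Python. This is only a sketch of the logic, not of how Spark executes it. The first four rows are taken from the data set; the last row, with a blank occupation, is made up purely to exercise the filter.

```python
from collections import Counter

rows = [
    ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
    ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
    ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
    ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson'],
    ['1999', '', '1/15/99', '', 'Hypothetical Guest'],  # made-up row, blank occupation
]

# Step 1: drop rows whose occupation is blank (generator, so still lazy).
non_blank = (row for row in rows if row[1] != '')

# Step 2: lowercase each profession.
lowered = (row[1].lower() for row in non_blank)

# Step 3: count occurrences of each profession.
histogram = Counter(lowered)

# Step 4: look at the first five entries.
first_five = list(histogram.items())[:5]

print(first_five)
```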