# Spark & MapReduce



## Hadoop:

Hadoop consists of a file system (Hadoop Distributed File System, or HDFS) and its own implementation of the MapReduce paradigm. MapReduce converts computations into Map and Reduce steps that Hadoop can easily distribute over many machines. We'll cover how MapReduce works in greater depth later in this lesson.
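To make the Map and Reduce steps concrete, here is a minimal single-machine sketch of a word count in plain Python. It is not Hadoop code: the function names (`map_step`, `shuffle_step`, `reduce_step`) are made up for illustration, and a real cluster would run the map and reduce work in parallel on many machines.

```python
from collections import defaultdict
from functools import reduce

def map_step(line):
    """Map: emit a (word, 1) pair for every word in one line."""
    return [(word.lower(), 1) for word in line.split()]

def shuffle_step(pairs):
    """Shuffle: group all emitted values by their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_step(groups):
    """Reduce: combine each key's values into a single result."""
    return {key: reduce(lambda x, y: x + y, values)
            for key, values in groups.items()}

def word_count(lines):
    # Each line could be mapped on a different machine; here we loop locally.
    pairs = [pair for line in lines for pair in map_step(line)]
    return reduce_step(shuffle_step(pairs))

counts = word_count(["to be or not to be", "to sleep"])
print(counts)
```

Because each line is mapped independently and each key is reduced independently, both steps are easy to spread across machines.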

Hadoop made it possible to analyze large data sets, but relied heavily on disk storage (rather than memory) for computation. While it's inexpensive to store large volumes of data this way, it makes accessing and processing it much slower.


Hadoop wasn't a great solution for calculations requiring multiple passes over the same data or many intermediate steps, because it had to write to and read from the disk between each step. This drawback also made Hadoop difficult to use for interactive data analysis, which is a core part of a data scientist's work.


Hadoop also suffered from suboptimal support for the additional libraries many data scientists needed, such as SQL and machine learning implementations. Once the cost of RAM (computer memory) started to drop significantly, augmenting or replacing Hadoop by storing data in-memory quickly emerged as an appealing alternative.




## Spark:

The UC Berkeley AMP Lab spearheaded groundbreaking work to develop Spark, which uses distributed, in-memory data structures to speed up many data processing workloads, in some cases by orders of magnitude.

## PySpark:

Spark itself is written in Scala, a language that compiles down to bytecode for the JVM. The open source community has developed a toolkit called PySpark that lets us work with RDDs from Python. PySpark relies on a library called Py4J, which allows Python to interface with Java objects (in our case, RDDs).

In [6]:
# Find path to PySpark.
import findspark
findspark.init()

# Import PySpark and initialize SparkContext object.
import pyspark
sc = pyspark.SparkContext()

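# Read `recent-grads.csv` into an RDD.
# textFile() already yields one element per line, so we split each line
# into fields on commas (a naive split that ignores quoted fields).
f = sc.textFile('data/recent-grads.csv')
data = f.map(lambda line: line.split(','))
data.take(10)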

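If findspark.init() can't locate a Spark installation, the cell fails with the error below. Setting the SPARK_HOME environment variable to the Spark install directory resolves it:

	ValueError: Couldn't find Spark, make sure SPARK_HOME env is set or Spark is in an expected location (e.g. from homebrew installation).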

## RDDs

The core data structure in Spark is a resilient distributed data set (RDD). As the name suggests, an RDD is Spark's representation of a data set that's distributed across the RAM, or memory, of a cluster of many machines. An RDD object is essentially a collection of elements we can use to hold lists of tuples, dictionaries, lists, etc. Similar to a pandas DataFrame, we can load a data set into an RDD, and then run any of the methods available on that object.


Spark's RDD implementation also lets us evaluate code "lazily," meaning we can postpone running a calculation until absolutely necessary. For example, when we call sc.textFile() to create raw_data, Spark only creates a pointer to the file; it doesn't read the file's contents until an action such as raw_data.take(5) needs them. The advantage of "lazy" evaluation is that we can build up a queue of tasks and let Spark optimize the overall workflow in the background. In regular Python, the interpreter can't do much workflow optimization.
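Python generators give a rough feel for lazy evaluation. The sketch below is plain Python, not Spark; `read_lines` and `log` are hypothetical names used only to record when work actually happens.

```python
from itertools import islice

log = []  # records each moment a line is actually "read"

def read_lines(lines):
    """Generator standing in for a lazily read file."""
    for line in lines:
        log.append("read " + line)
        yield line

# Building the pipeline runs nothing yet, because generators are lazy.
source = read_lines(["a", "b", "c", "d"])
upper = (line.upper() for line in source)
work_before = list(log)

# Asking for results (similar to an action) pulls only what is needed.
first_two = list(islice(upper, 2))
work_after = list(log)

print(work_before)
print(first_two)
print(work_after)
```

Only two of the four lines are ever read, because nothing downstream asked for more.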


RDD objects are **immutable**, meaning that we can't change their values once we've created them. In Python, list and dictionary objects are mutable (we can change their values), while tuple objects are immutable. The only way to modify a tuple object in Python is to create a new tuple object with the necessary updates. Spark uses the immutability of RDDs to enhance calculation speeds.
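The tuple behavior described above can be checked directly in plain Python. This is only an analogy for RDDs: a transformation likewise leaves the original RDD untouched and hands back a new one.

```python
point = (1, 2, 3)

try:
    point[0] = 10  # tuples do not support item assignment
except TypeError as err:
    error_message = str(err)

# "Modifying" a tuple really means building a new tuple.
updated = (10,) + point[1:]

print(error_message)
print(point, updated)
```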



## Pipelines

Every operation or calculation in Spark is essentially a series of steps that we can chain together and run in succession to form a **pipeline**. Each step in the pipeline returns either a Python value (such as an integer), a Python data structure (such as a dictionary), or an RDD object. 

In the following code cell, we'll filter out rows where the profession is blank, lowercase each profession, generate a histogram of professions, and output the first five tuples in the histogram:


In [None]:
filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x, y: x + y) \
                   .take(5)

## Methods
Spark has two types of methods:

	1. Transformations - map(), flatMap(), filter(), reduceByKey()
	2. Actions - take(), count(), collect(), reduce(), saveAsTextFile()


NOTE: Even though Spark simplifies chaining lots of transformations together, it's good practice to use actions to observe the intermediate RDD objects between those transformations. This will let you know whether your transformations are working the way you expect them to.



## Transformations
Transformations are lazy operations that always return a reference to an RDD object. Spark doesn't actually run a transformation, though, until an action needs to use the RDD that results from it.

Any function that returns an RDD is a transformation, and any function that returns a value is an action. 
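The split between transformations and actions can be mimicked with a toy class. `MiniRDD` below is a hypothetical, single-machine stand-in written for these notes, not part of PySpark: its transformations only record a step and return a new object, while its actions run the recorded steps and return plain Python values.

```python
from itertools import islice

class MiniRDD:
    """Toy stand-in for an RDD: records steps, runs them only on an action."""

    def __init__(self, source, steps=()):
        self._source = source  # any re-iterable collection of elements
        self._steps = steps    # tuple of (kind, function) pairs

    # Transformations: return a new MiniRDD and do no work.
    def map(self, f):
        return MiniRDD(self._source, self._steps + (("map", f),))

    def filter(self, f):
        return MiniRDD(self._source, self._steps + (("filter", f),))

    def _run(self):
        # Apply every recorded step to each element, one element at a time.
        for element in self._source:
            keep = True
            for kind, f in self._steps:
                if kind == "map":
                    element = f(element)
                elif not f(element):
                    keep = False
                    break
            if keep:
                yield element

    # Actions: trigger the computation and return Python values.
    def collect(self):
        return list(self._run())

    def take(self, n):
        return list(islice(self._run(), n))

    def count(self):
        return sum(1 for _ in self._run())

numbers = MiniRDD(range(10))
evens_squared = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(type(evens_squared).__name__)  # still a MiniRDD; nothing has run yet
print(evens_squared.take(3))
```

Printing `evens_squared` itself would show only the object, much like printing a PipelinedRDD; the elements appear once an action such as `take()` runs.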

In [None]:

# Load the text file; each element of the RDD is one line of the file.
raw_hamlet = sc.textFile('hamlet.txt')
raw_hamlet.take(5)

## 2. The Map Method ##

split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))

## 4. The FlatMap Method ##

def hamlet_speaks(line):
    # Yield an (id, message) pair only for lines that contain the field "HAMLET".
    # Other lines yield nothing, so flatMap() drops them.
    line_id = line[0]
    if "HAMLET" in line:
        yield line_id, "hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(hamlet_speaks)
hamlet_spoken.take(10)

## 5. Filter Using a Named Function ##

def filter_hamlet_speaks(line):
    # Keep only lines that contain the field "HAMLET".
    return 'HAMLET' in line

hamlet_spoken_lines = split_hamlet.filter(filter_hamlet_speaks)
hamlet_spoken_lines.take(5)

## Actions
Actions force Spark to evaluate the lazy code. If we only chain together transformation methods and print the resulting RDD object, we'll see the type of RDD (e.g. a PythonRDD or PipelinedRDD object), but not the elements within it. That's because the computation hasn't actually happened yet.



In [None]:
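# count() returns the number of elements in the RDD.
spoken_count = hamlet_spoken_lines.count()
# collect() returns every element as a Python list.
spoken_collect = hamlet_spoken_lines.collect()
# The 101st element sits at index 100.
spoken_101 = spoken_collect[100]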

## SparkContext Object:

The SparkContext object manages the connection to the cluster and coordinates the processes running on it. More specifically, it connects to a cluster manager, which controls the executors that run the computations.


In [None]:
raw_data = sc.textFile("daily_show.tsv")

## Print first 5 elements of RDD
The RDD object raw_data closely resembles a list of string objects, with one object for each line in the data set.

We then use the take() method to print the first five elements of the RDD:

In [None]:
raw_data.take(5)

## Map Function:

The map(f) function applies the function f to every element in the RDD. Spark runs f once per element and returns a new RDD containing the results, leaving the original RDD unchanged.
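It helps to contrast map() with flatMap(), which these notes also use. The plain-Python sketch below imitates both on a small list; `split_words` and the sample lines are made up for illustration, and no Spark is involved.

```python
from itertools import chain

lines = ["to be", "or not"]

def split_words(line):
    return line.split(" ")

# map: exactly one output element per input element.
mapped = list(map(split_words, lines))

# flatMap: each input may produce zero or more outputs,
# and the results are flattened into a single sequence.
flat_mapped = list(chain.from_iterable(map(split_words, lines)))

print(mapped)
print(flat_mapped)
```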
                   


In [None]:
# Remove empty strings from each line.
hamlet_text_only = hamlet_filter_lines.map(lambda line: [item for item in line if item != ''])

# Drop list items that contain only the pipe character (|), and strip any
# pipe characters that appear within the remaining strings.
clean_hamlet = hamlet_text_only.map(lambda x: [word.replace('|', '') for word in x if word != '|'])

# Print every element of `tally` (an RDD defined elsewhere): count() gives the
# number of elements, and take(n) returns that many.
print(tally.take(tally.count()))

## ReduceByKey:

reduceByKey(f) combines tuples with the same key using the function we specify, f.

	('YEAR', 1)
	('1991', 1)
	('1991', 1)
	('1991', 1)
	('1991', 1)

We'd like to reduce that down to:

	('YEAR', 1)
	('1991', 4)
	...


In [None]:
# Pair each year with a count of 1, then sum the counts per year.
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)
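The reduction shown above can be reproduced on a single machine. `reduce_by_key` below is a hypothetical helper written for these notes, not a Spark function; it applies f to the values of pairs that share a key, using the sample tuples from this section.

```python
def reduce_by_key(pairs, f):
    """Combine the values of pairs that share a key using f."""
    results = {}
    for key, value in pairs:
        results[key] = f(results[key], value) if key in results else value
    return list(results.items())

pairs = [('YEAR', 1), ('1991', 1), ('1991', 1), ('1991', 1), ('1991', 1)]
totals = reduce_by_key(pairs, lambda x, y: x + y)
print(totals)
```

This local version keeps keys in first-seen order. Spark makes no such promise about the order of the resulting RDD.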

## Filter an RDD:

Spark comes with a filter(f) function that creates a new RDD by filtering an existing one for specific criteria. If we specify a function f that returns a Boolean value, True or False, the resulting RDD will consist of the elements for which f evaluated to True.



In [None]:
filtered_daily_show = daily_show.filter(lambda line: line[0] != 'YEAR')

# or:

hamlet_filter_lines = hamlet_with_ids.filter(lambda line: len(line)>1)


## Spark DataFrames:

Spark DataFrames allow us to interface with different types of data, and ensure that our analysis logic will still work as the data storage mechanisms change. 

Pandas and Spark DataFrames also have different underlying data structures. Pandas DataFrames are built around Series objects, while Spark DataFrames are built around RDDs. 

We can perform most of the same computations and transformations on Spark DataFrames that we can on pandas DataFrames, but the styles and methods are somewhat different.

Other shared df methods include:

	agg()
	join()
	sort()
	where()


In [5]:
# Import SQLContext
from pyspark.sql import SQLContext

# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

# Read JSON data into a DataFrame object `df`
df = sqlCtx.read.json("data/census_2010.json")

#Display the inferred Schema:
df.printSchema()

#Show n rows:
df.show(5)

# return rows:
row_one = df.head(5)[0]

# Access value for age
row_one.age

# Access the first value
row_one[0]

# Selecting Columns:
df.select('age','males','females').show()
# or
df[['age','males','females']].show()



Print Schema Results in:

	root
	 |-- age: long (nullable = true)
	 |-- females: long (nullable = true)
	 |-- males: long (nullable = true)
	 |-- total: long (nullable = true)
	 |-- year: long (nullable = true)



## Convert to Pandas Dataframe:

We can convert a Spark DataFrame to a pandas DataFrame using the toPandas() method. Converting an entire Spark DataFrame to a pandas DataFrame works just fine for small data sets. For larger ones, though, we'll want to select a subset of the data that's more manageable for pandas.


In [None]:

import matplotlib.pyplot as plt

# Convert to pandas, then plot a histogram of the `total` column.
pandas_df = df.toPandas()
pandas_df['total'].hist()
plt.show()

## Register a Spark DataFrame as a SQL DB Table:

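To query a Spark DataFrame with SQL, call the registerTempTable() method on the DataFrame object. This method requires one string parameter, name, that we use to set the table name for reference in our SQL queries. In Spark 2.0 and later, registerTempTable() is deprecated in favor of createOrReplaceTempView(), which takes the same name argument.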


In [None]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")

df.registerTempTable('census2010')

tables = sqlCtx.tableNames()
print(tables)

## Running SQL Queries

In [None]:
query ='SELECT males, females FROM census2010'
df = sqlCtx.sql(query)

# then get summary stats on the resulting data frame:
df.describe().show()


query = "SELECT age FROM census2010"
sqlCtx.sql(query).show(20)

## 4. Filtering ##

query = 'SELECT males, females FROM census2010 WHERE age > 5 AND age < 15'
sqlCtx.sql(query).show()


## 6. Multiple tables ##

from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")
df.registerTempTable('census2010')

df = sqlCtx.read.json("census_1980.json")
df.registerTempTable('census1980')

df = sqlCtx.read.json("census_1990.json")
df.registerTempTable('census1990')

df = sqlCtx.read.json("census_2000.json")
df.registerTempTable('census2000')

tables = sqlCtx.tableNames()
print(tables)

## 7. Joins ##

query = 'SELECT census2010.total, census2000.total FROM census2010, census2000 WHERE census2010.age = census2000.age'
sqlCtx.sql(query).show(20)


## 8. SQL Functions ##

query = """
    SELECT sum(census2010.total), sum(census2000.total), sum(census1990.total)
    FROM census2010, census2000, census1990
    WHERE census2010.age = census2000.age
      AND census2000.age = census1990.age
"""
            
sqlCtx.sql(query).show()
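The join queries above list several tables in FROM and match rows in WHERE. To see that pattern without a Spark cluster, the sketch below runs the same shape of query with Python's built-in sqlite3 module as a stand-in for Spark SQL. The rows are made-up numbers, not real census figures, and only the `age` and `total` columns are modeled.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE census2010 (age INTEGER, total INTEGER)")
conn.execute("CREATE TABLE census2000 (age INTEGER, total INTEGER)")
# Made-up rows purely for illustration.
conn.executemany("INSERT INTO census2010 VALUES (?, ?)", [(0, 10), (1, 20), (2, 30)])
conn.executemany("INSERT INTO census2000 VALUES (?, ?)", [(0, 1), (1, 2), (3, 4)])

# Implicit join: tables listed in FROM, matching condition in WHERE.
implicit = conn.execute("""
    SELECT census2010.total, census2000.total
    FROM census2010, census2000
    WHERE census2010.age = census2000.age
    ORDER BY census2010.age
""").fetchall()

# The same inner join written with explicit JOIN ... ON syntax.
explicit = conn.execute("""
    SELECT census2010.total, census2000.total
    FROM census2010
    JOIN census2000 ON census2010.age = census2000.age
    ORDER BY census2010.age
""").fetchall()
conn.close()

print(implicit)
print(explicit)
```

Only ages present in both tables survive the join, so the rows for ages 2 and 3 are dropped.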

## Basic Example

In [2]:
## 2. Extract Line Numbers ##

raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

def format_id(line):
    # Strip the "<prefix>@" part from the first field only; keep the rest as-is.
    new_line = list(line)
    new_line[0] = new_line[0].split("@")[1]
    return new_line

hamlet_with_ids = split_hamlet.map(format_id)
hamlet_with_ids.take(5)

## 3. Remove Blank Values ##

hamlet_with_ids.take(10)

hamlet_filter_lines = hamlet_with_ids.filter(lambda line: len(line)>1)
hamlet_filter_lines.take(10)

def remove_empties(line):
    # Drop empty-string fields from a line.
    return [item for item in line if item != ""]

hamlet_text_only = hamlet_filter_lines.map(remove_empties)
hamlet_text_only.take(10)

## 4. Remove Pipe Characters ##

def clean_pipes(line):
    # Skip fields that are only a pipe character, and strip pipes
    # that appear inside the remaining fields.
    return [item.replace("|", "") for item in line if item != "|"]

clean_hamlet = hamlet_text_only.map(clean_pipes)
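To check the cleaning steps without a Spark cluster, the sketch below applies the same logic to a few lines with list comprehensions. The sample lines are hypothetical and only assume the tab-separated `<prefix>@<id>` layout that the code above implies; they are not taken from hamlet.txt.

```python
# Hypothetical sample lines in the assumed tab-separated layout.
raw_lines = [
    "hamlet@0\t\tHAMLET",
    "hamlet@8",
    "hamlet@22\t\t|Enter FRANCISCO|",
    "hamlet@30\t|\tBERNARDO",
]

def format_id(line):
    """Strip the '<prefix>@' part from the first field only."""
    return [line[0].split("@")[1]] + line[1:]

# Same steps as the Spark pipeline, run eagerly on a Python list.
split_lines = [line.split("\t") for line in raw_lines]
with_ids = [format_id(line) for line in split_lines]
filtered = [line for line in with_ids if len(line) > 1]
text_only = [[item for item in line if item != ""] for line in filtered]
clean = [[word.replace("|", "") for word in line if word != "|"]
         for line in text_only]

for line in clean:
    print(line)
```

The line that holds only an id is dropped by the length filter, and the pipe characters disappear from the remaining fields.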

