# Step 9: Spark and Map-Reduce (Working With Large Datasets)


Goal: learn how to use Apache Spark and the map-reduce technique to clean and analyze large datasets.

* Learn the map-reduce framework, which breaks tasks down so that many computers can run them in parallel.
* Learn how to use Spark to process and transform larger, raw files.
* Explore how Spark SQL and Spark DataFrames make it easy to work with large, structured datasets.

1. Introduction to Spark
    * A brief history of big data
    * How the RDD object works in Spark
    * The basics of counting in Spark
2. Project: Spark installation and Jupyter Notebook Integration
    * How to install Spark and PySpark
    * How to integrate PySpark with Jupyter Notebook
3. Transformations and Actions
    * How to read TSV files into Spark
    * How to apply lambda functions over RDD objects
4. Challenge: Transforming Hamlet into a Data Set
    * Transforming data from text files into RDD objects
    * Cleaning data using lambda functions
5. Spark DataFrames
    * How to work with Spark dataframes
    * The difference between pandas and Spark dataframes
    * How to perform basic filters with Spark dataframes
6. Spark SQL
    * How to query Spark dataframes using SQL
    * How to work with multiple tables in Spark SQL

## 1. Introduction to Spark

### 1.1 A Brief History of Big Data

There's been a lot of buzz about __big data__ over the last few years, and it's finally become mainstream. Companies like Google and Yahoo! have grown their user bases significantly, and are collecting more information on how people interact with their products. __[Moore's law](https://en.wikipedia.org/wiki/Moore%27s_law)__ and the rapidly __[falling cost of storage](https://www.aei.org/wp-content/uploads/2013/04/storage3_f.jpg)__ have contributed greatly to the big data phenomenon.

While software companies got better at collecting massive amounts of data, their ability to analyze and make sense of it didn't keep pace. Because existing technologies couldn't analyze such large quantities of data, companies like Google, Facebook, Yahoo!, and LinkedIn had to build new paradigms and tools that could do the job.

Engineers initially tried using bigger and more powerful computers to process the data, but still ran into limits for many computational problems. Along the way, they developed paradigms like __[MapReduce](https://en.wikipedia.org/wiki/MapReduce)__ that efficiently distribute calculations over hundreds or thousands of computers to calculate the result in parallel. Hadoop is an open source project that quickly became the dominant processing toolkit for big data.

__Hadoop__

Hadoop consists of a file system (Hadoop Distributed File System, or HDFS) and its own implementation of the MapReduce paradigm. MapReduce converts computations into Map and Reduce steps that Hadoop can easily distribute over many machines. We'll cover how MapReduce works in greater depth later in this lesson.
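To make the Map and Reduce steps concrete, here's a minimal single-process sketch of a MapReduce word count in plain Python. It illustrates the paradigm under the assumption that everything fits on one machine; it is not Hadoop's API, and the function names are hypothetical. On a real cluster, many map and reduce calls would run in parallel on different machines, and the shuffle step would move the intermediate pairs between them.

```python
from collections import defaultdict
from functools import reduce


def map_phase(documents):
    """Map step: emit a (word, 1) pair for every word in every document."""
    for doc in documents:
        for word in doc.lower().split():
            yield (word, 1)


def shuffle_phase(pairs):
    """Shuffle step: group all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce step: collapse each key's list of values into a single total."""
    return {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}


def word_count(documents):
    return reduce_phase(shuffle_phase(map_phase(documents)))


docs = ["to be or not to be", "that is the question"]
print(word_count(docs))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1, 'that': 1, 'is': 1, 'the': 1, 'question': 1}
```

Because each map call only looks at one document and each reduce call only looks at one key, both steps can be split across machines independently; only the shuffle needs to move data around.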

Hadoop made it possible to analyze large data sets, but relied heavily on disk storage (rather than memory) for computation. While it's inexpensive to store large volumes of data this way, it __[makes accessing and processing it much slower](https://www.cnet.com/news/understanding-ram-versus-hard-drive-space-via-an-analogy/)__.

Hadoop wasn't a great solution for calculations requiring multiple passes over the same data or many intermediate steps, due to the need to write to and read from the disk between each step. This drawback also made Hadoop difficult to use for interactive data analysis, the main task data scientists need to do.

Hadoop also suffered from suboptimal support for the additional libraries many data scientists needed, such as SQL and machine learning implementations. Once the cost of RAM (computer memory) started to drop significantly, augmenting or replacing Hadoop by storing data in-memory quickly emerged as an appealing alternative.

### 1.2 The Spark Revolution

The __[UC Berkeley AMP Lab](https://amplab.cs.berkeley.edu/projects/spark-lightning-fast-cluster-computing/)__ spearheaded groundbreaking work to develop Spark, which uses distributed, in-memory data structures to improve speeds for many data processing workloads by several orders of magnitude. If you're interested in learning more, you can read about __[why Spark is a crossover hit for data scientists](http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/)__, or check out some of the original papers on the __[Apache Spark homepage](http://spark.apache.org/research.html)__.

### 1.3 Resilient Distributed Data Sets (RDDs)

The core data structure in Spark is a resilient distributed data set (RDD). As the name suggests, an RDD is Spark's representation of a data set that's distributed across the RAM, or memory, of a cluster of many machines. An RDD object is essentially a collection of elements we can use to hold lists of tuples, dictionaries, lists, etc. Similar to a pandas DataFrame, we can load a data set into an RDD, and then run any of the methods accessible to that object.

__PySpark__

While the Spark toolkit is written in Scala, a language that compiles down to bytecode for the JVM, the open source community has developed a wonderful toolkit called __[PySpark](https://spark.apache.org/docs/0.9.0/python-programming-guide.html)__ that allows us to interface with RDDs in Python. PySpark relies on a library called __[Py4J](https://github.com/bartdag/py4j)__, which lets Python interface with Java objects (in our case, RDDs).

In this mission, we'll work with a data set containing the names of all of the guests who have appeared on __[The Daily Show](https://en.wikipedia.org/wiki/The_Daily_Show)__.

To start off, we'll load the data set into an RDD. We're using the __TSV__ version of __[FiveThirtyEight's data set](https://github.com/fivethirtyeight/data/tree/master/daily-show-guests)__. TSV files use a tab character (__"\t"__) as the delimiter, instead of the comma (__","__) that CSV files use.

```python
raw_data = sc.textFile("daily_show.tsv")
raw_data.take(5)
```

### 1.4 SparkContext

In Spark, the __SparkContext__ object manages the connection to the cluster and coordinates the running of processes on it. More specifically, it connects to a cluster manager, which allocates resources across the cluster; Spark then acquires executors on the worker nodes, and those executors run the computations that the SparkContext sends them as tasks. Here's a diagram from the Spark documentation that will help you visualize the architecture:

![alt text](https://spark.apache.org/docs/1.1.0/img/cluster-overview.png)


We automatically have access to the SparkContext object __sc__. We then run the following code to read the TSV data set into an RDD object __raw_data__:

```python
raw_data = sc.textFile("daily_show.tsv")
```

The RDD object __raw_data__ closely resembles a list of string objects, with one object for each line in the data set. We then use the take() method to print the first five elements of the RDD:

```python
raw_data.take(5)
```

To explore the other methods an RDD object has access to, check out the __[PySpark documentation](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#take)__. __take(n)__ will return the first n elements of the RDD.


### 1.5 Lazy Evaluation 

* My understanding: 
__Concept reference in Python design__: iterable -> iterator -> generator (not computing everything and saving it in memory at once). This is what "lazy" means.


You may be wondering why, if an RDD resembles a Python list, we don't just use bracket notation to access elements in the RDD.

The answer is that Spark distributes RDD objects across many partitions, and the RDD object is specifically designed to handle distributed data. We can't rely on the standard implementation of a list for these reasons.

Spark offers many advantages over regular Python, though. For example, thanks to RDD __[abstraction](https://en.wikipedia.org/wiki/Abstraction_(computer_science))__, you can run Spark locally on your own computer. Spark will simulate distributing your calculations over many machines by automatically slicing your computer's memory into partitions.
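To see why plain bracket indexing doesn't map naturally onto distributed data, here's a plain-Python sketch that slices a list into partitions and processes each one independently. The `partition` helper is hypothetical and only mimics the idea; Spark decides how to partition an RDD on its own.

```python
def partition(data, num_partitions):
    """Split data into num_partitions contiguous, roughly equal slices (hypothetical helper)."""
    size, extra = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions get one additional element each.
        end = start + size + (1 if i < extra else 0)
        parts.append(data[start:end])
        start = end
    return parts


data = list(range(10))
parts = partition(data, 3)
print(parts)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]

# Each partition is processed on its own (on a cluster, possibly on different machines),
# and only the small per-partition results are combined at the end.
partial_sums = [sum(p) for p in parts]
print(partial_sums, sum(partial_sums))  # [6, 15, 24] 45
```

Asking for "element 5" here means first working out which partition holds it, which is why RDDs expose methods like `take()` instead of list-style indexing.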

Spark's RDD implementation also lets us evaluate code "lazily," meaning we can __postpone running a calculation__ until absolutely necessary. In the earlier example, Spark waited to load the TSV file into an RDD until __raw_data.take(5)__ executed. When our code called __raw_data = sc.textFile("daily_show.tsv")__, Spark created a pointer to the file, but didn't actually read it into __raw_data__ until __raw_data.take(5)__ needed that variable to run its logic.

The advantage of "lazy" evaluation is that we can build up a queue of tasks and let Spark optimize the overall workflow in the background. In regular Python, the interpreter can't do much workflow optimization. We'll see more examples of lazy evaluation later on.
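Python 3's built-in `map()` is lazy in a similar way, which makes it a handy single-machine analogy. This sketch is plain Python, not Spark, and the sample lines are made up: it records each call so you can see that no work happens until results are actually requested.

```python
import itertools

calls = []  # records every line that split_line actually processes


def split_line(line):
    calls.append(line)
    return line.split("\t")


# Hypothetical sample lines standing in for the contents of a TSV file.
lines = ["YEAR\tOccupation", "1999\tactor", "1999\tcomedian"]

# Like a transformation: map() only builds a lazy iterator, so nothing runs yet.
pending = map(split_line, lines)
print(len(calls))  # 0

# Like take(2): pull just enough results to satisfy the request.
first_two = list(itertools.islice(pending, 2))
print(first_two)   # [['YEAR', 'Occupation'], ['1999', 'actor']]
print(len(calls))  # 2
```

Spark goes further than this analogy: because it sees the whole chain of pending steps, it can also plan how to run them across partitions.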

### 1.6 Pipelines

* My understanding: similar to shell pipelines, where each command's output (stdout) becomes the next command's input (stdin).

While Spark borrowed heavily from Hadoop's MapReduce pattern, it's still quite different in many ways. If you have experience with Hadoop and traditional MapReduce, you may want to read this great __[post by Cloudera](http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/)__ about the difference between them. Don't worry if you've never worked with MapReduce or Hadoop before; we'll cover the concepts you need to know in this course.

The key idea to understand when working with Spark is data __pipelining__. Every operation or calculation in Spark is essentially a series of steps that we can chain together and run in succession to form a __pipeline__. Each step in the __pipeline__ returns either a Python value (such as an integer), a Python data structure (such as a dictionary), or an RDD object. We'll start with the __map()__ function.

__Map()__

The __map(f)__ function applies the function f to every element in the RDD and returns a new RDD containing the results. You can think of it like iterating over a Python collection: Spark runs f once per element.

We'll walk through an example of a __map__ function so you can get a better sense of how it works. If you look carefully, you'll see that __raw_data__ is in a format that's hard to work with. While the elements are currently all __strings__, we'd like to convert each of them into a __list__ to make the data more manageable. To do this the __traditional way__, we would:


1. Use a 'for' loop to iterate over the collection
2. Split each `string` on the delimiter
3. Store the result in a `list`
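In plain Python (no Spark involved, and with made-up sample lines), the traditional loop and the "apply this function to every element" style produce the same result; the second form has the same shape as the RDD `map()` call.

```python
# Hypothetical raw lines, standing in for the strings held by raw_data.
raw_lines = ["YEAR\tOccupation", "1999\tactor", "1999\tcomedian"]

# The traditional way: loop, split each string on the tab, store the result.
rows = []
for line in raw_lines:
    rows.append(line.split("\t"))

# The same logic as a function applied to every element.
rows_via_map = list(map(lambda line: line.split("\t"), raw_lines))

print(rows == rows_via_map)  # True
print(rows[:2])              # [['YEAR', 'Occupation'], ['1999', 'actor']]
```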

Let's see how we can use __map__ to do this with Spark instead.

In the code cell:


1. Call the RDD function `map()` to specify we want to apply the logic in the parentheses to every line in our data set.
2. Write a lambda function that splits each line using the tab delimiter (\t), and assign the resulting RDD to `daily_show`.
3. Call the RDD function `take()` on `daily_show` to display the first five elements (or rows) of the resulting RDD.

We call the __map(f)__ function a transformation step. It requires either a named or lambda function f.

```python
daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)
```



### 1.7 Python and Scala, Friends Forever

One of the wonderful features of PySpark is the ability to separate our logic - which we prefer to write in Python - from the actual data transformation. In the previous code cell, we wrote this lambda function in Python code:

```python
raw_data.map(lambda line: line.split('\t'))
```

Even though the function was written in Python, Spark's engine, which is written in Scala and runs on the JVM, handled partitioning the data, scheduling the work, and coordinating the executors when it actually ran the code over our RDD. __This__ is the power of PySpark. Without learning any Scala, we get to harness the data processing performance gains from Spark's Scala architecture. Even better, when we ran the following code, it returned the results to us in Python-friendly notation:

```python
daily_show.take(5)
```

#### __Transformations and Actions__

There are __two__ types of methods in Spark:


1. Transformations - map(), reduceByKey()
2. Actions - take(), reduce(), saveAsTextFile(), collect()

Transformations are __lazy operations__ that always return a reference to an RDD object. Spark doesn't actually run the transformations, though, until an action needs to use the RDD resulting from a transformation. __Any function that returns an RDD is a transformation, and any function that returns a value is an action__. These concepts will become more clear as we work through this lesson and practice writing PySpark code.
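The split between transformations and actions can be sketched with a toy class in plain Python. `LazyCollection` is hypothetical and runs on one machine; it is not PySpark's API. Its transformations only record a step and return a new object, while its actions run the recorded steps and return ordinary Python values.

```python
import itertools


class LazyCollection:
    """A toy, single-machine stand-in for an RDD (hypothetical; not PySpark's API)."""

    def __init__(self, source, steps=()):
        self._source = source        # underlying data, never modified
        self._steps = tuple(steps)   # recorded transformations, not yet run

    # Transformations: return a new LazyCollection and run nothing.
    def map(self, f):
        return LazyCollection(self._source, self._steps + (("map", f),))

    def filter(self, f):
        return LazyCollection(self._source, self._steps + (("filter", f),))

    # Internal helper: apply every recorded step to each element, one element at a time.
    def _run(self):
        for item in self._source:
            keep = True
            for kind, f in self._steps:
                if kind == "map":
                    item = f(item)
                elif not f(item):    # a "filter" step that rejects the element
                    keep = False
                    break
            if keep:
                yield item

    # Actions: execute the pipeline and return Python values.
    def take(self, n):
        return list(itertools.islice(self._run(), n))

    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


nums = LazyCollection(range(10))
evens_squared = nums.filter(lambda x: x % 2 == 0).map(lambda x: x * x)  # nothing runs yet
print(evens_squared.take(3))   # [0, 4, 16]
print(evens_squared.count())   # 5
```

Note that `nums` is untouched after the chain: each transformation hands back a new object, much like an RDD transformation returns a new RDD.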

#### __Immutability__

You may be wondering why we couldn't just split each string in place, instead of creating a new object daily_show. In Python, we could have modified the collection element-by-element in place, without returning and assigning the results to a new object.

__RDD objects__ are __immutable__, meaning that we can't change their values once we've created them. In Python, list and dictionary objects are mutable (we can change their values), while tuple objects are immutable. The only way to modify a tuple object in Python is to create a new tuple object with the necessary updates. Spark uses the immutability of RDDs to enhance calculation speeds. The mechanics of how it does this are outside the scope of this lesson.
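The tuple comparison above in plain Python: assigning into a tuple fails, so the only way to "change" it is to build a new tuple, just as `daily_show = raw_data.map(...)` produced a new RDD and left `raw_data` as it was.

```python
point = (1991, 4)

try:
    point[1] = 5                  # tuples do not support item assignment
except TypeError as exc:
    print("TypeError:", exc)      # TypeError: 'tuple' object does not support item assignment

updated = point[:1] + (5,)        # build a new tuple that contains the change
print(point, updated)             # (1991, 4) (1991, 5)
```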

### 1.8 ReduceByKey()

We'd like to tally up the number of guests who have appeared on The Daily Show during each year. If daily_show were a list of lists, we could write the following Python code to achieve this result:

```python
tally = dict()
for line in daily_show:
  year = line[0]
  if year in tally.keys():
    tally[year] = tally[year] + 1
  else:
    tally[year] = 1
```    

The keys in tally will be the years, and the values will be the totals for the number of lines associated with each year.

To achieve the same result with Spark, we'll have to use a __Map__ step, then a __ReduceByKey__ step.

```python
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(tally)
```

### 1.9 Explanation

You may have noticed that printing __tally__ didn't return the histogram we were hoping for. Because of __lazy evaluation__, PySpark delayed executing the __map__ and __reduceByKey__ steps until we actually need them. Before we use __take()__ to preview the first few elements in __tally__, we'll walk through the code we just wrote.

```python
daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)
```

During the __map__ step, we used a lambda function to create a tuple consisting of:

* key: x[0] (the first value in the list)
* value: 1 (the integer)

Our high-level strategy was to create a tuple with the key representing the __year__ and the value __1__ representing a single guest appearance. After running the __map__ step, Spark will maintain in memory a list of tuples resembling the following:

```python
('YEAR', 1)
('1991', 1)
('1991', 1)
('1991', 1)
('1991', 1)
...
```

We'd like to reduce that down to:

```python
('YEAR', 1)
('1991', 4)
...
```
__reduceByKey(f)__ combines tuples with the same key using the function we specify, f.
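On a single machine, the effect of `reduceByKey(f)` can be sketched in plain Python by grouping values by key and folding each group with `functools.reduce`. The `reduce_by_key` helper below is hypothetical and is not part of PySpark; the input mirrors the tuples shown above.

```python
from functools import reduce


def reduce_by_key(pairs, f):
    """Hypothetical single-machine stand-in for reduceByKey(f): combine values that share a key."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return [(key, reduce(f, values)) for key, values in grouped.items()]


pairs = [("YEAR", 1), ("1991", 1), ("1991", 1), ("1991", 1), ("1991", 1)]
print(reduce_by_key(pairs, lambda x, y: x + y))  # [('YEAR', 1), ('1991', 4)]
```

One difference worth keeping in mind: Spark combines values inside each partition first and then merges the partial results across partitions, so the function you pass should be associative and commutative (addition is), and the order of the output tuples isn't guaranteed.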

To see the results of these two steps, we'll use the __take__ command, which __forces lazy code to run immediately__. Because tally is an RDD, we can't use Python's __len__ function to find out how many elements are in the collection. Instead, we'll need to use the RDD __count()__ function.

```python
tally.take(tally.count())
```

### 1.10 Filter

Unlike pandas, Spark's textFile() knows nothing about column headers, so it treated the header row like any other line. We need a way to remove the element __('YEAR', 1)__ from our collection. We'll need a workaround, though, because RDD objects are immutable once we create them. The only way to remove that tuple is to create a new RDD object that doesn't have it.

Spark comes with a __filter(f)__ function that creates a new RDD by filtering an existing one for specific criteria. If we specify a function f that returns a Boolean value, __True__ or __False__, the resulting RDD will consist of the elements for which the function evaluated to True. You can read more about the filter function in the __[Spark documentation](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#filter)__.

#### Instructions

* Write a function named filter_year that we can use to filter out the element that begins with the text YEAR, instead of an actual year.

#### Answers

```python
def filter_year(line):
    # Keep every row except the header row, whose first value is the text 'YEAR'.
    return line[0] != 'YEAR'

filtered_daily_show = daily_show.filter(lambda line: filter_year(line))

# or, equivalently (no lambda needed, because filter_year is already a named function):
filtered_daily_show = daily_show.filter(filter_year)
```



### 1.11 Practice with Pipelines

To flex Spark's muscles, we'll demonstrate how to chain together a series of data transformations into a pipeline, and observe Spark managing everything in the background. The developers who wrote Spark had this functionality in mind, and optimized it for running tasks in succession.

Before Spark came along, running lots of tasks in succession in Hadoop was incredibly time consuming. Hadoop had to write intermediate results to disk, and wasn't aware of the full pipeline. Thanks to its aggressive approach to memory use and well-architected core, Spark improves on Hadoop's turnaround time significantly. If you're curious, you can read more about this topic in a __[Quora thread](https://www.quora.com/What-are-the-advantages-of-DAG-directed-acyclic-graph-execution-of-big-data-algorithms-over-MapReduce-I-know-that-Apache-Spark-Storm-and-Tez-use-the-DAG-execution-model-over-MapReduce-Why-Are-there-any-disadvantages/answer/Tathagata-Das?share=1&srid=umKP)__.

In the following code cell, we'll filter out guests whose profession is blank, lowercase each profession, generate a histogram of professions, and output the first five tuples in the histogram.

```python
filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x,y: x+y) \
                   .take(5)
```

* Note: the backslash (`\`) at the end of each line is Python's line continuation character; it lets a single statement span multiple lines.
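For comparison, the same filter, map, and count logic can be written as a single pass in plain Python. This sketch runs without Spark on a few made-up rows shaped like `filtered_daily_show`, and it swaps in `collections.Counter` for the `reduceByKey` step.

```python
from collections import Counter

# Hypothetical rows shaped like filtered_daily_show: [year, occupation, ...]
rows = [
    ["1999", "actor"],
    ["1999", "Comedian"],
    ["1999", ""],
    ["2000", "Actor"],
    ["2000", "comedian"],
    ["2000", "actor"],
]

professions = Counter(
    line[1].lower()        # map: normalize the profession's case
    for line in rows
    if line[1] != ""       # filter: drop rows with a blank profession
)
print(professions.most_common())  # [('actor', 3), ('comedian', 2)]
```

This works fine while the data fits in one process's memory; the Spark pipeline expresses the same steps in a form that can be split across partitions.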

### 1.12 Next Steps

In this mission, we introduced the MapReduce paradigm, the fundamentals of Spark, and PySpark data transformations. Next, you'll install Spark and PySpark on your own machine.

## 2 Project: Spark Installation and Jupyter Notebook Integration

### 2.1 Introduction

In the last mission, we introduced the Spark cluster computing framework and explored some basic PySpark methods, all within the Dataquest interface. In this project, we'll walk through how to set up Spark on your own computer and integrate PySpark with Jupyter Notebook. We can use Spark in two modes:

* Local mode - The entire Spark application runs on a single machine. Local mode is what you'll use to prototype Spark code on your own computer. It's also easier to set up.
* Cluster mode - The Spark application runs across multiple machines. Cluster mode is what you'll use when you want to run your Spark application across multiple machines in a cloud environment like Amazon Web Services, Microsoft Azure, or Digital Ocean.

For now, we'll walk through the instructions for installing Spark in local mode on Windows, Mac, and Linux. We'll cover how to install Spark in cluster mode as part of the data engineering track.

Here's a diagram describing the high-level components you'll be setting up today:

![text alt](https://dq-content.s3.amazonaws.com/xgRnU89.png)


### 2.2 Java

Spark runs on the Java Virtual Machine, or JVM for short, which comes in the Java SE Development Kit (JDK for short). We recommend installing Java SE Development Kit version 7 or higher, which you can download from Oracle’s website:

* http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

As of this writing, Java SE Development Kit 8u111 and 8u112 are the two latest releases of the JDK. Any version from JDK 7 onward works, so you can download any of the versions on this page. Select the appropriate installation file for your operating system.

If you're on Windows or Linux, be sure to choose the correct instruction set architecture (x86 or x64) for your computer. Each computer chip has a specific instruction set architecture that determines the maximum amount of memory it can work with. The two main types are x86 (32-bit) and x64 (64-bit). If you're not sure which one your computer has, you can find out by following __[this guide if you're on Windows](https://support.wdc.com/knowledgebase/answer.aspx?ID=9405)__ or __[this one if you're on Linux](https://www.howtogeek.com/198615/how-to-check-if-your-linux-system-is-32-bit-or-64-bit/)__.

To verify that the installation worked, launch your command line application (__Command Prompt__ for Windows and __Terminal__ for Mac and Linux) and run:

```shell
java -version
```

The output should be similar to:

```
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
```

While the exact numbers probably won't match, the key thing to verify is that the version is 1.7 or higher. The "1.7" actually refers to Java 7. If you're interested, you can read why at __[Oracle's website](http://www.oracle.com/technetwork/java/javase/jdk7-naming-418744.html)__.

If running java -version returned an error or a different version than the one you just installed, your Java JDK installation most likely wasn't added to your PATH properly. Read this __[post](https://community.akamai.com/customers/s/article/Welcome-to-the-Akamai-Community?language=en_US)__ to learn more about how to properly add the Java executable to your __PATH__.

Now that we have the JVM set up, let's move on to Spark.

![text alt](https://dq-content.s3.amazonaws.com/HiuPSEj.png)

### 2.3 Spark

Because you've installed JDK, you could technically download the original source code and build Spark on your computer. Building from the source code is the process of generating an executable program for your machine. It involves __[many steps](https://stackoverflow.com/questions/1622506/programming-definitions-what-exactly-is-building/1622520#1622520)__. While there are some performance benefits to building Spark from source, it takes a while to do, and it's hard to debug if the build fails.

We'll download and work with a pre-built version of Spark instead. Navigate to the __[Spark downloads page](http://spark.apache.org/downloads.html)__ and select the following options:

1. 1.6.2
2. Pre-built for Hadoop 2.6
3. Direct Download

Next, click the link that appears in Step 4 to download Spark as a __.TGZ__ file to your computer. Open your command line application and navigate to the folder you downloaded it to. Unzip the file and move the resulting folder into your home directory. Windows doesn't have a built-in utility that can unzip .tgz files, so on Windows we recommend downloading and using __7-Zip__.
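If you'd rather not install 7-Zip, Python's standard-library `tarfile` module can unpack the archive on any operating system. This is a sketch: `extract_tgz` is a hypothetical helper, and the archive name in the usage comment (`spark-1.6.2-bin-hadoop2.6.tgz`) is an assumption based on the download options selected above.

```python
import tarfile
from pathlib import Path


def extract_tgz(archive_path, dest_dir=None):
    """Unpack a .tgz archive into dest_dir (default: your home directory).

    Returns the paths of the top-level folders the archive created.
    """
    dest_dir = Path(dest_dir) if dest_dir is not None else Path.home()
    with tarfile.open(archive_path, "r:gz") as archive:
        # Collect the top-level names (e.g. the Spark folder) before extracting.
        top_level = sorted({Path(name).parts[0] for name in archive.getnames()})
        # Use the safer "data" extraction filter when this Python version supports it.
        if hasattr(tarfile, "data_filter"):
            archive.extractall(dest_dir, filter="data")
        else:
            archive.extractall(dest_dir)
    return [dest_dir / name for name in top_level]


# Example usage (assumes the archive is in the current working directory):
# extract_tgz("spark-1.6.2-bin-hadoop2.6.tgz")
```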

![text alt](https://dq-content.s3.amazonaws.com/82TDOgt.png)

### 2.4 PySpark Shell

In the last mission, you learned that PySpark is a Python library that allows us to interact with Spark objects. The source code for the PySpark library is located in the python/pyspark directory, while the script that launches the PySpark shell is bin/pyspark. To test whether your Spark installation works, run the command bin/pyspark from inside the Spark folder to start up the PySpark shell. The output should be similar to this:

![text alt](https://dq-content.s3.amazonaws.com/vgMMYkC.png)

While the output is verbose, you can see that the shell automatically initialized the __SparkContext object__ and assigned it to the variable __sc__.

You don't have to run bin/pyspark from the folder that contains it. Because the Spark folder is in your home directory, you can use ~/spark-1.6.2-bin-hadoop2.6/bin/pyspark to launch the PySpark shell from other directories on your machine (adjust the version number in the folder name to match the release you downloaded). This way, you can switch to the directory that contains the data you want to use, launch the PySpark shell, and read the data in without having to use its full path. The folder you're in when you launch the PySpark shell will be the local context for working with files in Spark.

![text alt](https://dq-content.s3.amazonaws.com/qCuQs4E.png)

### 2.5 Jupyter Notebook

You can make your Jupyter Notebook application aware of Spark in a few different ways. One is to create a configuration file and launch Jupyter Notebook with that configuration. Another is to import PySpark at runtime. We'll focus on the latter approach, so you won't have to restart Jupyter Notebook each time you want to use Spark.

First, you'll need to copy the full path to the pre-built Spark folder and set it as a shell environment variable. This way, you can specify Spark's location a single time, and every Python program you write will have access to it. If you move the Spark folder, you can change the path specification once and your code will work just fine.

#### Mac / Linux

* Use nano or another text editor to open your shell environment's configuration file. If you're using the default Terminal application, the file should be in ~/.bash_profile. If you're using ZSH instead, your configuration file will be in ~/.zshrc.

* Add the following line to the end of the file, replacing {full path to Spark} with the actual path to Spark:

```shell
export SPARK_HOME="{full path to Spark, e.g. /users/home/jeff/spark-2.0.1-bin-hadoop2.7/}"
```

* Exit the text editor and run either source ~/.bash_profile or source ~/.zshrc so the shell reads in and applies the update you made.

#### Windows

* If you've never added environment variables, read __[this tutorial](https://www.pythoncentral.io/add-python-to-path-python-is-not-recognized-as-an-internal-or-external-command/)__ before you proceed.
* Set the SPARK_HOME environment variable to the full path of the Spark folder (e.g. c:/Users/Jeff/spark-2.0.1-bin-hadoop2.7/).
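To double-check the variable from Python (for example, inside a fresh Jupyter Notebook), you could use a small check like the one below. `check_spark_home` is a hypothetical helper, and treating "contains a bin/ folder" as the sign of a valid Spark folder is an assumption based on the bin/pyspark layout described earlier; findspark performs its own lookup.

```python
import os
from pathlib import Path


def check_spark_home(environ=None):
    """Return SPARK_HOME as a Path if it is set and contains a bin/ folder; otherwise raise."""
    environ = os.environ if environ is None else environ
    spark_home = environ.get("SPARK_HOME")
    if not spark_home:
        raise RuntimeError("SPARK_HOME is not set. Open a new terminal (or re-run `source`) and try again.")
    path = Path(spark_home).expanduser()
    if not (path / "bin").is_dir():
        raise RuntimeError(f"SPARK_HOME is {path}, but that folder has no bin/ directory.")
    return path


# Example usage:
# print(check_spark_home())
```

Environment variables are read when a process starts, so if the check fails right after you edited them, restart the terminal or Jupyter Notebook first.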

Next, let's install the __[findspark](https://github.com/minrk/findspark)__ Python library, which looks up the location of PySpark using the environment variable we just set. Use pip to install the findspark library:

```shell
pip install findspark
```

Now that we've set up all of the tools we need, let's test the installation!

### 2.6 Testing your Installation

Download __[recent-grads.csv](https://raw.githubusercontent.com/fivethirtyeight/data/master/college-majors/recent-grads.csv)__ to your computer and use the command line to navigate to its location. Start Jupyter Notebook, create a new notebook, and run the following code to test your installation:

```python
# Find path to PySpark.
import findspark
findspark.init()

# Import PySpark and initialize SparkContext object.
import pyspark
sc = pyspark.SparkContext()

# Read `recent-grads.csv` into an RDD and split each line on commas
# (a naive split that doesn't handle commas inside quoted fields).
f = sc.textFile('recent-grads.csv')
data = f.map(lambda line: line.split(','))
data.take(10)
```

If you don't get any errors and can see the first 10 lines of __recent-grads.csv__, then you're good to go! You can use Google, StackOverflow, or the members-only Slack community to get help if you need it.

![text alt](https://dq-content.s3.amazonaws.com/3Ws6xgo.png)

## 3 Transformations and Actions

### 3.1 Introduction to the Data

In a previous lesson, we touched briefly on transformations and actions, and how these two kinds of operations affect the execution of code. In this lesson, we'll dive deeper into how those mechanisms work, and explore a wider range of the functions built into __the [Spark core](http://spark.apache.org/docs/latest/api/python/pyspark.html)__.

The file __hamlet.txt__ contains the entire text of __[Shakespeare's play Hamlet](https://en.wikipedia.org/wiki/Hamlet)__. Shakespeare is well-known for his unique writing style and is arguably one of the most influential writers in history. Hamlet is one of his most popular plays.

Let's perform some text analysis on it. The file is in pure text format, though, and not ready for analysis. Before we can proceed, we'll have to clean up and reformat the data.

#### Instructions:

* Read the text file into an RDD named raw_hamlet using the textFile() method from SparkContext (in this environment, that object is already available as sc).
* Display the first five elements of the RDD.

#### Answers:

```python
raw_hamlet = sc.textFile('hamlet.txt')
raw_hamlet.take(5)
print(type(raw_hamlet))
print(type(raw_hamlet.take(5)))
print(raw_hamlet)
print(raw_hamlet.take(5))
```

output:
```python
<class 'pyspark.rdd.RDD'>
<class 'list'>
hamlet.txt MapPartitionsRDD[17] at textFile at NativeMethodAccessorImpl.java:-2
['hamlet@0\t\tHAMLET', 'hamlet@8', 'hamlet@9', 'hamlet@10\t\tDRAMATIS PERSONAE', 'hamlet@29']
```


### 3.2 The Map Method

The text file uses the tab character (\t) as a delimiter. We'll need to split the file on the tab delimiter and convert the results into an RDD that's more manageable.

#### Instructions:

Use the __map__ method to convert:

```python
['hamlet@0\t\tHAMLET',
 'hamlet@8',
 'hamlet@9',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29']
```

to

```python
[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]
```
Name the resulting RDD split_hamlet.

#### Answers:

```python
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)
```

output:
```python
[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]
 ```
 

### 3.3 Beyond Lambda Functions

* My understanding: generators in Python 3
* An iterable object implements `__iter__`.
* An iterator object implements `__next__`, which raises a `StopIteration` exception when there are no more values.
* `iter(iterable)` returns an iterator.

Lambda functions are great for writing quick functions with simple logic that we can pass into PySpark methods. They __fall short__ when we need more customized logic, though. Thankfully, PySpark lets us __define a function in Python first__, then pass it in. A function that produces a sequence of values in PySpark (rather than a single Boolean value, as __filter()__ requires) can use a __yield__ statement to specify the values that should be pulled later.

If you're unfamiliar with the __yield__ statement in Python, read this excellent __[Stack Overflow answer](https://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do/231855#231855)__ on the topic. To summarize, __yield__ is a Python technique that allows the interpreter to generate data on the fly and pull it when necessary, instead of storing it to memory immediately. Because of its unique architecture, Spark takes advantage of this technique to reduce overhead and improve the speed of computations.
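Here's a small plain-Python illustration of the notes above (`__iter__`, `__next__`, `StopIteration`) and of how `yield` produces values only when they're requested. It runs without Spark, and the `words` function is a hypothetical example.

```python
def words(line):
    """A generator function: each yield hands back one value, then pauses until asked again."""
    for word in line.split():
        yield word


gen = words("to be or not")
print(next(gen))   # to
print(next(gen))   # be
print(list(gen))   # ['or', 'not'] (the remaining values, produced on demand)

# An iterable (here, a list) becomes an iterator via iter(); next() then consumes it.
it = iter([1, 2])
print(next(it), next(it))  # 1 2
try:
    next(it)               # no values left
except StopIteration:
    print("exhausted")     # exhausted
```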

Spark runs the named function on every element in the RDD, but not inside your local Python session. PySpark serializes the function, along with any outside variables it refers to, and ships it to worker processes that each hold part of the data. Each copy of the function only has access to the object(s) you pass into it, copies of the variables it captured, and the Python libraries installed where the workers run. If the function refers to outside variables that can't be serialized, or imports libraries that aren't available to the workers, the computation may crash; and any changes the function makes to outside variables stay on the workers instead of coming back to your program.

Finally, not all functions need __yield__; it's only useful for functions that generate a custom sequence of values. For __map()__ or __filter()__, we use __return__ to return one value for every single element in the RDD we're running the function on.

### 3.4 The FlatMap Method

In the following code cell, we'll use the __flatMap()__ method with the named function __hamlet_speaks__ to check whether a line in the play contains the text __HAMLET__ in all caps (indicating that Hamlet spoke). __flatMap()__ is different from __map()__ because it doesn't require exactly one output for every element in the RDD: each element can produce zero, one, or many outputs, and Spark flattens them all into a single RDD. The __flatMap()__ method is useful whenever we want to generate a sequence of values from an RDD.

In this case, we want an RDD object that contains tuples of the unique line IDs and the text "hamlet speaketh!," but __only for the elements in the RDD that have "HAMLET" in one of the values__. We can't use the __map()__ method for this because it requires a return value for every element in the RDD.

We want each element in the resulting RDD to have the following format:

1. The first value should be the unique line ID (e.g. 'hamlet@0'), which is the first value in each of the elements in the __split_hamlet__ RDD.

2. The second value should be the string "hamlet speaketh!"

```python
def hamlet_speaks(line):
    # `line` is a list of values; `in` checks whether one of them is exactly "HAMLET"
    line_id = line[0]
    if "HAMLET" in line:
        # Yield one (ID, message) tuple; elements without HAMLET yield nothing
        yield line_id, "hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(hamlet_speaks)
hamlet_spoken.take(10)
```

output:
```python
[('hamlet@0', 'hamlet speaketh!'),
 ('hamlet@75', 'hamlet speaketh!'),
 ('hamlet@1004', 'hamlet speaketh!'),
 ('hamlet@9144', 'hamlet speaketh!'),
 ('hamlet@12313', 'hamlet speaketh!'),
 ('hamlet@12434', 'hamlet speaketh!'),
 ('hamlet@12760', 'hamlet speaketh!'),
 ('hamlet@12858', 'hamlet speaketh!'),
 ('hamlet@14821', 'hamlet speaketh!'),
 ('hamlet@15261', 'hamlet speaketh!')]
 ```
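To see how __flatMap()__ differs from __map()__, here's a plain-Python emulation of both. The helpers `local_map` and `local_flat_map` are hypothetical single-machine stand-ins, not PySpark functions; they're applied to the same logic as __hamlet_speaks__ and three sample elements in the shape of __split_hamlet__.

```python
from itertools import chain

def local_map(func, elements):
    """Stand-in for RDD.map(): exactly one output per input element."""
    return [func(element) for element in elements]

def local_flat_map(func, elements):
    """Stand-in for RDD.flatMap(): func returns an iterable for each element,
    and those iterables are concatenated into one flat list."""
    return list(chain.from_iterable(func(element) for element in elements))

def hamlet_speaks(line):
    # Same logic as above: yield one tuple, or nothing at all
    if "HAMLET" in line:
        yield line[0], "hamlet speaketh!"

sample = [
    ["hamlet@0", "", "HAMLET"],
    ["hamlet@8"],
    ["hamlet@75", "HAMLET", "son to the late, and nephew to the present king."],
]

flat = local_flat_map(hamlet_speaks, sample)
# [('hamlet@0', 'hamlet speaketh!'), ('hamlet@75', 'hamlet speaketh!')]

mapped = local_map(lambda line: list(hamlet_speaks(line)), sample)
# [[('hamlet@0', 'hamlet speaketh!')], [], [('hamlet@75', 'hamlet speaketh!')]]
```

With __map()__, every input still produces an output (an empty list for 'hamlet@8'), while __flatMap()__ drops the empty results and unwraps the rest.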

### 3.5 Filter Using a Named Function

__hamlet_spoken__ now contains the IDs of the lines where Hamlet spoke. While this is handy, we've lost the rest of each line. Instead, let's use __filter()__ with a named function to extract the original lines where Hamlet spoke. A function we pass into __filter()__ must return a Boolean for each element: __True__ to keep the element, or __False__ to drop it.

#### Instructions:
* Write a named function __filter_hamlet_speaks__ to pass into __filter()__. Apply it to __split_hamlet__ to return an RDD with the elements containing the word __HAMLET__.
    * Assign the resulting RDD to __hamlet_spoken_lines__.
    
    
#### Answers:

```python
def filter_hamlet_speaks(line):
    # True keeps the element, False drops it
    return "HAMLET" in line

hamlet_spoken_lines = split_hamlet.filter(filter_hamlet_speaks)
hamlet_spoken_lines.take(5)
```

output:
```python
[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.']]
```

### 3.6 Actions

As we've discussed before, Spark has two kinds of methods: transformations and actions. While we've explored some of the transformations, the only action we've used so far is __take()__.

Whenever we use an action method, Spark forces the evaluation of lazy code. If we only chain together transformation methods and print the resulting RDD object, we'll see the type of RDD (e.g. a PythonRDD or PipelinedRDD object), but not the elements within it. That's because the computation hasn't actually happened yet.

Even though Spark simplifies chaining lots of transformations together, it's good practice to use actions to observe the intermediate RDD objects between those transformations. This will let you know whether your transformations are working the way you expect them to.
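To make the transformation/action split concrete without a cluster, here's a hypothetical `ToyRDD` class (plain Python, not part of PySpark) that mimics the idea: `map()` and `filter()` only record a step and return a new object, while `collect()`, `count()`, and `take()` actually run the recorded steps. Printing it shows a summary instead of data, much like printing a PipelinedRDD. It's a single-machine illustration of lazy evaluation, not a description of how Spark schedules work.

```python
from itertools import islice

class ToyRDD:
    """Hypothetical single-machine stand-in for an RDD.
    Transformations only record a step; actions run every recorded step."""

    def __init__(self, source, steps=()):
        self._source = source  # the original data
        self._steps = steps    # recorded (kind, function) pairs, applied in order

    # Transformations: return a new ToyRDD and compute nothing
    def map(self, func):
        return ToyRDD(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        return ToyRDD(self._source, self._steps + (("filter", predicate),))

    def _run(self):
        items = iter(self._source)
        for kind, func in self._steps:
            # Python's built-in map/filter, which are themselves lazy iterators
            items = map(func, items) if kind == "map" else filter(func, items)
        return items

    # Actions: force evaluation and return plain Python values
    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())

    def take(self, n):
        return list(islice(self._run(), n))

    def __repr__(self):
        return f"ToyRDD with {len(self._steps)} pending step(s)"


calls = []  # records every element the map function actually sees

def times_ten(x):
    calls.append(x)
    return x * 10

pipeline = ToyRDD([1, 2, 3, 4]).map(times_ten).filter(lambda x: x > 15)
print(pipeline)            # ToyRDD with 2 pending step(s); calls is still []
print(pipeline.collect())  # [20, 30, 40]
print(pipeline.count())    # 3 (runs the whole pipeline again)
```

Like an RDD, the toy object recomputes from its source on every action, which is one reason to check intermediate results with cheap actions such as `take()`.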

__count()__

The __count()__ method returns the number of elements in an RDD. __count()__ is useful when we want to make sure the result of a transformation contains the right number of elements. For example, if we know there should be an element in the resulting RDD for every element in the initial RDD, we can compare the counts of both to ensure they match.

To get the number of elements in the RDD __hamlet_spoken_lines__, run __.count()__ on it:

```python
hamlet_spoken_lines.count()
```

__collect()__

We've used __take()__ to preview the first few elements of an RDD, similar to the way we've used __head()__ in pandas. But what if we want all of the elements in a collection? We need them, for example, to write an RDD's contents to a local CSV file, or to run some basic Python code over a collection without going through PySpark. Keep in mind that __collect()__ copies every element into the driver's memory, so only use it on RDDs small enough to fit there.

Running __.collect()__ on an RDD returns a list representation of it. To get a list of all the elements in __hamlet_spoken_lines__, for example, we would write:

```python
hamlet_spoken_lines.collect()
```

#### Instructions:
* Compute the number of elements in hamlet_spoken_lines, and assign the result to the variable named spoken_count.
* Grab the 101st element in hamlet_spoken_lines (which has the list index 100), and assign that list to spoken_101.

#### Answers:

```python
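# count() is an action: it runs the pipeline and returns an int
spoken_count = hamlet_spoken_lines.count()

# collect() returns every element as a Python list; index 100 is the 101st element.
# (hamlet_spoken_lines.take(101)[100] would avoid pulling back the whole RDD.)
spoken_101 = hamlet_spoken_lines.collect()[100]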
```

output:
```python

spoken_101 (list):
['hamlet@58478', 'HAMLET', 'A goodly one; in which there are many confines,']

spoken_count (int):
381

hamlet_spoken_lines (pyspark.rdd.PipelinedRDD):
PythonRDD[3] at collect at <ipython-input-1-a6aa1cad19dc>:5
```

### 3.7 Next Steps

We've done some initial cleanup of the Hamlet data set, and we hope you now have a better idea of how to use PySpark to transform data into a format that's better suited to analysis. We also learned how to use actions to explore an RDD before chaining another transformation to it.

The next mission is a challenge that will test your understanding of Spark, transformations, actions, lambda functions, and the MapReduce paradigm in general. After that challenge, we'll explore Spark DataFrames and how to analyze data using all the techniques we've learned.

If you'd like to learn how to install PySpark and integrate it with Jupyter (IPython) Notebook, __[this wonderful blog post](https://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/)__ will walk you through the steps.

## 4. Challenge: Transforming Hamlet into a Data Set

### 4.1 Introduction

In the previous two missions, we covered the basics of PySpark, the MapReduce paradigm, transformations and actions, and how to do basic data cleanup in PySpark. In this challenge, you'll use the techniques you've learned to transform the text of Hamlet into a format that's more useful for data analysis.

__Resources__

* __[PySpark's documentation for the RDD data structure](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)__
* __[Visual representation of methods (IPython Notebook format)](http://nbviewer.jupyter.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb)__
* __[Visual representation of methods (PDF format)](https://training.databricks.com/visualapi.pdf)__


### 4.2 Extract Line Numbers

The first value in each element (or line from the play) is a line number that identifies the line of the play the text is from. It appears in the following format:

```python
'hamlet@0'
'hamlet@8'
'hamlet@9'
...
```

We don't need the __hamlet@__ at the beginning of these IDs for our data analysis. Let's extract just the integer part of the ID from each line, which is much more useful.

#### Instructions:
Transform the RDD __split_hamlet__ into a new RDD __hamlet_with_ids__ that contains the clean version of the line ID for each element.

* For example, we want to transform __hamlet@0__ to 0, and leave the rest of the values in that element untouched.
    * Recall that the __map()__ function will run on each element in the RDD, where each element is a list that we can access using regular Python mechanics.
    
#### Answers:
```python
raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

def format_id(line):
    # 'hamlet@0' -> '0'; every other value in the element is kept unchanged
    line_id = line[0].split('@')[1]
    return [line_id] + line[1:]

hamlet_with_ids = split_hamlet.map(format_id)
hamlet_with_ids.take(10)
```

#### My_Answer:
```python
raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

def remove_id_prefix(line):
    # Build a new list rather than modifying `line` in place
    return [line[0].replace('hamlet@', '')] + line[1:]

hamlet_with_ids = split_hamlet.map(remove_id_prefix)
hamlet_with_ids.take(10)
```

output:
```python
[['0', '', 'HAMLET'],
 ['8'],
 ['9'],
 ['10', '', 'DRAMATIS PERSONAE'],
 ['29'],
 ['30'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['74'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['131']]
```
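Both answers keep the ID as a string ('0'). If you want an actual integer, a small variation works; the function name `id_to_int` is hypothetical, and it's checked here on plain Python lists rather than an RDD. One caveat: the pipe-cleaning function in section 4.4 runs `"|" in item` on every value, which raises a `TypeError` for an int, so it would need to skip the first value if you use this version.

```python
def id_to_int(line):
    """['hamlet@75', 'HAMLET', ...] -> [75, 'HAMLET', ...], leaving `line` unchanged."""
    _, _, number = line[0].partition("@")  # everything after the first '@'
    return [int(number)] + line[1:]

print(id_to_int(["hamlet@0", "", "HAMLET"]))  # [0, '', 'HAMLET']
print(id_to_int(["hamlet@8"]))                # [8]

# On the cluster, the equivalent would be:
# hamlet_with_ids = split_hamlet.map(id_to_int)
```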

### 4.3 Remove Blank Values

Next, we want to get rid of elements that don't contain any actual words (and just have an ID as the first value). These typically represent blank lines between paragraphs or sections in the play. We also want to remove any blank values (__''__) within elements, which don't contain any useful information for our analysis.

#### Instructions
* Clean up the RDD and store the result as a new RDD hamlet_text_only.

#### Answers:

```python
hamlet_with_ids.take(5)
real_text = hamlet_with_ids.filter(lambda line: len(line) > 1)
hamlet_text_only = real_text.map(lambda line: [l for l in line if l != ''])
hamlet_text_only.take(10)
```

#### My_Answers:
```python
hamlet_with_ids.take(5)

def clean_up(line):
    line = [i for i in line if i!='']
    return line

hamlet_text_only = hamlet_with_ids.filter(lambda line: len(line)>1).map(lambda line: clean_up(line))
hamlet_text_only.take(10)
```

Output:

```python
[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND', '|'],
 ['273', '|']]
```

### 4.4 Remove Pipe Characters

If you've been using __take()__ to preview the RDD after each task, you may have noticed there are some pipe characters (__|__) in odd places that add no value for us. The pipe character may appear as a standalone value in an element, or as part of an otherwise useful string value.

#### Instructions:
* Remove any list items that contain only the pipe character (__|__), and remove any pipe characters that appear within other strings (replace them with an empty string).
    * Assign the resulting RDD to __clean_hamlet__.
    
#### Answers:

```python
hamlet_text_only.take(10)
def fix_pipe(line):
    results = list()
    for l in line:
        if l == "|":
            pass
        elif "|" in l:
            fmtd = l.replace("|", "")
            results.append(fmtd)
        else:
            results.append(l)
    return results

clean_hamlet = hamlet_text_only.map(lambda line: fix_pipe(line))
```

#### My_Answers:

```python
hamlet_text_only.take(10)

def fix_pipe(line):
    results = []
    for item in line:
        if item == "|":
            pass
        elif "|" in item:
            item = item.replace('|', '')
            results.append(item)
        else:
            results.append(item)
    return results
    
clean_hamlet = hamlet_text_only.map(lambda line: fix_pipe(line))
clean_hamlet.take(10)
```

Outputs:

```python
[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND'],
 ['273']]
```
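Steps 4.3 and 4.4 can also be folded into a single function for __flatMap()__. This is a sketch with a hypothetical name, `clean_line`, checked on plain Python lists shaped like __hamlet_with_ids__ (the last sample element is made up to show a pipe inside a string). It behaves slightly differently from the answers above: because it removes pipes before checking whether any text is left, an element like ['273', '|'] is dropped entirely instead of surviving as ['273'].

```python
from itertools import chain

def clean_line(line):
    """Yield a cleaned copy of `line`, or nothing if no text is left.

    Strips '|' from every value after the ID, drops values that end up
    empty, and skips elements that are left with only an ID.
    """
    line_id, values = line[0], line[1:]
    stripped = (value.replace("|", "") for value in values)
    cleaned = [value for value in stripped if value]  # drops '', '|', '||', ...
    if cleaned:
        yield [line_id] + cleaned

sample = [
    ["0", "", "HAMLET"],
    ["8"],
    ["261", "VOLTIMAND", "|"],
    ["273", "|"],
    ["40", "x|y"],  # made-up value with an embedded pipe
]
print(list(chain.from_iterable(clean_line(line) for line in sample)))
# [['0', 'HAMLET'], ['261', 'VOLTIMAND'], ['40', 'xy']]

# On the cluster, the equivalent would be:
# clean_hamlet = hamlet_with_ids.flatMap(clean_line)
```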

## 5. Spark DataFrames

### 5.1 The Spark DataFrame: An Introduction

A Spark DataFrame is a distributed collection of data organized into named columns, much like a table in a relational database. As you may have guessed, pandas inspired it.

Spark is well known for its ability to __[process large data sets](https://opensource.com/business/15/1/apache-spark-new-world-record)__. Spark DataFrames combine the scale and speed of Spark with the familiar query, filter, and analysis capabilities of pandas. Unlike pandas, which can only run on one computer, Spark can use distributed memory (and disk when necessary) to handle larger data sets and run computations more quickly.

Spark DataFrames make it relatively easy to carry pandas-style analysis over to much larger data sets. They also support a wide range of data formats, and we can even use a SQL interface to write distributed SQL queries against large database systems and other data stores.

For this mission, we'll be working with a JSON file containing data from the 2010 U.S. Census. It has the following columns:

* age - Age (years)
* females - Number of females
* males - Number of males
* total - Total number of individuals
* year - Year column (2010 for all rows)

Let's open and explore the data set before we dive into Spark DataFrames.

#### Instructions:

* Print the first four lines of census_2010.json.

#### Answers:
```python
with open('census_2010.json') as f:
    for _ in range(4):
        # readline() keeps the trailing newline, so strip it to avoid blank lines
        print(f.readline().rstrip('\n'))
```

Outputs:
```python
{"females": 1994141, "total": 4079669, "males": 2085528, "age": 0, "year": 2010}
{"females": 1997991, "total": 4085341, "males": 2087350, "age": 1, "year": 2010}
{"females": 2000746, "total": 4089295, "males": 2088549, "age": 2, "year": 2010}
{"females": 2002756, "total": 4092221, "males": 2089465, "age": 3, "year": 2010}
```


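We can also parse the same file with Python's built-in __json__ module. Since every line is a separate JSON object, we parse one line at a time with __json.loads()__:

```python
import json

# Read the whole file into one string (not used below)
with open('census_2010.json') as f:
    text1 = f.read()

# Read the whole file, then split the string into a list of lines
with open('census_2010.json') as f:
    text2 = f.read().split('\n')

results = []
for row in text2:
    if row:  # skip blank rows (e.g. after the final newline); json.loads('') raises an error
        # json.loads() parses a string. json.load() reads a file object instead, and would
        # fail here because the file holds many JSON documents rather than one.
        # To pretty-print a record, use json.dumps(data, indent=2).
        data = json.loads(row)
        results.append(data)
```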


### 5.2 Reading in Data

In previous missions, we explored reading data into an RDD object. Recall that an RDD is essentially a distributed list of elements (such as lists or tuples) with no enforced schema or structure of any kind. Elements can have different numbers of values, and the types of those values can vary from one element to the next.

RDDs are useful for __representing unstructured data__ like text. Without them, we'd need to write a lot of custom Python code to interact with such data.

We use the __SparkContext__ object to read data into an RDD:

```python
raw_data = sc.textFile("daily_show.tsv")
daily_show = raw_data.map(lambda line: line.split('\t'))
```

To use the familiar DataFrame query interface from pandas, however, the data representation needs to include rows, columns, and types. Spark's implementation of DataFrames mirrors the pandas implementation, with logic for rows and columns.

The __Spark SQL module__ is very powerful. It gives Spark more information about the structure of our data and the computations we want to perform, and Spark uses that information to optimize how it runs them.

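To take advantage of these features, we'll use the __SQLContext__ object, instead of the SparkContext object, to read external data as a DataFrame. (In Spark 2.0 and later, __SparkSession__ is the recommended entry point for this, and SQLContext remains available for backward compatibility.)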

We can query Spark DataFrame objects with SQL, which we'll explore in the next mission. The SQLContext class gets its name from this capability.

This class allows us to read in data and create new DataFrames from a wide range of __sources__. It can do this because it takes advantage of Spark's powerful __[Data Sources API](https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html)__.


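__File Formats__

* JSON, CSV/TSV, Parquet, Avro, XML

__Storage and Big Data Systems__

* Amazon S3 (cloud storage service), Hive, HBase

__SQL Database Systems__

* MySQL, PostgreSQL (through JDBC)

Depending on your Spark version, some of these sources (Avro, XML, and HBase, for example) need an additional package or connector.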

Data science organizations often use a wide range of systems to collect and store data, and they're constantly making changes to those systems. Spark DataFrames allow us to interface with different types of data, and ensure that our analysis logic will still work as the data storage mechanisms change.

Now that you've learned a bit about Spark DataFrames, let's read in __census_2010.json__. This data set has one complete, valid JSON object on each line, which is the layout Spark's JSON reader expects by default.
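"One JSON object per line" (often called JSON Lines) is easy to check before handing a file to Spark. Here's a minimal sketch using only the standard library; the function name `is_json_lines` is hypothetical. It accepts the census layout and rejects an ordinary pretty-printed JSON array, which is the kind of file that recent Spark versions can only read with the reader's `multiLine` option.

```python
import json

def is_json_lines(text):
    """Return True if every non-blank line of `text` is a standalone JSON object."""
    for line in text.splitlines():
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            return False
        if not isinstance(record, dict):
            return False
    return True

one_per_line = (
    '{"females": 1994141, "total": 4079669, "males": 2085528, "age": 0, "year": 2010}\n'
    '{"females": 1997991, "total": 4085341, "males": 2087350, "age": 1, "year": 2010}\n'
)
pretty_printed = json.dumps([{"age": 0}, {"age": 1}], indent=2)  # one array over many lines

print(is_json_lines(one_per_line))    # True
print(is_json_lines(pretty_printed))  # False: the first line is just "["
```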

In the following code cell, we:

* Import __SQLContext__ from the __pyspark.sql__ library
* __[Instantiate the SQLContext object](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext)__ (which requires the SparkContext object (__sc__) as a parameter), and assign it to the variable __sqlCtx__
* Use the SQLContext method __read.json()__ to read the JSON data set into a Spark DataFrame object named __df__
* Print __df__'s data type to confirm that we successfully read it in as a Spark DataFrame



scripts:
```python
# Import SQLContext
from pyspark.sql import SQLContext

# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

# Read JSON data into a DataFrame object `df`
df = sqlCtx.read.json("census_2010.json")

# Print the type
print(type(df))
```

outputs:
```python
 SQLContext type (<class 'type'>)
    pyspark.sql.context.SQLContext
 type type (<class 'type'>)
    type
 sqlCtx SQLContext (<class 'pyspark.sql.context.SQLContext'>)
    <pyspark.sql.context.SQLContext at 0x7fe6be0b4390>
 df DataFrame (<class 'pyspark.sql.dataframe.DataFrame'>)
    DataFrame[age: bigint, females: bigint, males: bigint, total: bigint, year: bigint]
```



### 5.3 Schema

When we read data using the SQLContext object, Spark:

* Instantiates a Spark DataFrame object
* Infers the schema from the data and associates it with the DataFrame
* Reads in the data and distributes it across the machines in the cluster (if there's more than one)
* Returns the DataFrame object

We expect the DataFrame Spark created to have the following columns, which were the keys in the JSON data set:

* age
* females
* males
* total
* year

Spark has its own type system that's similar to the pandas type system. Because a JSON file doesn't carry a schema, Spark first has to scan the data set to work out the column names and infer each column's type, and it then reads the data again whenever it actually computes something. Let's use one of the Spark DataFrame instance methods to display the schema for the DataFrame we're working with.

#### Instructions
* Call the __[printSchema() method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema)__ on the Spark DataFrame df to display the schema that Spark inferred.

#### Answers:

```python
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")
df.printSchema()
```

output:

```python
root
 |-- age: long (nullable = true)
 |-- females: long (nullable = true)
 |-- males: long (nullable = true)
 |-- total: long (nullable = true)
 |-- year: long (nullable = true)
```
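Spark's real schema inference is more involved, but the basic idea (look at every record, note each field's value type, and reconcile conflicts) can be sketched in a few lines. Everything here is hypothetical and simplified: the `TYPE_NAMES` mapping, the widening rule, and the `infer_schema` name are illustrations, not Spark's implementation. The fields are sorted by name; the schema printed above is alphabetical too, not in the file's key order.

```python
import json

# Hypothetical, simplified mapping from JSON value types to Spark SQL type names
TYPE_NAMES = {bool: "boolean", int: "long", float: "double", str: "string"}

def infer_schema(json_lines):
    """Toy schema inference: one pass over the records, one type name per field."""
    schema = {}
    for line in json_lines:
        record = json.loads(line)
        for field, value in record.items():
            # Unknown types (including null) fall back to "string" in this toy
            seen = TYPE_NAMES.get(type(value), "string")
            previous = schema.setdefault(field, seen)
            if previous != seen:
                # Toy widening rule: long + double -> double, any other conflict -> string
                schema[field] = "double" if {previous, seen} == {"long", "double"} else "string"
    return dict(sorted(schema.items()))

census_lines = [
    '{"females": 1994141, "total": 4079669, "males": 2085528, "age": 0, "year": 2010}',
    '{"females": 1997991, "total": 4085341, "males": 2087350, "age": 1, "year": 2010}',
]
print(infer_schema(census_lines))
# {'age': 'long', 'females': 'long', 'males': 'long', 'total': 'long', 'year': 'long'}
```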


### 5.4 Pandas vs Spark DataFrames

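As we mentioned before, the pandas DataFrame heavily influenced the Spark DataFrame implementation. Here are some of the method names we can find in both (their behavior doesn't always match; for example, pandas' where() masks values that fail a condition, while Spark's where() filters out rows):

* agg()
* join()
* sort() (sort_values() in current versions of pandas)
* where()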

Unlike pandas DataFrames, however, Spark DataFrames are __immutable__, which means we can't modify existing objects. Most transformations on an object return a new DataFrame reflecting the changes instead. As we discussed in previous missions, Spark's creators __deliberately__ designed immutability into Spark to make it __easier__ to work with __distributed__ data structures.

Pandas and Spark DataFrames also have different underlying data structures. Pandas DataFrames are __built around Series__ objects, while Spark DataFrames are __built around RDDs__. We can perform most of the same computations and transformations on Spark DataFrames that we can on pandas DataFrames, but the styles and methods are somewhat different. We'll explore how to perform common pandas functions with Spark in this mission.

#### Instructions
* Use the __[show() method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show)__ to print the first five rows of the DataFrame.

#### Answers:

```python
df
df.show(5)
```

Output:
```python
+---+-------+-------+-------+----+
|age|females|  males|  total|year|
+---+-------+-------+-------+----+
|  0|1994141|2085528|4079669|2010|
|  1|1997991|2087350|4085341|2010|
|  2|2000746|2088549|4089295|2010|
|  3|2002756|2089465|4092221|2010|
|  4|2004366|2090436|4094802|2010|
+---+-------+-------+-------+----+
only showing top 5 rows
```

### 5.5 Row Objects

In pandas, the __head()__ method returns the first n rows as a DataFrame. This is one of the places where the two DataFrame implementations differ: instead of returning a nicely formatted table of values, head(n) in Spark returns a list of __[row](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.Row)__ objects (and head() with no argument returns a single __row__). Spark returns __row__ objects from methods that bring data back to Python, such as __head()__, __collect()__ and __take()__.

You can access a row's attributes by the column name using dot notation, and by position using bracket notation with an index:

```python
row_one = df.head(5)[0]
# Access value for age
row_one.age
# Access the first value
row_one[0]
```
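Both styles of access work because __pyspark.sql.Row__ is a subclass of Python's `tuple` with named fields. If you don't have a Spark session handy, `collections.namedtuple` behaves much the same way; in this sketch, `CensusRow` is a hypothetical stand-in built from the first census row, not a Spark class. (To convert to a dictionary, a Spark __row__ has `asDict()`, while a namedtuple has `_asdict()`.)

```python
from collections import namedtuple

# Hypothetical stand-in: like pyspark.sql.Row, a namedtuple is a tuple subclass
# whose values can be read by field name or by position.
CensusRow = namedtuple("CensusRow", ["age", "females", "males", "total", "year"])

row_one = CensusRow(age=0, females=1994141, males=2085528, total=4079669, year=2010)

print(row_one.age)   # 0 -> access by name (dot notation)
print(row_one[0])    # 0 -> access by position (bracket notation)
print(row_one[1:3])  # (1994141, 2085528) -> slicing works because it's a tuple
```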

#### Instructions:
* Use the __head()__ method to return the first five rows in the DataFrame as a list of __row__ objects, and assign the result to the variable __first_five__.
* Print the __age__ value for each row object in __first_five__.

#### Answers:

```python
first_five = df.head(5)
for r in first_five:
    print(r.age)
```


Outputs:
```python
first_five list (<class 'list'>)

[Row(age=0, females=1994141, males=2085528, total=4079669, year=2010),
 Row(age=1, females=1997991, males=2087350, total=4085341, year=2010),
 Row(age=2, females=2000746, males=2088549, total=4089295, year=2010),
 Row(age=3, females=2002756, males=2089465, total=4092221, year=2010),
 Row(age=4, females=2004366, males=2090436, total=4094802, year=2010)]

0
1
2
3
4
```


### 5.6 Selecting Columns

In pandas, we passed a string into a single pair of brackets ([]) to select an individual column, and passed in a list to select multiple columns:

```python
# Pandas DataFrame
df['age']
df[['age', 'males']]
```

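We can still use bracket notation in Spark, but to get a DataFrame back we need to pass in a __list of column names__, even when we're only selecting one column. (Passing a single string, as in df['age'], returns a Column expression rather than a DataFrame.)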

Spark DataFrames use lazy evaluation, so nothing is computed until we call an action such as the __show() method__, which displays the results. Instead of using bracket notation, we can also use the __select() method__ to select columns:


```python
# Spark DataFrame
df.select('age')
df.select('age', 'males')
```

In the following code cell, we demonstrate how to select and display the age column. Use what you've learned to take this a step further and select multiple columns.

#### Instructions:
* Select the age, males, and females columns from the DataFrame and display them using the show() method.

#### Answers:

```python
df[['age']].show()
df[['age', 'males', 'females']].show()
df.select('age', 'males', 'females').show()
```

outputs:
```python
+---+-------+-------+
|age|  males|females|
+---+-------+-------+
|  0|2085528|1994141|
|  1|2087350|1997991|
|  2|2088549|2000746|
|  3|2089465|2002756|
|  4|2090436|2004366|
|  5|2091803|2005925|
|  6|2093905|2007781|
|  7|2097080|2010281|
|  8|2101670|2013771|
|  9|2108014|2018603|
| 10|2114217|2023289|
| 11|2118390|2026352|
| 12|2132030|2037286|
| 13|2159943|2060100|
| 14|2195773|2089651|
| 15|2229339|2117689|
| 16|2263862|2146942|
| 17|2285295|2165852|
| 18|2285990|2168175|
| 19|2272689|2159571|
+---+-------+-------+
only showing top 20 rows
```


### 5.7 Filtering Rows

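In pandas, we used Boolean filtering to select only the rows we were interested in. Spark DataFrames support the same notation: a comparison like df['age'] > 5 builds a Boolean column expression, and passing it inside brackets keeps only the rows where it's true (equivalent to calling filter() or where()).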

#### Instructions:

* Use the pandas notation for Boolean filtering to select the rows where age is greater than five.
* Assign the resulting DataFrame to the variable five_plus.
* Use the show() method to display five_plus.

#### Answers:

```python
five_plus = df[df['age'] > 5]
five_plus.show()
```

output:
```python
+---+-------+-------+-------+----+
|age|females|  males|  total|year|
+---+-------+-------+-------+----+
|  6|2007781|2093905|4101686|2010|
|  7|2010281|2097080|4107361|2010|
|  8|2013771|2101670|4115441|2010|
|  9|2018603|2108014|4126617|2010|
| 10|2023289|2114217|4137506|2010|
| 11|2026352|2118390|4144742|2010|
| 12|2037286|2132030|4169316|2010|
| 13|2060100|2159943|4220043|2010|
| 14|2089651|2195773|4285424|2010|
| 15|2117689|2229339|4347028|2010|
| 16|2146942|2263862|4410804|2010|
| 17|2165852|2285295|4451147|2010|
| 18|2168175|2285990|4454165|2010|
| 19|2159571|2272689|4432260|2010|
| 20|2151448|2259690|4411138|2010|
| 21|2140926|2244039|4384965|2010|
| 22|2133510|2229168|4362678|2010|
| 23|2132897|2218195|4351092|2010|
| 24|2135789|2208905|4344694|2010|
| 25|2136497|2197148|4333645|2010|
+---+-------+-------+-------+----+
only showing top 20 rows
```

### 5.8 Using Column Comparisons as Filters

We can compare the columns in Spark DataFrames with each other, and use the comparison criteria as a filter. For example, to get the rows where the population of males exceeded females in 2010, we'd use the same notation that we would use in pandas.

#### Instructions

* Find all of the rows where females is less than males, and use show() to display the first 20 results.

#### Answers:

```python
df[df['females'] < df['males']].show()
```

outputs:

```python
+---+-------+-------+-------+----+
|age|females|  males|  total|year|
+---+-------+-------+-------+----+
|  0|1994141|2085528|4079669|2010|
|  1|1997991|2087350|4085341|2010|
|  2|2000746|2088549|4089295|2010|
|  3|2002756|2089465|4092221|2010|
|  4|2004366|2090436|4094802|2010|
|  5|2005925|2091803|4097728|2010|
|  6|2007781|2093905|4101686|2010|
|  7|2010281|2097080|4107361|2010|
|  8|2013771|2101670|4115441|2010|
|  9|2018603|2108014|4126617|2010|
| 10|2023289|2114217|4137506|2010|
| 11|2026352|2118390|4144742|2010|
| 12|2037286|2132030|4169316|2010|
| 13|2060100|2159943|4220043|2010|
| 14|2089651|2195773|4285424|2010|
| 15|2117689|2229339|4347028|2010|
| 16|2146942|2263862|4410804|2010|
| 17|2165852|2285295|4451147|2010|
| 18|2168175|2285990|4454165|2010|
| 19|2159571|2272689|4432260|2010|
+---+-------+-------+-------+----+
only showing top 20 rows
```

### 5.9 Converting Spark DataFrames to pandas DataFrames

The Spark DataFrame is fairly new, and the library's still a bit limited. There's no easy way to create a histogram of the data in a column, for example, or a line plot of the values in two columns.

To handle some of these shortcomings, we can convert a Spark DataFrame to a pandas DataFrame using the __toPandas()__ method. Because __toPandas()__ collects every row into the driver's memory, converting an entire Spark DataFrame works __just fine for small data sets__. For larger ones, though, we'll want to first select or filter down to a subset of the data that's more manageable for pandas.


#### Instructions:

* Use the __[toPandas() method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toPandas)__ to convert the Spark DataFrame to a Pandas DataFrame, and assign it to the variable pandas_df.
* Then, plot a histogram of the total column using the __[hist() method](http://pandas.pydata.org/pandas-docs/version/0.17.0/generated/pandas.DataFrame.hist.html)__.

#### Answers:

```python
pandas_df = df.toPandas()
pandas_df['total'].hist()
```


### 5.10 Next Steps

In this mission, we explored the Spark DataFrame, and how to work with its methods to query and analyze data. In the next mission, we'll use SQL to interface with DataFrames.

## 6. Spark SQL

### 6.1 Overview

In the previous mission, we learned how to read JSON into a Spark DataFrame, as well as some basic techniques for interacting with DataFrames. In this mission, we'll learn how to use Spark's SQL interface to query and interact with the data. We'll continue to work with the 2010 U.S. Census data set in this mission. Later on, we'll add other files to demonstrate how to take advantage of SQL to work with __multiple data sets__.



### 6.2 Register the DataFrame as a Table

Before we can write and run SQL queries, we need to tell Spark to treat the DataFrame as a SQL table. Spark internally maintains a virtual database within the SQLContext object. This object, which we assigned to the variable __sqlCtx__, has methods for registering temporary tables.

To register a DataFrame as a table, call the __[registerTempTable() method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable)__ on that DataFrame object. This method requires one string parameter, __name__, that we use to set the table name for reference in our SQL queries. (In Spark 2.0 and later, registerTempTable() is deprecated in favor of createOrReplaceTempView(), which takes the same argument.)

#### Instructions

* Use the __registerTempTable()__ method to register the DataFrame df as a table named census2010.

* Then, call the SQLContext method __tableNames()__ to return the list of tables.

    * Assign the resulting list to tables, and use the print function to display it.
    
#### Answers

```python
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")
df.registerTempTable('census2010')
tables = sqlCtx.tableNames()
print(tables)
```

Outputs:
```python
['census2010']
```

### 6.3 Querying

Now that we've registered the table within __sqlCtx__, we can start writing and running SQL queries. With Spark SQL, we represent our query as a string and pass it into the sql() method within the SQLContext object. The __[sql() method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.sql)__ requires a single parameter, the query string. Spark will return the query results as a __DataFrame object__. Because of __lazy evaluation__, the query doesn't actually run until we call an action, so we'll use show() to display the results.

Unlike SQLite, Spark SQL may throw a parse __error__ if a query ends with a __semicolon__, so leave it off. Spark's SQL dialect isn't __identical__ to SQLite's (it has its own dialect, closer to HiveQL), but the basic queries you've written for the SQL course, such as SELECT, WHERE, GROUP BY, ORDER BY, and JOIN, should work here largely unchanged.

#### Instructions

* Write a SQL query that returns the age column from the table census2010, and use the show() method to display the first 20 results.

#### Answers:
```python
sqlCtx.sql('select age from census2010').show()
```

Outputs:
```python
+---+
|age|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
```

### 6.4 Filtering

In the previous mission, we used DataFrame methods to find all of the rows where __age__ was greater than 5. If we only wanted to retrieve data from the males and females columns where that criterion was true, we'd need to __chain__ additional operations to the Spark DataFrame. To return the results in descending order instead of ascending order, we'd have to chain another method. The DataFrame methods are quick and powerful for simple queries, but __chaining__ them can be __cumbersome__ for more advanced queries.

SQL shines at expressing complex logic in a more compact manner. Let's brush up on SQL by writing a query that expresses more specific criteria.
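
To see how much a single SQL statement can express, the sketch below runs the same kind of query against Python's built-in sqlite3 module instead of Spark. The table is a made-up stand-in that only mimics the census2010 schema; its numbers are not census data. One declarative query handles the column selection, the range filter, and the descending sort that would otherwise take several chained DataFrame calls.

```python
import sqlite3

# Made-up stand-in for census2010: ages 0-100 with invented counts.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE census2010 (age INTEGER, males INTEGER, females INTEGER)')
conn.executemany(
    'INSERT INTO census2010 VALUES (?, ?, ?)',
    [(age, 1000 + age, 2000 + age) for age in range(101)],
)

# Selection, filter, and sort order in one statement.
query = '''
SELECT age, males, females
FROM census2010
WHERE age > 5 AND age < 15
ORDER BY age DESC
'''
rows = conn.execute(query).fetchall()
print(len(rows))  # 9 rows: ages 14 down to 6
print(rows[0])    # (14, 1014, 2014)
conn.close()
```

Because both inequalities are strict, only ages 6 through 14 qualify, which is why the answer to this exercise returns nine rows.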

#### Instructions

* Write and run a SQL query that returns:
    * The males and females columns (in that order) where age > 5 and age < 15

#### Answers:
```python
query = 'SELECT males, females FROM census2010 WHERE age>5 AND age<15'
sqlCtx.sql(query).show()
```

Outputs:
```python
+-------+-------+
|  males|females|
+-------+-------+
|2093905|2007781|
|2097080|2010281|
|2101670|2013771|
|2108014|2018603|
|2114217|2023289|
|2118390|2026352|
|2132030|2037286|
|2159943|2060100|
|2195773|2089651|
+-------+-------+
```


### 6.5 Mixing Functionality

Because the results of SQL queries are __DataFrame objects__, we can combine the best aspects of both DataFrames and SQL to enhance our workflow. For example, we can write a SQL query that quickly returns a subset of our data as a DataFrame.

#### Instructions
* Write a SQL query that returns a DataFrame containing the males and females columns from the census2010 table.
* Use the __[describe() method](https://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe)__ to calculate summary statistics for the DataFrame and the show() method to display the results.
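
To make the summary table less of a black box, the sketch below reproduces the same five statistics in plain Python on a small, made-up list of values. The detail worth noting is that the stddev row from describe() is the sample standard deviation (dividing by n - 1), which matches statistics.stdev() rather than statistics.pstdev().

```python
import statistics

# Made-up values standing in for the males or females column.
values = [10, 20, 30, 40]

summary = {
    'count': len(values),
    'mean': statistics.mean(values),
    # Sample standard deviation (n - 1 denominator), as reported by describe().
    'stddev': statistics.stdev(values),
    'min': min(values),
    'max': max(values),
}
for name, value in summary.items():
    print(f'{name:>7}: {value}')
```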

#### Answers
```python
query = 'select males,females from census2010' ## sqlCtx.sql() returns a Spark DataFrame Object
sqlCtx.sql(query).describe().show()
```

Outputs:
```python
+-------+------------------+-----------------+
|summary|             males|          females|
+-------+------------------+-----------------+
|  count|               101|              101|
|   mean|1520095.3168316833|1571460.287128713|
| stddev|  818587.208016823|748671.0493484351|
|    min|              4612|            25673|
|    max|           2285990|          2331572|
+-------+------------------+-----------------+
```

### 6.6 Multiple Tables

One of SQL's most powerful features is __joining tables__. Spark SQL takes this __a step further__ by enabling you to run join queries across data from __multiple file types__. Spark can read any of the file types and formats it supports into DataFrame objects, and we can register each of these as a table within the SQLContext object to use for querying.

As we mentioned briefly in the previous section, most data science organizations use a variety of file formats and data storage mechanisms. Spark SQL was built with these industry use cases in mind and enables data professionals to use one common query language, SQL, to interact with many different data sources. We'll explore joins in Spark SQL further, but first let's introduce the other datasets we'll be using:

* census_1980.json - 1980 U.S. Census data
* census_1990.json - 1990 U.S. Census data
* census_2000.json - 2000 U.S. Census data
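
Spark handles the format-specific reading for us, but the idea is easy to see in miniature: parse each format with its own reader, load the records into tables, and then query all of them with the same SQL. The sketch below uses Python's json, csv, and sqlite3 modules as a stand-in for Spark, with two tiny made-up datasets held in memory; none of it is Spark's API. The JSON input uses one object per line, the layout Spark's read.json() expects by default.

```python
import csv
import io
import json
import sqlite3

# Made-up in-memory "files" in two different formats.
json_lines = io.StringIO(
    '{"age": 0, "total": 100, "year": 1990}\n'
    '{"age": 1, "total": 110, "year": 1990}\n'
)
csv_file = io.StringIO('age,total,year\n0,120,2000\n1,130,2000\n')

conn = sqlite3.connect(':memory:')
for table in ('census1990', 'census2000'):
    conn.execute(f'CREATE TABLE {table} (age INTEGER, total INTEGER, year INTEGER)')

# Parse each format with its own reader, then load the records into a table.
json_rows = [json.loads(line) for line in json_lines if line.strip()]
conn.executemany('INSERT INTO census1990 VALUES (:age, :total, :year)', json_rows)
csv_rows = list(csv.DictReader(csv_file))  # values arrive as strings
conn.executemany('INSERT INTO census2000 VALUES (:age, :total, :year)', csv_rows)

# Once both are tables, one SQL query joins them regardless of source format.
rows = conn.execute('''
SELECT census1990.age, census1990.total, census2000.total
FROM census1990
INNER JOIN census2000 ON census1990.age = census2000.age
ORDER BY census1990.age
''').fetchall()
print(rows)  # [(0, 100, 120), (1, 110, 130)]
```

The CSV values are read as strings; SQLite's INTEGER column affinity converts them to integers on insert, so the join keys line up.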

#### Instructions
Read these additional datasets into DataFrame objects and then use the __registerTempTable()__ method to register each one as a table within SQLContext (in Spark 2.0 and later, registerTempTable() is deprecated in favor of createOrReplaceTempView()):

* census_1980.json as census1980,
* census_1990.json as census1990,
* census_2000.json as census2000.

Then use the __tableNames()__ method to list the tables within the SQLContext object, assign the result to __tables__, and print tables.

#### Answers
```python
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

df = sqlCtx.read.json("census_2010.json")
df.registerTempTable('census2010')
df_2000 = sqlCtx.read.json("census_2000.json")
df_1990 = sqlCtx.read.json("census_1990.json")
df_1980 = sqlCtx.read.json("census_1980.json")

df_2000.registerTempTable('census2000')
df_1990.registerTempTable('census1990')
df_1980.registerTempTable('census1980')

tables = sqlCtx.tableNames() ## return a Python List
print(tables)
```


Outputs:
```python
['census1980', 'census1990', 'census2000', 'census2010']
```

### 6.7 Joins

Now that we have a table for each dataset, we can write join queries to compare values across them. Since we're working with Census data, let's use the age column as the joining column.
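
In this section's answer, both result columns come back labeled total, which is easy to misread. Table aliases and column aliases (AS) avoid that. The sketch below shows the pattern with Python's sqlite3 module on made-up three-row versions of the two tables; AS aliasing is standard SQL, so the same query text should also work with sqlCtx.sql(), though the tables and numbers here are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE census2010 (age INTEGER, total INTEGER)')
conn.execute('CREATE TABLE census2000 (age INTEGER, total INTEGER)')
# Made-up totals for three ages; only the shape mirrors the census tables.
conn.executemany('INSERT INTO census2010 VALUES (?, ?)', [(0, 40), (1, 41), (2, 42)])
conn.executemany('INSERT INTO census2000 VALUES (?, ?)', [(0, 37), (1, 38), (2, 39)])

# Short table aliases keep the join readable; column aliases make the output unambiguous.
query = '''
SELECT c10.age AS age, c10.total AS total_2010, c00.total AS total_2000
FROM census2010 AS c10
INNER JOIN census2000 AS c00 ON c10.age = c00.age
ORDER BY c10.age
'''
cursor = conn.execute(query)
columns = [col[0] for col in cursor.description]
rows = cursor.fetchall()
print(columns)  # ['age', 'total_2010', 'total_2000']
print(rows)     # [(0, 40, 37), (1, 41, 38), (2, 42, 39)]
```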

#### Instructions

* Write a query that returns a DataFrame with the total columns for the tables census2010 and census2000 (in that order).
* Then, run the query and use the show() method to display the first 20 results.

#### Answers
```python
query = '''
SELECT census2010.total, census2000.total 
FROM census2010 
INNER JOIN census2000 
ON census2000.age=census2010.age
'''
sqlCtx.sql(query).show(20)
```


Outputs:
```python
+-------+-------+
|  total|  total|
+-------+-------+
|4079669|3733034|
|4085341|3825896|
|4089295|3904845|
|4092221|3970865|
|4094802|4024943|
|4097728|4068061|
|4101686|4101204|
|4107361|4125360|
|4115441|4141510|
|4126617|4150640|
|4137506|4152174|
|4144742|4145530|
|4169316|4139512|
|4220043|4138230|
|4285424|4137982|
|4347028|4133932|
|4410804|4130632|
|4451147|4111244|
|4454165|4068058|
|4432260|4011192|
+-------+-------+
```

### 6.8 SQL Functions

The functions and operators from SQLite that we've used in the past are available for us to use in Spark SQL:

* COUNT()
* AVG()
* SUM()
* AND
* OR
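
As a quick refresher on how these combine, the sketch below applies COUNT(), SUM(), and AVG() to rows selected with AND and OR. It runs on Python's sqlite3 module with a made-up ten-row table rather than the census data.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE census2010 (age INTEGER, total INTEGER)')
# Made-up totals: age 0 -> 100, age 1 -> 101, ..., age 9 -> 109.
conn.executemany(
    'INSERT INTO census2010 VALUES (?, ?)',
    [(age, 100 + age) for age in range(10)],
)

# Parentheses make the AND/OR grouping explicit: ages 2-4, plus age 9.
query = '''
SELECT COUNT(*), SUM(total), AVG(total)
FROM census2010
WHERE (age >= 2 AND age <= 4) OR age = 9
'''
count, total_sum, total_avg = conn.execute(query).fetchone()
print(count, total_sum, total_avg)  # 4 418 104.5
```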

#### Instructions

Write a query that calculates the sums of the __total__ column from each of the tables, in the following order:

* census2010,
* census2000,
* census1990.

You'll need to perform two inner joins for this query (all datasets have the same values for __age__, which makes things convenient for joining).
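
The fact that every table has the same values for age matters here: with exactly one row per age in each table, the inner joins neither drop nor repeat rows, so summing after the joins gives the same totals as summing each table on its own. The sketch below checks this with Python's sqlite3 module on made-up three-row tables, then shows how a single duplicated key would silently inflate the sums.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# Made-up totals; each table has exactly one row per age, and the ages match.
data = {
    'census2010': [(0, 10), (1, 20), (2, 30)],
    'census2000': [(0, 11), (1, 21), (2, 31)],
    'census1990': [(0, 12), (1, 22), (2, 32)],
}
for table, table_rows in data.items():
    conn.execute(f'CREATE TABLE {table} (age INTEGER, total INTEGER)')
    conn.executemany(f'INSERT INTO {table} VALUES (?, ?)', table_rows)

joined_query = '''
SELECT SUM(census2010.total), SUM(census2000.total), SUM(census1990.total)
FROM census2010
INNER JOIN census2000 ON census2010.age = census2000.age
INNER JOIN census1990 ON census2000.age = census1990.age
'''
joined_sums = conn.execute(joined_query).fetchone()

# Summing each table separately gives the same answer when keys are unique and shared.
separate_sums = tuple(
    conn.execute(f'SELECT SUM(total) FROM {table}').fetchone()[0] for table in data
)
print(joined_sums, separate_sums)  # (60, 63, 66) (60, 63, 66)

# A duplicated age in one table makes the join repeat the matching rows.
conn.execute('INSERT INTO census1990 VALUES (0, 12)')
inflated_sums = conn.execute(joined_query).fetchone()
print(inflated_sums)  # (70, 74, 78)
```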


#### Answers
```python
## the 2nd join matches census1990 rows against the result of the 1st join
## (keep this as a Python comment: '#' is not valid comment syntax inside Spark SQL)
query = '''
SELECT SUM(census2010.total), SUM(census2000.total), SUM(census1990.total)
FROM census2010 
INNER JOIN census2000 ON census2010.age=census2000.age 
INNER JOIN census1990 ON census2000.age=census1990.age
'''

sqlCtx.sql(query).show()
```


Outputs:
```python
+----------+----------+----------+
|sum(total)|sum(total)|sum(total)|
+----------+----------+----------+
| 312247116| 284594395| 254506647|
+----------+----------+----------+
```
