The core data structure in Spark is the resilient distributed dataset (RDD). As the name suggests, an RDD is Spark's representation of a data set that's distributed across the RAM, or memory, of a cluster of many machines. An RDD object is essentially a collection of elements that can hold tuples, dictionaries, lists, and so on. Similar to a pandas DataFrame, we can load a data set into an RDD and then run any of the methods accessible to that object.

PySpark

While Spark itself is written in Scala, a language that compiles down to bytecode for the JVM, the open source community has developed a wonderful toolkit called PySpark that allows us to interface with RDDs in Python. PySpark relies on a library called Py4J, which lets Python code interface with Java objects (in our case, RDDs).

In this mission, we'll work with a data set containing the names of all of the guests who have appeared on The Daily Show.

To start off, we'll load the data set into an RDD. We're using the TSV version of FiveThirtyEight's data set. TSV files use a tab character ("\t") as the delimiter, instead of the comma (",") that CSV files use.

['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']
 
textFile is a method of the org.apache.spark.SparkContext class that reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of strings, with one element per line.

In [1]:
raw_data = sc.textFile("daily_show.tsv")
raw_data.take(5)



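(This cell raised `NameError: name 'sc' is not defined` because no SparkContext existed yet in this notebook. The local setup section below creates one with `sc = pyspark.SparkContext()`.)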

In Spark, the SparkContext object manages the connection to the cluster and coordinates the running of processes on it. More specifically, it connects to a cluster manager, which allocates resources across applications. Once connected, Spark acquires executors on the cluster's nodes, and the SparkContext sends them tasks to run. Here's a diagram from the Spark documentation that will help you visualize the architecture:

 ![title](cluster.png)
 
We automatically have access to the SparkContext object sc. We then run the following code to read the TSV data set into an RDD object raw_data:

raw_data = sc.textFile("daily_show.tsv")

The RDD object raw_data closely resembles a list of string objects, with one object for each line in the data set. We then use the take() method to print the first five elements of the RDD:

raw_data.take(5)

To explore the other methods an RDD object has access to, check out the PySpark documentation. take(n) will return the first n elements of the RDD.



You may be wondering why, if an RDD resembles a Python list, we don't just use bracket notation to access elements in the RDD.

The answer is that Spark distributes the elements of an RDD across many partitions, which may live on different machines, and the RDD object is specifically designed to handle distributed data. There is no single in-memory list to index into, so we can't rely on list-style bracket notation.

Spark offers many advantages over regular Python, though. For example, thanks to the RDD abstraction, you can run Spark locally on your own computer. Spark will simulate distributing your calculations over many machines by automatically splitting the data into partitions in your computer's memory.
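Here's a plain-Python sketch of what "splitting the data into partitions" means, and why there's no single list to index. `partition` and `map_partitions` are hypothetical helpers made up for this illustration. In PySpark, `sc.parallelize(data, numSlices)` creates a partitioned RDD from a local collection, and `rdd.getNumPartitions()` reports how many partitions it has.

```python
# A plain-Python sketch of partitioned data (hypothetical helpers, not Spark APIs).

def partition(data, num_partitions):
    """Split a list into num_partitions contiguous slices of roughly equal size."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

def map_partitions(partitions, f):
    """Apply f to every element, one partition at a time (Spark would run these in parallel)."""
    return [[f(x) for x in part] for part in partitions]

parts = partition(list(range(10)), 3)
print(parts)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(map_partitions(parts, lambda x: x * 10))
```

Each partition can be processed independently. That independence is what lets Spark spread the work across cores or machines.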

Spark's RDD implementation also lets us evaluate code "lazily," meaning we can postpone running a calculation until absolutely necessary. In the code above, Spark waited to load the TSV file into an RDD until raw_data.take(5) executed. When our code called raw_data = sc.textFile("daily_show.tsv"), Spark created a pointer to the file, but didn't actually read it into raw_data until raw_data.take(5) needed that variable to run its logic.

The advantage of "lazy" evaluation is that we can build up a queue of tasks and let Spark optimize the overall workflow in the background. In regular Python, the interpreter can't do much workflow optimization. We'll see more examples of lazy evaluation later on.

While Spark borrowed heavily from Hadoop's MapReduce pattern, it's still quite different in many ways. 

The key idea to understand when working with Spark is data pipelining. Every operation or calculation in Spark is essentially a series of steps that we can chain together and run in succession to form a pipeline. Each step in the pipeline returns either a Python value (such as an integer), a Python data structure (such as a dictionary), or an RDD object.

We'll start with the map() function.

Map()

The map(f) function applies the function f to every element in the RDD. Spark runs f on each element, partition by partition, and returns a new RDD containing the results. The original RDD is left unchanged.

We'll walk through an example of a map function so you can get a better sense of how it works. If you look carefully, you'll see that raw_data is in a format that's hard to work with. While the elements are currently all strings, we'd like to convert each of them into a list to make the data more manageable. To do this the traditional way, we would:

1. Use a 'for' loop to iterate over the collection
2. Split each `string` on the delimiter
3. Store the result in a `list`
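For comparison, here are the three traditional steps in plain Python. The `lines` list is a stand-in holding the first few raw lines copied from the output shown earlier. With Spark, the same split logic moves into the lambda we pass to `map()`.

```python
# Plain-Python version of the "traditional" approach (no Spark).
# `lines` holds a few raw TSV lines copied from the sample output above.
lines = [
    'YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
    '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
    '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
]

rows = []                          # step 3: a list to store the results
for line in lines:                 # step 1: iterate over the collection
    rows.append(line.split('\t'))  # step 2: split each string on the tab delimiter

print(rows[1])  # ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox']
```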

Let's see how we can use map to do this with Spark instead.

In the code cell:

1. Call the RDD function `map()` to specify we want to apply the logic in the parentheses to every line in our data set.
2. Write a lambda function that splits each line using the tab delimiter (\t), and assign the resulting RDD to `daily_show`.
3. Call the RDD function `take()` on `daily_show` to display the first five elements (or rows) of the resulting RDD.

We call the map(f) function a transformation step. It requires either a named or lambda function f.



In [None]:
daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)
# Hit run to see the output

One of the wonderful features of PySpark is the ability to separate our logic - which we prefer to write in Python - from the actual data transformation. In the previous code cell, we wrote this lambda function in Python code:

raw_data.map(lambda line: line.split('\t'))

Even though we wrote the function in Python, Spark's Scala-based engine, which runs on the JVM, took care of scheduling the work and moving data around when the code actually ran over our RDD. The lambda itself is executed by Python worker processes that Spark launches. This is the power of PySpark. Without learning any Scala, we get to harness the data processing performance gains from Spark's architecture. Even better, when we ran the following code, it returned the results to us in Python-friendly notation:

daily_show.take(5)

Transformations and Actions

There are two types of methods in Spark:

1. Transformations - map(), reduceByKey()
2. Actions - take(), reduce(), saveAsTextFile(), collect()

Transformations are lazy operations that always return a reference to an RDD object. Spark doesn't actually run the transformations, though, until an action needs to use the RDD resulting from a transformation. Any function that returns an RDD is a transformation, and any function that returns a value is an action. These concepts will become more clear as we work through this lesson and practice writing PySpark code.
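To see why transformations cost nothing until an action runs, here's a toy model in plain Python. `ToyRDD` and `loud_double` are hypothetical names made up for this sketch. Real RDDs also track partitions, lineage and cluster execution, and none of that is modeled here. In the toy, transformations only record a step and return a new object, and actions walk the recorded steps over the data.

```python
# A toy model of lazy evaluation. ToyRDD is a hypothetical class, not PySpark's code.
class ToyRDD:
    def __init__(self, source, steps=()):
        self._source = source       # underlying data, never modified
        self._steps = tuple(steps)  # recorded transformations, not yet executed

    # Transformations: record one more step and return a NEW ToyRDD.
    def map(self, f):
        return ToyRDD(self._source, self._steps + (("map", f),))

    def filter(self, f):
        return ToyRDD(self._source, self._steps + (("filter", f),))

    def _iterate(self):
        """Push each source element through every recorded step, one element at a time."""
        for x in self._source:
            keep = True
            for kind, f in self._steps:
                if kind == "map":
                    x = f(x)
                elif not f(x):  # a "filter" step that rejects this element
                    keep = False
                    break
            if keep:
                yield x

    # Actions: only these actually run the recorded steps.
    def take(self, n):
        result = []
        if n <= 0:
            return result
        for x in self._iterate():
            result.append(x)
            if len(result) == n:
                break  # stop early: later elements are never computed
        return result

    def count(self):
        return sum(1 for _ in self._iterate())

    def collect(self):
        return list(self._iterate())


calls = []

def loud_double(x):
    calls.append(x)  # record every time the function really runs
    return x * 2

rdd = ToyRDD(range(1, 6)).map(loud_double).filter(lambda x: x > 4)
print(calls)        # [] (building the pipeline ran nothing)
print(rdd.take(2))  # [6, 8]
print(calls)        # [1, 2, 3, 4] (element 5 was never processed)
```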

Immutability

You may be wondering why we couldn't just split each string in place, instead of creating a new object daily_show. In Python, we could have modified the collection element-by-element in place, without returning and assigning the results to a new object.

RDD objects are immutable, meaning that we can't change their values once we've created them. In Python, list and dictionary objects are mutable (we can change their values), while tuple objects are immutable. The only way to modify a tuple object in Python is to create a new tuple object with the necessary updates. 
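Here's a quick Python illustration of the tuple behavior described above. RDD transformations follow the same pattern: they always produce a new object instead of changing the old one. The variable names are illustrative.

```python
# Tuples are immutable: "changing" one means building a new tuple.
guest = ('1999', 'actor')

try:
    guest[1] = 'comedian'  # in-place item assignment is not allowed
except TypeError as err:
    print('TypeError:', err)  # TypeError: 'tuple' object does not support item assignment

updated = guest[:1] + ('comedian',)  # a new tuple that carries the change
print(guest)    # ('1999', 'actor')  the original is untouched
print(updated)  # ('1999', 'comedian')
```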


Spark uses the immutability of RDDs to simplify fault tolerance and to enhance calculation speeds. The mechanics of how it does this are complex, but this Quora answer summarizes the main ideas:
https://www.quora.com/Why-is-RDD-immutable-in-Spark
1. Immutability rules out a big set of potential problems due to updates from multiple threads at once. Immutable data is definitely safe to share across processes.

2. They're not just immutable but a deterministic function of their input. This plus immutability also means the RDD's parts can be recreated at any time. This makes caching, sharing and replication easy.

3. These are significant design wins, at the cost of having to copy data rather than mutate it in place. Generally, that's a decent tradeoff to make: gaining the fault tolerance and correctness with no developer effort is worth spending memory and CPU on, since the latter are cheap.

4. A corollary: immutable data can as easily live in memory as on disk. This makes it reasonable to easily move operations that hit disk to instead use data in memory, and again, adding memory is much easier than adding I/O bandwidth.

5. Of course, an RDD isn't really a collection of data, but just a recipe for making data from other data. It is not literally computed by materializing every RDD completely. That is, a lot of the "copy" can be optimized away too.

6. The idea of an RDD has no origin in MapReduce.



We'd like to tally up the number of guests who have appeared on The Daily Show during each year. If daily_show were a list of lists, we could write the following Python code to achieve this result:

The keys in tally will be the years, and the values will be the totals for the number of lines associated with each year.

To achieve the same result with Spark, we'll have to use a Map step, then a ReduceByKey step.

In [None]:
# Plain Python: this only works if daily_show is a list of lists, not an RDD
tally = dict()
for line in daily_show:
    year = line[0]
    if year in tally:
        tally[year] = tally[year] + 1
    else:
        tally[year] = 1

In [None]:
# Apache Spark Code

tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(tally)

# Output --> PythonRDD[10] at RDD at PythonRDD.scala:48




You may have noticed that printing tally didn't return the histogram we were hoping for. Because of lazy evaluation, PySpark delayed executing the map and reduceByKey steps until we actually need them. Before we use take() to preview the first few elements in tally, we'll walk through the code we just wrote.

daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)

During the map step, we used a lambda function to create a tuple consisting of:
key: x[0] (the first value in the list)
value: 1 (the integer)

Our high-level strategy was to create a tuple with the key representing the year, and the value representing 1. Conceptually, the map step produces a collection of tuples resembling the following:

('YEAR', 1)
('1999', 1)
('1999', 1)
('1999', 1)
('1999', 1)
...

We'd like to reduce that down to:

('YEAR', 1)
('1999', 4)
...

reduceByKey(f) combines tuples with the same key using the function we specify, f.
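To make the semantics concrete, here's a plain-Python sketch of what reduceByKey(f) computes. `reduce_by_key` is a hypothetical helper, not a PySpark function, and the input pairs are illustrative. Spark itself also combines values within each partition before shuffling them between machines. That's why f should be associative and commutative. Spark also doesn't guarantee the order of the output.

```python
from functools import reduce

def reduce_by_key(pairs, f):
    """Plain-Python sketch of reduceByKey: group values by key, then fold each group with f."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return [(key, reduce(f, values)) for key, values in groups.items()]

# Illustrative output of the map step: one (year, 1) tuple per line
pairs = [('YEAR', 1), ('1999', 1), ('1999', 1), ('1999', 1), ('1999', 1)]
print(reduce_by_key(pairs, lambda x, y: x + y))  # [('YEAR', 1), ('1999', 4)]
```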

To see the results of these two steps, we'll use the take command, which forces lazy code to run immediately. Because tally is an RDD, we can't use Python's len function to find out how many elements are in the collection. Instead, we'll need to use the RDD count() function.

In [None]:
tally.take(tally.count())

"""Output
[('YEAR', 1),
 ('2013', 166),
 ('2001', 157),
 ('2004', 164),
 ('2000', 169),
 ('2015', 100),
 ('2010', 165),
 ('2006', 161),
 ('2014', 163),
 ('2003', 166),
 ('2002', 159),
 ('2011', 163),
 ('2012', 164),
 ('2008', 164),
 ('2007', 141),
 ('2005', 162),
 ('1999', 166),
 ('2009', 163)]"""

Unlike pandas, Spark knows nothing about column headers, and didn't set them aside. We need a way to remove the element ('YEAR', 1) from our collection. We'll need a workaround, though, because RDD objects are immutable once we create them. The only way to remove that tuple is to create a new RDD object that doesn't have it.

Spark comes with a filter(f) function that creates a new RDD by filtering an existing one for specific criteria. If we specify a function f that returns a Boolean value (True or False), the resulting RDD will consist of the elements for which f evaluated to True. You can read more about the filter function in the Spark documentation.
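Here's the same idea in plain Python, using the built-in filter() on a few rows copied from the sample output earlier in this notebook. `is_not_header` is a hypothetical name for this sketch; the PySpark cell below defines its own filter_year function.

```python
# Plain-Python analogue of rdd.filter(f): keep only the elements where f returns True.
rows = [
    ['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
    ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
    ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
]

def is_not_header(row):
    return row[0] != 'YEAR'  # False only for the header row

data_rows = list(filter(is_not_header, rows))  # a new list; `rows` is left unchanged
print(len(rows), len(data_rows))  # 3 2
```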


In [None]:
"""Write a function named filter_year that we can use to filter out the element that 
begins with the text YEAR, instead of an actual year."""

def filter_year(line):
    # Keep every row except the header row, whose first field is 'YEAR'
    return line[0] != 'YEAR'

filtered_daily_show = daily_show.filter(filter_year)



What are the advantages of DAG (directed acyclic graph) execution of big data algorithms over MapReduce? I know that Apache Spark, Storm, and Tez use the DAG execution model, over MapReduce. Why? Are there any disadvantages?

https://www.quora.com/What-are-the-advantages-of-DAG-directed-acyclic-graph-execution-of-big-data-algorithms-over-MapReduce-I-know-that-Apache-Spark-Storm-and-Tez-use-the-DAG-execution-model-over-MapReduce-Why-Are-there-any-disadvantages/answer/Tathagata-Das?share=1&srid=umKP

1. Conceptually DAG model is a strict generalization of MapReduce model. DAG-based systems like Spark and Tez that are aware of the whole DAG of operations can do better global optimizations than systems like Hadoop MapReduce which are unaware of the DAG to be executed.

2. MapReduce model simply states that distributed computation on a large dataset can be boiled down to two kinds of computation steps - a map step and a reduce step. One pair of map and reduce does one level of aggregation over the data. Complex computations typically require multiple such steps. When you have multiple such steps, it essentially forms a DAG of operations. So a DAG execution model is essentially a generalization of the MapReduce model. (Elaboration of 1 above)

3. Computations expressed in Hadoop MapReduce boil down to multiple iterations of: 
    (i) read data from HDFS, 
    (ii) apply map and reduce, 
    (iii) write back to HDFS. 
    So each map-reduce round is completely independent of the others, and Hadoop does not have any global knowledge of which MR steps are going to come after each MR. For many iterative algorithms this is inefficient, as the data between each map-reduce pair gets written to and read from the filesystem. Newer systems like Spark and Tez improve performance over Hadoop by considering the whole DAG of map-reduce steps and optimizing it globally (e.g., pipelining consecutive map steps into one, not writing intermediate data to HDFS). This prevents writing data back and forth after every reduce.
    
4. Spark Streaming does not pre-allocate resources; rather, it uses the underlying Spark mechanisms to dynamically allocate tasks to available resources.
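Point 3 mentions pipelining consecutive map steps into one. The plain-Python sketch below shows the idea under simplifying assumptions. It contrasts two approaches. In the first, a full intermediate list is materialized after every step, which is roughly what writing to HDFS between MapReduce rounds forces. In the second, the steps are composed so that each element passes through all of them in a single pass. `compose` is a hypothetical helper, not a Spark API.

```python
from functools import reduce

def compose(*funcs):
    """Hypothetical helper: one function that applies funcs left to right."""
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)

steps = [str.strip, str.lower, len]
data = ["  Actor ", "Comedian", " Journalist"]

# Round by round: materialize a full intermediate list after every step
intermediate = data
for f in steps:
    intermediate = [f(x) for x in intermediate]

# Pipelined: fuse the steps and make a single pass over the data
fused = compose(*steps)
pipelined = [fused(x) for x in data]

print(intermediate)  # [5, 8, 10]
print(pipelined)     # [5, 8, 10]
```

Both approaches give the same answer. The pipelined version never stores the intermediate results.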

To flex Spark's muscles, we'll demonstrate how to chain together a series of data transformations into a pipeline, and observe Spark managing everything in the background. The developers who wrote Spark had this functionality in mind, and optimized it for running tasks in succession.

Before Spark came along, running lots of tasks in succession in Hadoop was incredibly time consuming. Hadoop had to write intermediate results to disk, and wasn't aware of the full pipeline. Thanks to its aggressive approach to memory use and well-architected core, Spark improves on Hadoop's turnaround time significantly. 

In the following code cell, we'll filter out guests whose profession is blank, lowercase each profession, generate a histogram of professions, and output the first five tuples in the histogram.

In [None]:
filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x,y: x+y) \
                   .take(5)
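As a sanity check on what the chained pipeline computes, here's the same logic in plain Python with collections.Counter. The first four rows come from the sample output shown earlier. The fifth row, with a blank profession, is hypothetical and only there so the filter step has something to remove.

```python
from collections import Counter

rows = [
    ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
    ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
    ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
    ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson'],
    ['1999', '', '1/15/99', '', 'Hypothetical Guest'],  # hypothetical row with a blank profession
]

professions = Counter(
    row[1].lower()      # like .map(lambda line: (line[1].lower(), 1)) plus .reduceByKey(...)
    for row in rows
    if row[1] != ''     # like .filter(lambda line: line[1] != '')
)
print(professions.most_common(5))
```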

MapReduce Patterns implemented in Apache Spark

https://mapr.com/blog/mapreduce-design-patterns-implemented-apache-spark/

How to set up Spark on your own computer and integrate PySpark with Jupyter Notebook? 

We can use Spark in two modes:

1. Local mode - The entire Spark application runs on a single machine. Local mode is what you'll use to prototype Spark code on your own computer. It's also easier to set up.

2. Cluster mode - The Spark application runs across multiple machines. Cluster mode is what you'll use when you want to run your Spark application across multiple machines in a cloud environment like Amazon Web Services, Microsoft Azure, or DigitalOcean.

For now, we'll walk through the instructions for installing Spark in local mode on Windows, Mac, and Linux. We'll cover how to install Spark in cluster mode as part of the data engineering track.

Here's a diagram describing the high-level components you'll be setting up today:

![title](sparkpytharch.png)

In [3]:
pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): still running...
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257935 sha256=7c50a53bed32debf1bafeb45e79121995220a9f44c69a095fa4598c332f94762
  Stored in directory: C:\Users\Bidhan\AppData\Local\pip\Cache\wheels\bf\db\04\61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5
Note: you may need to restart the kernel to use updated packages.

In [1]:
pip show pyspark

Name: pyspark
Version: 2.4.5
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: c:\users\bidhan\anaconda3\lib\site-packages
Requires: py4j
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/b1/c8/e6e1f6a303ae5122dc28d131b5a67c5eb87cbf8f7ac5b9f87764ea1b1e1e/findspark-1.3.0-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.3.0
Note: you may need to restart the kernel to use updated packages.


In [8]:
import findspark
# Point findspark at your local Spark installation (adjust the path for your machine)
findspark.init('C:/Users/Bidhan/spark-2.4.5-bin-hadoop2.6')

In [9]:
import pyspark
sc = pyspark.SparkContext()

In [10]:
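# Read `recent-grads.csv` into an RDD.
f = sc.textFile('recent-grads.csv')
# textFile already yields one element per line, so splitting on '\n' just wraps
# each line in a one-element list (as the output below shows).
# Use line.split(',') instead to split each line into its CSV fields.
data = f.map(lambda line: line.split('\n'))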

In [13]:
data.take(10)

[['Rank,Major_code,Major,Total,Men,Women,Major_category,ShareWomen,Sample_size,Employed,Full_time,Part_time,Full_time_year_round,Unemployed,Unemployment_rate,Median,P25th,P75th,College_jobs,Non_college_jobs,Low_wage_jobs'],
 ['1,2419,PETROLEUM ENGINEERING,2339,2057,282,Engineering,0.120564344,36,1976,1849,270,1207,37,0.018380527,110000,95000,125000,1534,364,193'],
 ['2,2416,MINING AND MINERAL ENGINEERING,756,679,77,Engineering,0.101851852,7,640,556,170,388,85,0.117241379,75000,55000,90000,350,257,50'],
 ['3,2415,METALLURGICAL ENGINEERING,856,725,131,Engineering,0.153037383,3,648,558,133,340,16,0.024096386,73000,50000,105000,456,176,0'],
 ['4,2417,NAVAL ARCHITECTURE AND MARINE ENGINEERING,1258,1123,135,Engineering,0.107313196,16,758,1069,150,692,40,0.050125313,70000,43000,80000,529,102,0'],
 ['5,2405,CHEMICAL ENGINEERING,32260,21239,11021,Engineering,0.341630502,289,25694,23170,5180,16697,1672,0.061097712,65000,50000,75000,18314,4440,972'],
 ['6,2418,NUCLEAR ENGINEERING,2573,2200,373,En

https://blog.tanka.la/2018/09/02/run-your-first-spark-program-using-pyspark-and-jupyter-notebook/

In [None]:
raw_hamlet = sc.textFile('hamlet.txt')
raw_hamlet.take(5)

In [None]:
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))

Lambda functions are great for writing quick functions with simple logic that we can pass into PySpark methods. They fall short when we need to write more customized logic, though. Thankfully, PySpark lets us define a function in Python first, then pass it in. A function that produces a sequence of values for each element (as flatMap() expects, as opposed to the single Boolean value that filter() requires) can use a yield statement to specify the values that should be pulled later. Returning a list or another iterable also works.

If you're unfamiliar with the yield statement in Python, read this excellent Stack Overflow answer (https://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do/231855#231855) on the topic. 

To summarize, yield is a Python technique that allows the interpreter to generate data on the fly and pull it when necessary, instead of storing it to memory immediately. Because of its unique architecture, Spark takes advantage of this technique to reduce overhead and improve the speed of computations.
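Here's a small plain-Python example of a generator function; `words_longer_than` is a hypothetical name used only for illustration. The generator produces values one at a time, as they're requested, and once consumed it is exhausted.

```python
def words_longer_than(lines, n):
    """Generator: yields matching words one at a time instead of building a full list."""
    for line in lines:
        for word in line.split():
            if len(word) > n:
                yield word

gen = words_longer_than(["To be or not to be", "that is the question"], 3)
print(next(gen))  # that  (computed only when we ask for it)
print(list(gen))  # ['question']  (the remaining values)
print(list(gen))  # []  (a generator can only be consumed once)
```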

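Spark runs the named function on every element in the RDD and restricts it in scope. Spark serializes (pickles) the function, together with any variables it references from the enclosing scope, and ships it to Python worker processes that run alongside the executors. Each instance of the function therefore only has access to the object(s) you pass into it, the variables captured with it, and the Python libraries installed where the workers run. Referring to objects that can't be serialized (such as the SparkContext itself) or importing libraries that aren't installed on the worker machines may cause the computation to crash. Spark doesn't compile the Python function to Java; the JVM side schedules the work and moves the data.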

Finally, not all functions require us to use yield; only the ones that generate a custom sequence of data do. For map() or filter(), we use return to return a value for every single element in the RDD we're running the functions on.



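flatMap() is different from map() because it doesn't require exactly one output for every element in the RDD. Each element can produce zero, one, or many values, and the results are flattened into a single RDD. The flatMap() method is useful whenever we want to generate a sequence of values from an RDD.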

We can't use the map() method for this because it requires a return value for every element in the RDD.

Difference between map() and flatMap(): https://www.baeldung.com/java-difference-map-and-flatmap
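The difference is easiest to see in plain Python. In the sketch below, the sample lines are made up for illustration. A list comprehension mirrors map(): it produces exactly one output per input, even when that output is an empty list. itertools.chain.from_iterable mirrors flatMap(): it flattens each function result into one sequence, so the empty line contributes nothing.

```python
from itertools import chain

lines = ["HAMLET\tTo be", "", "OPHELIA\tGood my lord"]  # made-up sample lines

def split_words(line):
    return line.split()  # splits on any whitespace, including tabs

mapped = [split_words(line) for line in lines]                     # like rdd.map(split_words)
flat_mapped = list(chain.from_iterable(map(split_words, lines)))  # like rdd.flatMap(split_words)

print(mapped)       # [['HAMLET', 'To', 'be'], [], ['OPHELIA', 'Good', 'my', 'lord']]
print(flat_mapped)  # ['HAMLET', 'To', 'be', 'OPHELIA', 'Good', 'my', 'lord']
```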

In [None]:
def hamlet_speaks(line):
    line_id = line[0]
    # Yield one (id, message) tuple only for lines that contain the exact field "HAMLET";
    # other lines yield nothing, which flatMap() allows
    if "HAMLET" in line:
        yield line_id, "hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(hamlet_speaks)
#hamlet_spoken.take(10)

Write a named function filter_hamlet_speaks to pass into filter(). Apply it to split_hamlet to return an RDD with the elements containing the word HAMLET.

In [None]:


def filter_hamlet_speaks(line):
    # True when one of the tab-separated fields is exactly "HAMLET"
    return "HAMLET" in line

hamlet_spoken_lines = split_hamlet.filter(filter_hamlet_speaks)
hamlet_spoken_lines.take(5)

Whenever we use an action method, Spark forces the evaluation of lazy code. If we only chain together transformation methods and print the resulting RDD object, we'll see the type of RDD (e.g. a PythonRDD or PipelinedRDD object), but not the elements within it. That's because the computation hasn't actually happened yet.

Even though Spark simplifies chaining lots of transformations together, it's good practice to use actions to observe the intermediate RDD objects between those transformations. This will let you know whether your transformations are working the way you expect them to.

1. The count() method returns the number of elements in an RDD.

2. We've used take() to preview the first few elements of an RDD, similar to the way we've used head() in pandas. But what about returning all of the elements in a collection? We need to do this to write an RDD to a CSV, for example. It's also useful for running some basic Python code over a collection without going through PySpark.

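3. Running .collect() on an RDD returns a list of all of its elements. Because collect() brings every element back to the driver program, use it only when the result fits in the driver's memory.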
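Here's a plain-Python sketch of how these actions relate to partitioned data. The partitions and the helper functions are hypothetical stand-ins, not PySpark code. count() adds up the partition sizes, collect() gathers every element into one list, and take(n) reads partitions in order until it has n elements. Spark's real take() works along similar lines: it scans one partition first, then as many more as it estimates it needs.

```python
# Hypothetical partitioned data: a list of partitions, each holding some elements.
partitions = [["a", "b"], [], ["c", "d", "e"]]

def count(parts):
    """Like rdd.count(): add up the size of every partition."""
    return sum(len(p) for p in parts)

def collect(parts):
    """Like rdd.collect(): gather every element into one list (on the driver, in Spark)."""
    return [x for p in parts for x in p]

def take(parts, n):
    """Like rdd.take(n): read partitions in order and stop as soon as n elements are found."""
    result = []
    for p in parts:
        for x in p:
            if len(result) == n:
                return result
            result.append(x)
    return result

print(count(partitions))       # 5
print(take(partitions, 2))     # ['a', 'b']
print(collect(partitions)[3])  # d  (the 4th element overall)
```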

In [None]:
spoken_count = hamlet_spoken_lines.count()
# collect() already returns a list; index 100 is the 101st element
spoken_101 = hamlet_spoken_lines.collect()[100]

We hope you have a better idea of how to use PySpark to transform a dataset into a format that's better for data analysis. 

We also learned how to use actions to explore an RDD before chaining another transformation to it.

The next mission is a challenge that will test your understanding of Spark, transformations, actions, lambda functions, and the MapReduce paradigm in general. 

After that challenge, we'll explore Spark DataFrames and how to analyze data using all the techniques we've learned.



https://training.databricks.com/visualapi.pdf

## Spark Challenge

In [None]:
raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

def format_id(x):
    # Keep only the part of the first field after '@', followed by the remaining fields
    line_id = x[0].split('@')[1]
    return [line_id] + x[1:]

hamlet_with_ids = split_hamlet.map(format_id)

In [None]:
hamlet_with_ids.take(5)

real_text = hamlet_with_ids.filter(lambda line: len(line) > 1)
hamlet_text_only = real_text.map(lambda line: [l for l in line if l != ''])
hamlet_text_only.take(10)

In [None]:
hamlet_text_only.take(10)
def fix_pipe(line):
    results = list()
    for l in line:
        if l == "|":
            pass
        elif "|" in l:
            fmtd = l.replace("|", "")
            results.append(fmtd)
        else:
            results.append(l)
    return results

clean_hamlet = hamlet_text_only.map(lambda line: fix_pipe(line))

## Spark DataFrames

The Spark DataFrame is a feature that allows you to create and work with DataFrame objects. As you may have guessed, pandas inspired it.

Spark is well known for its ability to process large data sets. Spark DataFrames combine the scale and speed of Spark with the familiar query, filter, and analysis capabilities of pandas. Unlike pandas, which can only run on one computer, Spark can use distributed memory (and disk when necessary) to handle larger data sets and run computations more quickly.

Spark DataFrames allow us to modify and reuse our existing pandas code to scale up to much larger data sets. They also have better support for various data formats. We can even use a SQL interface to write distributed SQL queries that query large database systems and other data stores.

In [None]:
with open('census_2010.json') as f:
    for i in range(4):
        print(f.readline())
        


In previous missions, we explored reading data into an RDD object. Recall that an RDD is essentially a list of tuples with no enforced schema or structure of any kind. An RDD can have a variable number of elements in each tuple, and combinations of types between tuples.

RDDs are useful for representing unstructured data like text. Without them, we'd need to write a lot of custom Python code to interact with such data.

We use the SparkContext object to read data into an RDD:

raw_data = sc.textFile("daily_show.tsv")
daily_show = raw_data.map(lambda line: line.split('\t'))

To use the familiar DataFrame query interface from pandas, however, the data representation needs to include rows, columns, and types. Spark's implementation of DataFrames mirrors the pandas implementation, with logic for rows and columns.

The Spark SQL class is very powerful. It gives Spark more information about the data structure we're using and the computations we want to perform. Spark uses that information to optimize processes.

To take advantage of these features, we'll have to use the SQLContext object to structure external data as a DataFrame, instead of the SparkContext object.

We can query Spark DataFrame objects with SQL, which we'll explore in the next mission. The SQLContext class gets its name from this capability.

This class allows us to read in data and create new DataFrames from a wide range of sources. It can do this because it takes advantage of Spark's powerful Data Sources API.

File formats and storage: JSON, CSV/TSV, XML, Parquet, Amazon S3 (cloud storage service)

Big data systems: Hive, Avro, HBase

SQL database systems: MySQL, PostgreSQL

Data science organizations often use a wide range of systems to collect and store data, and they're constantly making changes to those systems. Spark DataFrames allow us to interface with different types of data, and ensure that our analysis logic will still work as the data storage mechanisms change.

In [None]:
# Import SQLContext
from pyspark.sql import SQLContext

# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

# Read JSON data into a DataFrame object `df`
df = sqlCtx.read.json("census_2010.json")

# Print the type
print(type(df))

When we read data into the SQLContext object, Spark:

1. Instantiates a Spark DataFrame object
2. Infers the schema from the data and associates it with the DataFrame
3. Reads in the data and distributes it across the machines in the cluster (if multiple machines are available)
4. Returns the DataFrame object

We expect the DataFrame Spark created to have the following columns, which were the keys in the JSON data set:

age
females
males
total
year

Spark has its own type system that's similar to the pandas type system. Unless we supply a schema, Spark makes a pass over the data set to infer the columns and each column's type, separately from the work it does later to process the data. Let's use one of the Spark DataFrame instance methods to display the schema for the DataFrame we're working with.
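Here's a rough plain-Python sketch of what inferring a schema from JSON records involves: a single pass that collects every key and the types of its values. The records are made-up values using the column names listed above, and `infer_schema` is a hypothetical helper. Spark's real inference differs in two ways: it reconciles conflicting types, and it uses its own type names (for example, JSON integers show up as `long` in printSchema()).

```python
import json

# Made-up JSON-lines records using the column names listed above.
json_lines = [
    '{"age": 0, "females": 10, "males": 12, "total": 22, "year": 2010}',
    '{"age": 1, "females": 11, "males": 13, "total": 24, "year": 2010}',
]

def infer_schema(lines):
    """Sketch: one pass over the records, collecting each key's Python type names."""
    schema = {}
    for line in lines:
        for key, value in json.loads(line).items():
            schema.setdefault(key, set()).add(type(value).__name__)
    # Sort keys and type names so the result is deterministic
    return {key: sorted(type_names) for key, type_names in sorted(schema.items())}

print(infer_schema(json_lines))
# {'age': ['int'], 'females': ['int'], 'males': ['int'], 'total': ['int'], 'year': ['int']}
```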

In [None]:
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")
df.printSchema()

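As we mentioned before, the pandas DataFrame heavily influenced the Spark DataFrame implementation. Here are some Spark DataFrame methods that have pandas counterparts. The names and behavior don't always match exactly: pandas uses sort_values() rather than sort(), and pandas' where() masks values instead of filtering rows.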

1. agg()
2. join()
3. sort()
4. where()

Unlike pandas DataFrames, however, Spark DataFrames are immutable, which means we can't modify existing objects. Most transformations on an object return a new DataFrame reflecting the changes instead. As we discussed in previous missions, Spark's creators deliberately designed immutability into Spark to make it easier to work with distributed data structures.

Pandas and Spark DataFrames also have different underlying data structures. Pandas DataFrames are built around Series objects, while Spark DataFrames are built around RDDs. We can perform most of the same computations and transformations on Spark DataFrames that we can on pandas DataFrames, but the styles and methods are somewhat different. We'll explore how to perform common pandas functions with Spark in this mission.


In [None]:
df.show(5) #only prints the top 5 rows

In pandas, we used the head() method to return the first n rows. This is one of the differences between the DataFrame implementations. Instead of returning a nicely formatted table of values, the head() method in Spark returns a list of row objects. Spark needs to return row objects for certain methods, such as head(), collect() and take().
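Spark's Row objects behave much like Python named tuples, which is why both dot notation and positional indexing work on them. The sketch below uses collections.namedtuple as an analogy only. `RowLike` and its made-up values are hypothetical; a real Spark Row comes from pyspark.sql.

```python
from collections import namedtuple

# Analogy only: RowLike is a hypothetical stand-in for pyspark.sql.Row.
RowLike = namedtuple("RowLike", ["age", "females", "males", "total", "year"])

# Made-up values: five rows, like the result of df.head(5)
rows = [RowLike(age=i, females=100 + i, males=200 + i, total=300 + 2 * i, year=2010)
        for i in range(5)]

row_one = rows[0]
print(row_one.age)  # 0  (access by field name)
print(row_one[0])   # 0  (access by position)

for row in rows:    # iterate directly instead of indexing with range()
    print(row.age)
```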

Instructions:

Use the head() method to return the first five rows in the DataFrame as a list of row objects, and assign the result to the variable first_five.

Print the age value for each row object in first_five.

In [None]:
# You can access a row's attributes by the column name using dot notation, 
# and by position using bracket notation with an index:

row_one = df.head(5)[0]
# Access value for age
row_one.age
# Access the first value
row_one[0]

###

first_five = df.head(5)
for row in first_five:
    print(row.age)

In pandas, we passed a string into a single pair of brackets ([]) to select an individual column, and passed in a list to select multiple columns:

# Pandas DataFrame

df['age']

df[['age', 'males']]

We can still use bracket notation in Spark. However, df['age'] returns a Column object rather than a DataFrame, so to get a DataFrame back we need to pass in a list of strings, even when we're only selecting one column.

Spark evaluates DataFrame operations lazily, and will only compute and display the results when we call an action such as the show() method. Instead of using bracket notation, we can also use the select() method to select columns:

# Spark DataFrame

df.select('age')

df.select('age', 'males')
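The split between a transformation like select() and an action like show() can be illustrated with a toy class. LazyFrame below is hypothetical and written only for this example: each select() records a step and returns a new object (mirroring immutability), and the steps run only when collect() or show() is called. Spark additionally optimizes the recorded plan before running it across the cluster, which this sketch does not attempt.

```python
class LazyFrame:
    """Toy model of lazy evaluation; a hypothetical class, not Spark's DataFrame."""

    def __init__(self, rows, plan=()):
        self._rows = rows  # source data, never modified
        self._plan = plan  # recorded transformations, applied only by an action

    def select(self, *cols):
        # Transformation: record the step and return a NEW LazyFrame; no work happens yet
        def step(rows):
            return [{c: r[c] for c in cols} for r in rows]

        return LazyFrame(self._rows, self._plan + (step,))

    def collect(self):
        # Action: run every recorded step in order and return the result
        rows = self._rows
        for step in self._plan:
            rows = step(rows)
        return rows

    def show(self, n=20):
        # Action: print up to n result rows
        for row in self.collect()[:n]:
            print(row)


data = [{"age": 0, "males": 10, "females": 11}, {"age": 1, "males": 12, "females": 13}]
df_toy = LazyFrame(data)
ages = df_toy.select("age")  # only a plan is stored at this point
ages.show()                  # the select runs now
df_toy.select("age", "males").show()
```

Because nothing runs until an action, a mistake such as selecting a missing column only surfaces when show() or collect() is called, which matches how errors often appear in Spark.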

In the following code cell, we demonstrate how to select and display the age column. Use what you've learned to take this a step further and select multiple columns.

In [None]:
df[['age']].show()
df.select('age','males','females').show()

In pandas, we used Boolean filtering to select only the rows we were interested in. 

Spark DataFrames support the same functionality and notation.

1. Use the pandas notation for Boolean filtering to select the rows where age is greater than five.
2. Assign the resulting DataFrame to the variable five_plus.
3. Use the show() method to display five_plus.

In [None]:
five_plus = df[df['age']>5]
five_plus.show()

We can compare the columns in Spark DataFrames with each other, and use the comparison criteria as a filter. 

For example, to get the rows where the male population exceeded the female population in 2010, we'd use the same notation as in pandas.

Q. Find all of the rows where females is less than males, and use show() to display the first 20 results.

In [None]:
df[df['females']<df['males']].show(20)
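Conceptually, the comparison inside the brackets produces one boolean per row, and the outer brackets keep only the rows where that boolean is True. The pure-Python sketch below models those two steps on a few made-up rows; it illustrates the semantics only, not how Spark evaluates the filter across a cluster.

```python
# Toy rows standing in for the census DataFrame (made-up values)
rows = [
    {"age": 4, "females": 120, "males": 125},
    {"age": 6, "females": 130, "males": 128},
    {"age": 8, "females": 110, "males": 115},
]

# Like df['females'] < df['males']: compare two columns row by row,
# producing one boolean per row
mask = [r["females"] < r["males"] for r in rows]

# Like df[mask]: keep only the rows whose boolean is True
filtered = [r for r, keep in zip(rows, mask) if keep]

# A condition on a single column works the same way, like df[df['age'] > 5]
five_plus = [r for r in rows if r["age"] > 5]

print(mask)
print([r["age"] for r in filtered])
print([r["age"] for r in five_plus])
```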

Spark DataFrames have limited built-in support for visualization. There's no easy way to create a histogram of the data in a column, for example, or a line plot of the values in two columns.

To handle some of these shortcomings, we can convert a Spark DataFrame to a pandas DataFrame using the toPandas() method. Because toPandas() collects every row into memory on the driver, converting an entire Spark DataFrame works fine for small data sets. For larger ones, we'll want to select a more manageable subset of the data before converting it.

In [None]:
pandas_df = df.toPandas()
pandas_df['total'].hist()

We've now explored Spark DataFrames and how to use their methods to query and analyze data.

In the following cells, we'll use SQL to interface with DataFrames.

# Spark SQL

Before we can write and run SQL queries, we need to tell Spark to treat the DataFrame as a SQL table. Spark internally maintains a virtual database within the SQLContext object. This object, which we'll refer to as sqlCtx, has methods for registering temporary tables.

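To register a DataFrame as a table, we call the registerTempTable() method on that DataFrame object. This method requires one string parameter, name, which sets the table name we'll reference in our SQL queries. (In Spark 2.0 and later, registerTempTable() is deprecated in favor of createOrReplaceTempView(), which works the same way.)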
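Python's built-in sqlite3 module offers a loose analogy for this step: data is loaded into a table under a chosen name, and SQL queries then refer to that name. The sketch below is not Spark code; the records are made-up toy values using this mission's column names, and the sqlite_master query is only a rough counterpart of sqlCtx.tableNames().

```python
import json
import sqlite3

# Made-up JSON records using the census column names from this mission
records = json.loads(
    '[{"age": 0, "males": 10, "females": 11, "total": 21},'
    ' {"age": 1, "males": 12, "females": 13, "total": 25}]'
)

conn = sqlite3.connect(":memory:")

# "Register" the records under a table name; SQL queries refer to this name
conn.execute(
    "create table census2010 (age integer, males integer, females integer, total integer)"
)
conn.executemany(
    "insert into census2010 values (:age, :males, :females, :total)", records
)

# Rough counterpart of sqlCtx.tableNames()
tables = [
    name for (name,) in conn.execute("select name from sqlite_master where type = 'table'")
]
print(tables)

ages = conn.execute("select age from census2010").fetchall()
print(ages)
```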

In [None]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")
df.registerTempTable("census2010")
tables = sqlCtx.tableNames()
print(tables)

Now that we've registered the table with sqlCtx, we can start writing and running SQL queries. With Spark SQL, we represent a query as a string and pass it to the SQLContext object's sql() method, which takes the query string as its only required parameter. Spark returns the query results as a DataFrame object, so because of lazy evaluation we need to call show() to display them.

While SQLite accepts a semicolon at the end of a query, Spark SQL will actually throw an error if you include one. Other than that, the basic SELECT, WHERE, and JOIN syntax we use in this mission is the same as in SQLite, although the two SQL dialects differ in many less common features.

In [None]:
sqlCtx.sql("select age from census2010").show()

Previously, we used DataFrame methods to find all of the rows where age was greater than 5. If we only wanted to retrieve data from the males and females columns where those criteria were true, we'd need to chain additional operations to the Spark DataFrame. To return the results in descending order instead of ascending order, we'd have to chain yet another method. DataFrame methods are quick and powerful for simple queries, but chaining them can become cumbersome for more advanced queries.

SQL shines at expressing complex logic in a more compact manner. Let's brush up on SQL by writing a query that expresses more specific criteria.
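Here is the kind of compact query the paragraph above describes: column selection, two filter conditions, and the descending sort all in one statement. With DataFrame methods, the same result would take chained calls such as filter(), select(), and orderBy(). To run without a cluster, this sketch uses an in-memory SQLite table via Python's sqlite3 module with made-up values; in Spark SQL, the query string would go to sqlCtx.sql() instead, without a trailing semicolon.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table census2010 (age integer, males integer, females integer)")

# Toy rows (made-up values), one per age from 0 to 19
conn.executemany(
    "insert into census2010 values (?, ?, ?)",
    [(age, 1000 - 10 * age, 1005 - 10 * age) for age in range(20)],
)

# Selection, two conditions, and a descending sort in one declarative statement
query = """
select age, males, females
from census2010
where age > 5 and age < 15
order by age desc
"""
rows = conn.execute(query).fetchall()
print(rows[:3])
```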

In [None]:
query = 'select males, females from census2010 where age>5 and age<15'
sqlCtx.sql(query).show()

Because the results of SQL queries are DataFrame objects, we can combine the best aspects of both DataFrames and SQL in our workflow. For example, we can write a SQL query that quickly returns a subset of our data as a DataFrame, and then call a DataFrame method such as describe() on the result.
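The cell below has two stages: SQL selects a subset, and a DataFrame method summarizes it. This sketch mirrors those stages with sqlite3 and the statistics module on made-up values. summarize() is a hypothetical helper that computes the five statistics describe() reports (count, mean, stddev, min, max), using the sample standard deviation, which is what Spark's describe() reports as stddev.

```python
import sqlite3
import statistics

conn = sqlite3.connect(":memory:")
conn.execute("create table census2010 (age integer, males integer, females integer)")
conn.executemany(
    "insert into census2010 values (?, ?, ?)",
    [(0, 100, 104), (1, 110, 108), (2, 120, 118), (3, 130, 126)],  # made-up values
)

# Stage 1: SQL narrows the data to the columns of interest
rows = conn.execute("select males, females from census2010").fetchall()


# Stage 2: hypothetical helper, similar in spirit to describe()
def summarize(values):
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation
        "min": min(values),
        "max": max(values),
    }


males, females = zip(*rows)
summary = {"males": summarize(males), "females": summarize(females)}
print(summary)
```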

In [None]:
query = 'select males,females from census2010'
sqlCtx.sql(query).describe().show()

One of the most powerful uses of SQL is joining tables. Spark SQL takes this a step further by letting you run join queries across data from multiple file types. Spark can read any file type and format it supports into a DataFrame object, and we can register each of these DataFrames as a table within the SQLContext object for querying.

As we mentioned briefly in the previous mission, most data science organizations use a variety of file formats and data storage mechanisms. Spark SQL was built with the industry use cases in mind and enables data professionals to use one common query language, SQL, to interact with lots of different data sources. 
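A stdlib sketch of the same idea: one source arrives as JSON and the other as CSV, each is parsed with its own reader and loaded into a named table, and from then on a single SQL join covers both. It uses sqlite3 rather than Spark, and the values are made up. In Spark, the parsing step would be handled by sqlCtx.read.json() and a corresponding CSV reader.

```python
import csv
import io
import json
import sqlite3

# Two small sources in different formats (made-up values)
json_text = '[{"age": 0, "total": 200}, {"age": 1, "total": 210}]'
csv_text = "age,total\n0,190\n1,205\n"

conn = sqlite3.connect(":memory:")
conn.execute("create table census2010 (age integer, total integer)")
conn.execute("create table census2000 (age integer, total integer)")

# Parse each format with its own reader, then load it into a named table
conn.executemany("insert into census2010 values (:age, :total)", json.loads(json_text))
csv_rows = [
    {"age": int(r["age"]), "total": int(r["total"])}  # CSV fields arrive as strings
    for r in csv.DictReader(io.StringIO(csv_text))
]
conn.executemany("insert into census2000 values (:age, :total)", csv_rows)

# Once both are tables, one join query spans both original formats
query = """
select census2010.age, census2010.total as total_2010, census2000.total as total_2000
from census2010
inner join census2000
on census2010.age = census2000.age
order by census2010.age
"""
rows = conn.execute(query).fetchall()
print(rows)
```

The column aliases (total_2010, total_2000) are optional, but they keep the two total columns distinguishable in the output.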

In [None]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.json("census_2010.json")
df.registerTempTable('census2010')
df_2000 = sqlCtx.read.json("census_2000.json")
df_1990 = sqlCtx.read.json("census_1990.json")
df_1980 = sqlCtx.read.json("census_1980.json")

df_2000.registerTempTable('census2000')
df_1990.registerTempTable('census1990')
df_1980.registerTempTable('census1980')
tables = sqlCtx.tableNames()
print(tables)

Now that we have a table for each dataset, we can write join queries to compare values across them. Since we're working with Census data, let's use the age column as the joining column.

In [None]:
query = """
 select census2010.total, census2000.total
 from census2010
 inner join census2000
 on census2010.age=census2000.age
"""

sqlCtx.sql(query).show()

The functions and operators from SQLite that we've used in the past are available for us to use in Spark SQL:

1. COUNT()
2. AVG()
3. SUM()
4. AND
5. OR
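The query below combines all five of these in one statement over a two-table join: COUNT(), SUM(), and AVG() in the select list, and AND/OR in the filter. It runs against sqlite3 with made-up totals so it works anywhere. Since Spark SQL supports the same functions and operators, we'd expect the query text to carry over to sqlCtx.sql() essentially unchanged, but that is an assumption worth checking on your Spark version.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
for name in ("census2010", "census2000"):
    conn.execute(f"create table {name} (age integer, total integer)")

# Made-up totals for four ages in each year
conn.executemany(
    "insert into census2010 values (?, ?)", [(0, 100), (1, 110), (2, 120), (3, 130)]
)
conn.executemany(
    "insert into census2000 values (?, ?)", [(0, 90), (1, 95), (2, 130), (3, 140)]
)

# OR picks ages 0, 1, and 3; AND then keeps only ages where the total grew
query = """
select count(*), sum(census2010.total), avg(census2000.total)
from census2010
inner join census2000
on census2010.age = census2000.age
where (census2010.age < 2 or census2010.age = 3)
and census2010.total > census2000.total
"""
result = conn.execute(query).fetchone()
print(result)
```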

In [None]:
query = """
 select sum(census2010.total), sum(census2000.total), sum(census1990.total)
 from census2010
 inner join census2000
 on census2010.age=census2000.age
 inner join census1990
 on census2010.age=census1990.age
"""
sqlCtx.sql(query).show()
