![image1](image1.PNG)

## Spark

Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark is a fast, general-purpose engine for large-scale data processing.
Databricks is a company founded by the creators of Apache Spark that aims to help clients with cloud-based big data processing using Spark.

## Spark context

In Spark, communication occurs between a driver and executors. The driver holds the Spark jobs to be run; each job is split into tasks that are submitted to the executors for completion. Executor programs run on cluster nodes or in local threads. The results from these tasks are delivered back to the driver.

Code can run:
+ Locally, in the driver
+ Distributed, at the executors (executors run in parallel and have much more memory)
+ Both at the driver and the executors

Problems with cheap hardware are:
+ failures.
+ network speeds versus shared memory.
+ much more latency.
+ network slower than storage.
+ uneven performance.

## Spark Capabilities

+ Performance
   + First, use RAM
   + Also, be smarter
   
+ Ease of Use
   + Python, Scala, Java first class citizens
   
+ New Paradigms
   + Spark SQL
   + Streaming
   + MLlib
   + GraphX
   + …more

![image2](image2.PNG)

## Spark Formula
1. Create/Load RDD

2. Transform RDD

3. But don’t do anything yet!

4. Perform Actions that return data
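The "don't do anything yet" step is lazy evaluation. As a rough analogy only, the sketch below uses plain Python generators rather than Spark: building the pipeline touches no data, and work happens only when the final "action" pulls results through. The names `load`, `keep_queen` and `evaluated` are hypothetical and exist just for this illustration.

```python
evaluated = []  # records every line a "transformation" actually touches

def load(lines):
    # Step 1: stand-in for creating an RDD (a lazy stream of records).
    for line in lines:
        yield line

def keep_queen(records):
    # Step 2: stand-in for a transformation such as filter().
    for line in records:
        evaluated.append(line)
        if "queen" in line:
            yield line

sample = ["a throned queen", "to be or not to be", "the queen of night"]

pipeline = keep_queen(load(sample))   # Step 3: nothing has run yet
touched_before_action = len(evaluated)

matches = sum(1 for _ in pipeline)    # Step 4: the "action" pulls data through
touched_after_action = len(evaluated)
```

Spark applies the same idea at cluster scale: transformations only describe the work, and an action such as `count()` triggers it.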

Simple example:

```PYTHON
from pyspark import SparkConf
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate(SparkConf())
lines_rdd = sc.textFile("Complete_Shakespeare.txt")
lines_rdd.count()
```

```
124787
```

### Spark Context
The first thing a Spark program requires is a context, which is its interface to the cluster it will use. The
pyspark shell provides a ready-made one, `sc`, that uses the local filesystem. Standalone programs
have to create their own:

```PYTHON
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("Test_App")
sc = SparkContext(conf = conf)
```

```PYTHON
lines_rdd = sc.textFile("Complete_Shakespeare.txt")
QueenLines_rdd = lines_rdd.filter(lambda line: "queen" in line)
QueenLines_rdd.count()
```

```
192
```

```PYTHON
QueenLines_rdd.first()
```

```
'  As on the finger of a throned queen,'
```

### Lambdas
A lambda is simply an anonymous function, used when the logic is too simple to deserve a named subroutine of its own.
Anywhere we use a lambda we could instead pass the name of a regular function, which could go off and do anything.

When all you want to do is ask, given an input variable `line`, whether "queen" is in it, a separate function isn't worth the
digression.

Most modern languages have adopted this nicety.
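To make the equivalence concrete, here is a minimal sketch in plain Python, with the built-in `filter` standing in for `RDD.filter`. The function name `mentions_queen` and the sample lines are made up for this illustration.

```python
lines = ["  As on the finger of a throned queen,", "To be, or not to be"]

# Lambda form: the test is written inline, right where it is used.
with_lambda = list(filter(lambda line: "queen" in line, lines))

# Named form: the same test as a regular function.
def mentions_queen(line):
    return "queen" in line

with_function = list(filter(mentions_queen, lines))
```

Both calls select the same lines; the lambda just avoids a separate definition for a one-line test.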

## Common Transformations

![image3](image3.PNG)

Click [here](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) for the full list.

## Common Actions

![image4](image4.PNG)

Click [here](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) for the full list.

## Pair RDDs

+ Key/value organization is a simple but often very efficient schema, as we mentioned in our NoSQL discussion.
+ Spark provides special operations on RDDs that contain key/value pairs. They are similar to the general ones that we have seen.
+ On the language side (Python, Scala, Java), key/value pairs are simply tuples. If every element of an RDD is a two-item tuple, it is a Pair RDD and you can use the key/value operations that follow.

## Pair RDD Transformations

![image5](image5.PNG)

<mark> Note </mark>: All the regular transformations are also available.

## Pair RDD Actions

As with transformations, all of the regular actions are available to Pair RDDs, and there
are some additional ones that can take advantage of key/value structure.

![image6](image6.PNG)

Click [here](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) for the full list.

## Two Pair RDD Transformations

![image7](image7.PNG)

Click [here](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) for the full list.

## Joins

Any database designer can tell you how common joins are. Let's look at a simple
example. We have an RDD of our top purchasing customers (created inline here) and
an RDD with all of our customers' addresses.

To create a mailing list of special coupons for those favored customers, we can
join the two datasets.

```PYTHON
best_customers_rdd = sc.parallelize([("Joe", "$103"), ("Alice", "$2000"), ("Bob", "$1200")])
customer_addresses_rdd = sc.parallelize([("Joe", "23 State St."), ("Frank", "555 Timer Lane"), ("Sally", "44 Forest Rd."), ("Alice", "3 Elm Road"), ("Bob", "88 West Oak")])
promotion_mail_rdd = best_customers_rdd.join(customer_addresses_rdd)
promotion_mail_rdd.collect()
```

```
[('Bob', ('$1200', '88 West Oak')),
 ('Joe', ('$103', '23 State St.')),
 ('Alice', ('$2000', '3 Elm Road'))]
```
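Frank and Sally are missing from the result because `join` is an inner join: only keys present in both RDDs survive. The sketch below is a plain-Python model of that behavior, not Spark's implementation; `inner_join` is a hypothetical helper written for this illustration.

```python
best_customers = [("Joe", "$103"), ("Alice", "$2000"), ("Bob", "$1200")]
customer_addresses = [("Joe", "23 State St."), ("Frank", "555 Timer Lane"),
                      ("Sally", "44 Forest Rd."), ("Alice", "3 Elm Road"),
                      ("Bob", "88 West Oak")]

def inner_join(left, right):
    # Index the right side by key, keeping every value seen for a key.
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    # Emit one (key, (left_value, right_value)) pair per matching combination;
    # keys that appear on only one side produce nothing.
    return [(key, (left_value, right_value))
            for key, left_value in left
            for right_value in right_by_key.get(key, [])]

promotion_mail = inner_join(best_customers, customer_addresses)
joined_names = {name for name, _ in promotion_mail}
```

The model returns rows in the order of the left list, whereas the order returned by `collect()` after a Spark join depends on how the data is partitioned.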

## Shakespeare Data Analysis

1. Count the number of lines
2. Count the number of words (hint: Python "split" is a workhorse)
3. Count unique words
4. Count the occurrence of each word
5. Show the top 5 most frequent words

```PYTHON
# 1. Count the number of lines
lines_rdd = sc.textFile("Complete_Shakespeare.txt")
lines_rdd.count()
```

```
124787
```

```PYTHON
# 2. Count the number of words
words_rdd = lines_rdd.flatMap(lambda x: x.split())
words_rdd.count()
```

```
904061
```

```PYTHON
# 3. Count unique words
words_rdd.distinct().count()
```

```
67779
```

```PYTHON
# 4. Count the occurrence of each word: first pair every word with a 1
key_value_rdd = words_rdd.map(lambda x: (x, 1))
key_value_rdd.take(5)
```

```
[('The', 1), ('Project', 1), ('Gutenberg', 1), ('EBook', 1), ('of', 1)]
```

```PYTHON
# Then sum the 1s for each distinct word
word_counts_rdd = key_value_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd.take(5)
```

```
[('The', 3977),
 ('Project', 85),
 ('Gutenberg', 26),
 ('EBook', 2),
 ('of', 15649)]
```

```PYTHON
# 5. Show the top 5 most frequent words: flip each pair to (count, word)
flipped_rdd = word_counts_rdd.map(lambda x: (x[1], x[0]))
flipped_rdd.take(5)
```

```
[(3977, 'The'),
 (85, 'Project'),
 (26, 'Gutenberg'),
 (2, 'EBook'),
 (15649, 'of')]
```

```PYTHON
# Then sort by the count key in descending order
results_rdd = flipped_rdd.sortByKey(False)
results_rdd.take(5)
```

```
[(23407, 'the'), (19540, 'I'), (18358, 'and'), (15682, 'to'), (15649, 'of')]
```
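To see what each step computes without a cluster, the same pipeline can be traced on a tiny in-memory sample. This is a plain-Python model of the semantics, assuming a made-up two-line input; it is not how Spark executes the job.

```python
from collections import Counter

sample_lines = ["to be or not to be", "that is the question"]

# flatMap: one line in, many words out, flattened into a single list.
words = [word for line in sample_lines for word in line.split()]

# map + reduceByKey: pair each word with 1, then sum the 1s per key.
word_counts = Counter()
for word, one in ((w, 1) for w in words):
    word_counts[word] += one

# Flip to (count, word), sort descending, and take the first two.
top_two = sorted(((n, w) for w, n in word_counts.items()), reverse=True)[:2]
```

Note that, as in the Spark version, `split()` does no case folding or punctuation stripping, which is why `'The'` and `'the'` are counted separately in the results above.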

## Optimizations

We said one of the advantages of Spark is that we can control things for better
performance. There is a multitude of optimization, tuning and programmatic
features that enable this control. We will look briefly at a few of the most
important:
+ Persistence
+ Partitioning
+ Parallel Programming Capabilities
+ Performance and Debugging Tools

## Persistence

+ Because of lazy evaluation, by default all of an RDD's dependencies are recomputed each time we call an action on that RDD.
+ If we intend to use that data multiple times (say we are filtering some log, then dumping the results, but we will analyze it further) we can tell Spark to persist the data.
+ We can specify different levels of persistence: `MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, DISK_ONLY`

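```PYTHON
from pyspark import StorageLevel

lines_rdd = sc.textFile("Complete_Shakespeare.txt")
QueenLines_rdd = lines_rdd.filter(lambda line: "queen" in line)

# Mark the filtered RDD for persistence; nothing is computed yet
QueenLines_rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

```
PythonRDD[54] at RDD at PythonRDD.scala:53
```

```PYTHON
# The first action computes the RDD and stores it
QueenLines_rdd.count()
```

```
192
```

```PYTHON
# Later actions reuse the persisted data instead of re-reading the file
QueenLines_rdd.first()
```

```
'  As on the finger of a throned queen,'
```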

## Partitions

+ Spark distributes the data of your RDDs across its resources. It tries to do some obvious things.
+ With key/value pairs we can help keep that data grouped efficiently.
+ We can create custom partitioners that outperform the default (hash partitioning for most operations, range partitioning for sorts).
+ Use persist() if you have partitioned your data in some smart way. Otherwise it will keep getting re-partitioned.
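As a rough picture of what hash partitioning does with key/value pairs, the sketch below assigns each record to a partition by hashing its key, so all records with the same key land together. It is a plain-Python model with made-up data; `hash_partition` is a hypothetical helper, and Spark's actual hash function may differ.

```python
def hash_partition(records, num_partitions):
    # Assign each (key, value) record to partition hash(key) % num_partitions.
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

# Hypothetical (customer_id, page) visit records.
visits = [(101, "home"), (202, "cart"), (101, "checkout"), (303, "home"), (202, "pay")]
parts = hash_partition(visits, 2)
```

Because every record for a given key sits in one partition, a per-key operation such as a reduce or a join can work on that key without moving its records again.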

## Parallel Programming Features

Spark has several features that make explicitly parallel operations easier and more efficient.

Accumulators are shared variables that exist as separate copies on the worker nodes: workers can only add to their copy, and Spark merges the copies so that the driver can read the total.

It is also possible to have replicated data that we would like all the workers to have access to, such as a lookup table of IP addresses to
country codes that each worker can use to transform or filter records. Maybe we want to exclude all non-US IP entries in our logs.
You might think of ways to do this just by passing variables, but they would likely be expensive in actual operation (usually requiring
multiple sends). The solution in Spark is to send an immutable, read-only broadcast variable.

![image8](image8.PNG)

## Performance & Debugging

Unfortunately, we give short shrift to performance and debugging, both of which are
important, mostly because they depend heavily on configuration and
application.
Here are a few things to at least be aware of:
+ The `SparkConf` class. Many options can be tuned here.
+ The Spark Web UI. A very friendly way to explore all of these issues.

## IO Formats

Spark has an impressive, and growing, list of input/output formats it supports. Some important ones:

+ Text
+ CSV
+ SQL type Query/Load
  + JSON (can infer schema)
  + Parquet
  + Hive
  + XML
  + SequenceFile (Hadoop key/value)
  + Databases: JDBC, Cassandra, HBase, MongoDB, etc.
+ Compression (gzip…)

It can also interface directly with a variety of filesystems: local, HDFS, Lustre, Amazon S3, ...

## Creating DataFrames

A row RDD is the basic way to go from RDD to DataFrame, and back, if necessary. A "row" is just a tuple.

```PYTHON
row_rdd = sc.parallelize([("Joe", "Pine St.", "PA", 12543), ("Sally", "Fir Dr.", "WA", 78456), ("Jose", "Elm Pl.", "ND", 45698)])
```

```PYTHON
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
aDataFrameFromRDD = spark.createDataFrame(row_rdd, ["name", "street", "state", "zip"])
aDataFrameFromRDD.show()
```

```
+-----+--------+-----+-----+
| name|  street|state|  zip|
+-----+--------+-----+-----+
|  Joe|Pine St.|   PA|12543|
|Sally| Fir Dr.|   WA|78456|
| Jose| Elm Pl.|   ND|45698|
+-----+--------+-----+-----+
```

