# Learning Apache Spark
Based on readings of the book "Learning Spark: Lightning-Fast Big Data Analysis" by Karau, Holden, et al., O'Reilly Media.

## Spark Stack
![Spark stack](SparkStack.png)
__Spark Core__ contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, interacting with storage systems, and more. Spark Core is also home to the API that defines resilient distributed datasets (RDDs), which are Spark’s main programming abstraction.

__Cluster Managers__: Spark is designed to scale efficiently from one to many thousands of compute nodes. To achieve this while maximizing flexibility, Spark can run over a variety of cluster managers (e.g., Hadoop YARN, Apache Mesos). Spark also includes a simple cluster manager of its own, called the Standalone Scheduler.

![Spark: Master-Slave](ApacheSpark.png)
The `SparkContext`, created in the driver program, connects to a cluster manager, which allocates resources across the worker nodes for the application. Through the cluster manager, Spark acquires executors on the worker nodes. The driver then sends the application code (a JAR file, or Python files for PySpark) to the executors, and finally the `SparkContext` sends tasks to the executors to run.



## RDD Basics
An RDD (**R**esilient **D**istributed **D**ataset) is simply an immutable distributed collection of objects. In Spark, all work is expressed as one of these three steps:
1. creating new RDDs
2. transforming existing RDDs (__Transformations__)
3. calling operations (__Actions__) on RDDs to compute a result.

Under the hood, Spark automatically distributes the data contained in RDDs across a cluster and parallelizes the operations one performs on them. Each RDD is split into multiple partitions, which may be stored (and manipulated) on different nodes of the cluster. RDDs can contain any type of Python object, including user-defined classes.

### Transformations vs. Actions
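Transformations and actions are different because of the way Spark computes RDDs. Although you can define new RDDs at any time, Spark computes them lazily, that is, only the first time they are used in an action. Transformations (such as `filter()` or `map()`) return a new RDD, while actions (such as `count()` or `collect()`) return a result to the driver program or write it to storage.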
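To make the lazy/eager distinction concrete, here is a toy, pure-Python model. `ToyRDD`, `square` and `calls` are hypothetical names invented for this sketch; it only mimics *when* work happens and says nothing about how Spark distributes it.

```python
# A toy, pure-Python model of lazy transformations and eager actions.
# ToyRDD is hypothetical: it records transformations as a list of steps
# (a simple "lineage") and evaluates nothing until collect() is called.


class ToyRDD:
    def __init__(self, source, steps=()):
        self._source = list(source)  # base data
        self._steps = tuple(steps)   # recorded, not-yet-run transformations

    def map(self, func):
        # Transformation: returns a new ToyRDD with one more recorded step.
        return ToyRDD(self._source, self._steps + (("map", func),))

    def filter(self, pred):
        # Transformation: also only recorded; nothing is evaluated here.
        return ToyRDD(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # Action: the recorded steps are applied now, in order.
        items = self._source
        for kind, func in self._steps:
            if kind == "map":
                items = [func(x) for x in items]
            else:
                items = [x for x in items if func(x)]
        return items


calls = []  # every input the mapping function actually receives


def square(x):
    calls.append(x)
    return x * x


squares = ToyRDD([1, 2, 3, 4]).map(square).filter(lambda x: x > 4)
print(calls)              # [] (defining transformations ran nothing)
print(squares.collect())  # [9, 16]
print(calls)              # [1, 2, 3, 4]
```

Calling `squares.collect()` a second time runs `square` on all four inputs again. Spark behaves similarly: an RDD is recomputed for each action unless you `persist()` (or `cache()`) it.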

```python
##### These lines tell Jupyter where to find Apache Spark #####
import findspark
findspark.init('/Users/subhayuchakravarty/spark-2.4.4-bin-hadoop2.7')  # path to the local Spark install
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("My App")
# There are two configurable parameters:
# 1. A cluster URL, namely "local[*]" in this example, which tells Spark how to
#    connect to a cluster. "local[*]" runs Spark on the local machine with as many
#    worker threads as there are logical cores, without connecting to a cluster
#    (plain "local" would use a single thread).
# 2. An application name, namely "My App" in this example. This identifies your
#    application on the cluster manager's UI if you connect to a cluster.
sc = SparkContext(conf=conf)
##### End of Spark setup #####

inputRDD = sc.textFile("log.txt")
## Transformations: each one creates a new RDD
# Get errors
errorsRDD = inputRDD.filter(lambda x: "Error" in x)
# Get warnings
warningsRDD = inputRDD.filter(lambda x: "Warning" in x)
# Combine them (union keeps duplicates)
badLinesRDD = errorsRDD.union(warningsRDD)

## Let's do some actions
# action 1: count
print("Input had ", badLinesRDD.count(), " concerning lines")
print("Here are 2 examples:")
# action 2: take (gets 2 lines)
for line in badLinesRDD.take(2):
    print(line)
print("Here is the complete dump:")
# action 3: collect (brings every element back to the driver; use only on small data)
for line in badLinesRDD.collect():
    print(line)
```

```
Input had  5  concerning lines
Here are 2 examples:
1/1/18: Error - Blah
1/1/18: Error - Doh
Here is the complete dump:
1/1/18: Error - Blah
1/1/18: Error - Doh
1/3/18: Error - Blah  di Blah
```


## Another way to create an RDD
The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s `parallelize()` method. This approach is very useful when you are learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Outside of prototyping and testing, however, it is not widely used, since it requires that your entire dataset fit in memory on one machine.
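When `parallelize()` distributes a local collection, it cuts it into partitions. The sketch below is a simplified, pure-Python model of contiguous slicing; `slice_into_partitions` is a hypothetical helper, and PySpark's real split (which also batches elements for serialization) may place the boundaries differently. On a real cluster you can inspect the actual split with `sc.parallelize(data, numSlices).glom().collect()`.

```python
# Simplified, pure-Python model of cutting a local list into contiguous
# partitions. slice_into_partitions is hypothetical, not PySpark's code.


def slice_into_partitions(data, num_slices):
    """Split `data` into `num_slices` contiguous, roughly equal chunks."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


parts = slice_into_partitions(list(range(10)), 3)
print(parts)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```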

```python
nums = sc.parallelize([1, 2, 3, 4])
```

## Common Transformations

### Element-wise transformations
### `map()` vs `filter()`

`map()` takes a function, applies it to each element of the RDD, and returns a new RDD of the results.

`filter()` takes a function and returns a new RDD containing only the elements for which the function returns `True`.

```python
# map example
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print("%i " % num)
```

```
1 
4 
9 
16 
```


```python
# filter example
nums = sc.parallelize([1, 2, 3, 4])
filteredNums = nums.filter(lambda x: x != 3).collect()
for num in filteredNums:
    print("%i " % num)
```

```
1 
2 
4 
```


### `map` vs `flatMap`
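`map()` produces exactly one output element per input element, so if the function returns a list, the result is an RDD of lists. You can think of `flatMap()` as “flattening” the iterators returned to it, so that instead of ending up with an RDD of lists, we have an RDD of the elements in those lists.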
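The same distinction can be sketched in plain Python, with a list comprehension standing in for `map()` and `itertools.chain.from_iterable` standing in for the flattening that `flatMap()` performs. This is only an analogy on one machine, not Spark code.

```python
from itertools import chain

lines = ["coffee panda", "happy panda", "happiest panda party"]

# Like map(): exactly one output per input, so splitting yields a list of lists.
mapped = [line.split(" ") for line in lines]

# Like flatMap(): each input may yield zero or more outputs, which are
# concatenated ("flattened") into one sequence.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['coffee', 'panda'], ['happy', 'panda'], ['happiest', 'panda', 'party']]
print(flat_mapped)  # ['coffee', 'panda', 'happy', 'panda', 'happiest', 'panda', 'party']
```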

```python
data = sc.parallelize(["coffee panda", "happy panda", "happiest panda party"])
mappedRDD = data.map(lambda line: line.split(" "))
flatMappedRDD = data.flatMap(lambda line: line.split(" "))
mappedRDD.collect()
```

```
[['coffee', 'panda'], ['happy', 'panda'], ['happiest', 'panda', 'party']]
```

```python
flatMappedRDD.collect()
```

```
['coffee', 'panda', 'happy', 'panda', 'happiest', 'panda', 'party']
```

### Pseudo Set Operations
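These are still transformations. They are "pseudo" set operations because RDDs are not true sets: they can contain duplicate elements.
 1. `distinct()`
 2. `union()`
 3. `intersection()`
 4. `subtract()`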
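A pure-Python model of how these four operations treat duplicates is sketched below, using the same data as the cells that follow. It mirrors the results only: in Spark, `union()` simply concatenates its inputs, while `distinct()`, `intersection()` and `subtract()` need a shuffle across the network, and no output order is guaranteed (hence the sorting here).

```python
# Pure-Python model of the RDD "pseudo set" operations, focusing on how
# duplicates are handled. Results are sorted because Spark does not
# guarantee any output order.

rdd1 = ["coffee", "coffee", "panda", "monkey", "tea"]
rdd2 = ["coffee", "monkey", "kitty"]
other = set(rdd2)

distinct = sorted(set(rdd1))                          # duplicates removed
union = rdd1 + rdd2                                   # duplicates kept
intersection = sorted(set(rdd1) & other)              # output has no duplicates
subtract = sorted(x for x in rdd1 if x not in other)  # left values absent from the right

# subtract() keeps duplicates that come from the left-hand side:
subtract_tea = sorted(x for x in rdd1 if x not in {"tea"})

print(distinct)      # ['coffee', 'monkey', 'panda', 'tea']
print(len(union))    # 8
print(intersection)  # ['coffee', 'monkey']
print(subtract)      # ['panda', 'tea']
print(subtract_tea)  # ['coffee', 'coffee', 'monkey', 'panda']
```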

```python
rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])
distinctRDD = rdd1.distinct()
distinctRDD.collect()
```

```
['panda', 'coffee', 'tea', 'monkey']
```

```python
unionRDD = rdd1.union(rdd2)
unionRDD.collect()
```

```
['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty']
```

```python
intersectionRDD = rdd1.intersection(rdd2)
intersectionRDD.collect()
```

```
['coffee', 'monkey']
```

```python
subtractRDD = rdd1.subtract(rdd2)
subtractRDD.collect()
```

```
['panda', 'tea']
```

## Common Actions
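`reduce()` is the most common action. It takes a function that operates on two elements of the type in your RDD and returns a new element of the same type (`reduce(func)`). The function should be commutative and associative, so that Spark can apply it to each partition in parallel and combine the partial results in any order.

Note that `reduce()` requires the return type of the result to be the same as the type of the elements in the RDD we are operating over.

The `aggregate()` function frees us from the constraint of having the return be the same type as the RDD we are working on (`aggregate(zeroValue, seqOp, combOp)` in Python). We supply an initial `zeroValue` of the type we want to return, a `seqOp` function that folds the elements of each partition into an accumulator, and a `combOp` function that merges the accumulators from different partitions.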
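To see why the reduce function must be associative, and how `zeroValue`, `seqOp` and `combOp` fit together, here is a pure-Python model over hard-coded partitions. `rdd_reduce`, `rdd_aggregate` and the chosen split `[[1, 2], [3, 4]]` are assumptions made for illustration; they model the semantics, not Spark's implementation.

```python
from functools import reduce
from operator import add, sub

# Pure-Python model of RDD.reduce() and RDD.aggregate() over partitions.
# rdd_reduce and rdd_aggregate are hypothetical helpers; the partitions are
# hard-coded lists standing in for data that Spark keeps on executors.


def rdd_reduce(partitions, func):
    # Reduce each non-empty partition locally, then reduce the partial results.
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


def rdd_aggregate(partitions, zero_value, seq_op, comb_op):
    # seq_op folds each partition's elements into an accumulator that starts
    # at zero_value; comb_op then merges the per-partition accumulators.
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, per_partition, zero_value)


partitions = [[1, 2], [3, 4]]  # one possible split of [1, 2, 3, 4]

total = rdd_reduce(partitions, add)
sum_count = rdd_aggregate(
    partitions,
    (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
print(total)                        # 10
print(sum_count)                    # (10, 4)
print(sum_count[0] / sum_count[1])  # 2.5

# Subtraction is not associative, so the answer depends on the partitioning:
print(rdd_reduce([[1, 2], [3, 4]], sub))  # 0
print(rdd_reduce([[1, 2, 3, 4]], sub))    # -8
```

Because the zero value is used once per partition and again when combining, it should be a neutral element for the operation (such as `(0, 0)` for a running sum and count).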

```python
# recall that: nums = sc.parallelize([1, 2, 3, 4])
total = nums.reduce(lambda x, y: x + y)  # named total to avoid shadowing Python's built-in sum()
print(total)
```

```
10
```


```python
# Compute (sum, count) in one pass, then derive the average
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
print(sumCount[0] / float(sumCount[1]))
```

```
2.5
```


## Working with Key-Value pairs
Spark provides special operations on RDDs containing key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network. For example, pair RDDs have a `reduceByKey()` method that can aggregate data separately for each key, and a `join()` method that can merge two RDDs together by grouping elements with the same key.
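The per-key folding that `reduceByKey()` performs can be sketched in pure Python as below, using the same pairs as the cells that follow. `reduce_by_key` is a hypothetical helper; Spark additionally combines values locally within each partition before shuffling the partial results, a step this single-machine model skips.

```python
from operator import add

# Pure-Python model of reduceByKey(): values that share a key are folded
# together pairwise with `func`, keeping one running result per key.
# reduce_by_key is a hypothetical helper for illustration only.


def reduce_by_key(pairs, func):
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


pairs = [(1, 2), (3, 4), (3, 6)]
print(reduce_by_key(pairs, add))  # {1: 2, 3: 10}

# Per-key average: map each value to (value, 1), sum both fields per key,
# then divide the sum by the count.
sums_counts = reduce_by_key(
    [(k, (v, 1)) for k, v in pairs],
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
averages = {k: s / c for k, (s, c) in sums_counts.items()}
print(sums_counts)  # {1: (2, 1), 3: (10, 2)}
print(averages)     # {1: 2.0, 3: 5.0}
```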

```python
# Use a list, not a set: a set would drop duplicate pairs and has no defined order
pairsRDD = sc.parallelize([(1, 2), (3, 4), (3, 6)])
pairsRDD.collect()
```

```
[(1, 2), (3, 4), (3, 6)]
```

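```python
# An RDD of 3-tuples is a valid RDD, but not a pair RDD:
# most pair-RDD operations expect (key, value) 2-tuples
newRDD = sc.parallelize([('joe', 1234, '1st ave'), ('mary', 2345, '9th ave')])
newRDD.collect()
```

```
[('joe', 1234, '1st ave'), ('mary', 2345, '9th ave')]
```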

```python
# Combine values with the same key.
reduceByRDD = pairsRDD.reduceByKey(lambda x, y: x + y)
reduceByRDD.collect()  # [(1, 2), (3, 10)]
```

```
[(1, 2), (3, 10)]
```

```python
# Group values with the same key.
# Each group is a ResultIterable object, not a list (see the next cell)
groupByRDD = pairsRDD.groupByKey()
groupByRDD.collect()
```

```
[(1, <pyspark.resultiterable.ResultIterable at 0x116ec1860>),
 (3, <pyspark.resultiterable.ResultIterable at 0x116ec1978>)]
```

```python
# Group values with the same key, converting each ResultIterable to a list.
groupByRDDIterate = pairsRDD.groupByKey().map(lambda x: (x[0], list(x[1])))
groupByRDDIterate.collect()  # [(1, [2]), (3, [4, 6])]
```

```
[(1, [2]), (3, [4, 6])]
```

```python
# Apply a function to each value of a pair RDD without changing the key.
mapValuesRDD = pairsRDD.mapValues(lambda x: x + 1)
mapValuesRDD.collect()  # [(1, 3), (3, 5), (3, 7)]
```

```
[(1, 3), (3, 5), (3, 7)]
```

```python
# Apply a function that returns an iterator to each value of
# a pair RDD, and for each element returned, produce a key/value
# entry with the old key. Often used for tokenization.
flatmapValuesRDD = pairsRDD.flatMapValues(lambda x: range(x, 6))
flatmapValuesRDD.collect()
```

```
[(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]
```

```python
# keys only RDD
keysRDD = pairsRDD.keys()  # keysRDD has [1, 3, 3]
# values only RDD
valuesRDD = pairsRDD.values()  # valuesRDD has [2, 4, 6]
valuesRDD.collect()
```

```
[2, 4, 6]
```

```python
# Per-key (sum, count) pairs with mapValues() and reduceByKey();
# dividing sum by count gives the per-key average, e.g.
# perKeyAvg.mapValues(lambda p: p[0] / p[1])
perKeyAvg = pairsRDD.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
perKeyAvg.collect()
```

```
[(1, (2, 1)), (3, (10, 2))]
```

## Transformations on two pair RDDs


```python
otherPairsRDD = sc.parallelize([(3, 9)])
# Remove elements with a key present in the other RDD.
diffRDD = pairsRDD.subtractByKey(otherPairsRDD)
diffRDD.collect()  # [(1, 2)]
```

```
[(1, 2)]
```

### Inner Join
Some of the most useful operations we get with keyed data come from using it together with other keyed data. Joining data together is probably one of the most common operations on a pair RDD.

Only keys that are present in __both__ pair RDDs are output. When there are multiple values for the same key in one of the inputs, the resulting pair RDD will have an entry for every possible pair of values with that key from the two input RDDs.

```python
# Perform an inner join between two RDDs.
innerJoinRDD = pairsRDD.join(otherPairsRDD)
innerJoinRDD.collect()
```

```
[(3, (4, 9)), (3, (6, 9))]
```

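If we want to lift the restriction that __both__ pair RDDs must have the key, we can use `leftOuterJoin()`: every key from the source (left) RDD appears in the output, and where the other RDD has no matching key, the value from the other side is `None`.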

```python
# Perform a left outer join between two RDDs.
leftOuterJoinRDD = pairsRDD.leftOuterJoin(otherPairsRDD)
leftOuterJoinRDD.collect()
```

```
[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
```
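The join semantics described above can be modelled in pure Python on lists of `(key, value)` pairs. `inner_join`, `left_outer_join` and `_index_by_key` are hypothetical helpers written for this sketch; they reproduce the results of the two cells above, but Spark does not guarantee the output order these lists happen to have.

```python
from collections import defaultdict

# Pure-Python model of join() and leftOuterJoin() on lists of (key, value)
# pairs. The helpers below are hypothetical, for illustration only.


def _index_by_key(pairs):
    index = defaultdict(list)
    for key, value in pairs:
        index[key].append(value)
    return index


def inner_join(left, right):
    right_index = _index_by_key(right)
    # Every left value is paired with every right value sharing its key;
    # keys missing from `right` produce nothing.
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [])]


def left_outer_join(left, right):
    right_index = _index_by_key(right)
    result = []
    for k, lv in left:
        matches = right_index.get(k)
        if matches:
            result.extend((k, (lv, rv)) for rv in matches)
        else:
            result.append((k, (lv, None)))  # no match: right side is None
    return result


pairs = [(1, 2), (3, 4), (3, 6)]
other = [(3, 9)]
print(inner_join(pairs, other))       # [(3, (4, 9)), (3, (6, 9))]
print(left_outer_join(pairs, other))  # [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]

# Multiple values on both sides give every combination for that key:
print(inner_join([(3, 4), (3, 6)], [(3, 9), (3, 10)]))
# [(3, (4, 9)), (3, (4, 10)), (3, (6, 9)), (3, (6, 10))]
```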

## Word Count in Apache Spark

```python
text = "to be or not to be".split()
rdd = sc.parallelize(text)
rdd.collect()
```

```
['to', 'be', 'or', 'not', 'to', 'be']
```

```python
result = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
result.collect()
```

```
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]
```
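As a sanity check, the same word count can be computed on a single machine in plain Python, once with the explicit map and per-key sum steps and once with `collections.Counter`. Spark's output order may differ, so the result is sorted before printing. For small data, PySpark also offers the `countByValue()` action, which returns these counts to the driver as a dictionary.

```python
from collections import Counter

# Pure-Python cross-check of the Spark word count on a single machine.
text = "to be or not to be".split()

# The same two steps as the Spark version, written with plain Python:
pairs = [(word, 1) for word in text]  # map: word -> (word, 1)
totals = {}
for word, one in pairs:               # reduceByKey: sum the 1s per word
    totals[word] = totals.get(word, 0) + one

# collections.Counter computes the same counts in one call.
counts = Counter(text)
print(sorted(totals.items()))  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
print(totals == dict(counts))  # True
```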