#### Introduction to Spark
* At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. 
* The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. 
* RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. 
* Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. 
* Finally, RDDs automatically recover from node failures.

#### SparkContext
* Handle to the Spark cluster (the entry point of a Spark application)
* Used to create RDDs

In [1]:
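# Available by default in the interactive pyspark shell / notebook
sc

# In a standalone script, create the context yourself
from pyspark import SparkConf, SparkContext

# 'local[*]' runs Spark locally with one worker thread per CPU core
conf = SparkConf().setAppName('anyappname').setMaster('local[*]')
# getOrCreate returns the running context (ignoring conf) instead of failing if one already exists
sc = SparkContext.getOrCreate(conf=conf)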

## RDD
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
* Represents an immutable, partitioned collection of elements that can be operated on in parallel.

### Two ways to create an RDD
* parallelize - from collection
* textFile - from external file

#### Parallelized Collections

In [1]:
rdd = sc.parallelize([1,2,3,4,5],2)

In [3]:
rdd.collect()

[1, 2, 3, 4, 5]

#### External Datasets

In [4]:
baby_names = sc.textFile('Baby_Names__Beginning_2007.csv')

### Basics of RDD

In [1]:
lines = sc.textFile('Baby_Names__Beginning_2007.csv')

# Each element of the RDD is one line of the file; print the first one (the CSV header)
print(lines.first())


Year,First Name,County,Sex,Count


In [5]:
# Return the first 5 elements
lines.take(5)

[u'Year,First Name,County,Sex,Count',
 u'2013,GAVIN,ST LAWRENCE,M,9',
 u'2013,LEVI,ST LAWRENCE,M,9',
 u'2013,LOGAN,NEW YORK,M,44',
 u'2013,HUDSON,NEW YORK,M,49']

In [8]:
# Lengths of the first 5 elements
lines.map(lambda s: len(s)).take(5)

[32, 26, 25, 24, 25]

In [2]:
# Sum of twice each line's length, i.e. twice the total number of characters (newlines excluded)
rdd = lines.map(lambda s: len(s))
rdd = rdd.map(lambda s: 2*s)
print(rdd.reduce(lambda a,b: a+b))

2424036


#### Key-Value Pair RDDs

In [7]:
rdd = sc.parallelize(["hello","world","good","hello"])

In [8]:
rdd = rdd.map(lambda w: (w,1))
rdd.collect()

[('hello', 1), ('world', 1), ('good', 1), ('hello', 1)]

* reduceByKey applies the lambda to the values that share the same key
* Note: any RDD whose elements are (key, value) pairs can use the ByKey operations (reduceByKey, groupByKey, aggregateByKey, ...)

In [21]:
rdd.reduceByKey(lambda x,y:x+y).collect()

[('world', 1), ('good', 1), ('hello', 2)]

#### Transformation
* E.g. map, filter, flatMap ...
* Produce a new RDD from an existing one
* Lazily evaluated: nothing runs until an action is called, which lets Spark first build the whole lineage (the chain of transformations) and then pipeline the steps efficiently

#### Actions
* E.g. count, collect, reduce ...
* Trigger execution of the pipeline and return a result to the driver (or write it to storage)
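Lazy evaluation can be illustrated without a cluster. The sketch below is a plain-Python toy model, not Spark's implementation: the class LazyPipeline and its methods are hypothetical. Transformations only record a step; the action collect() runs all recorded steps, one element at a time.

```python
class LazyPipeline:
    """Toy model of lazy transformations (hypothetical class, not Spark's implementation)."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # recorded transformations (a stand-in for the lineage)

    def map(self, func):
        # Transformation: record the step and return a new pipeline; nothing runs yet
        return LazyPipeline(self._data, self._steps + (("map", func),))

    def filter(self, pred):
        # Transformation: also only recorded
        return LazyPipeline(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: push each element through all recorded steps in a single pass
        out = []
        for x in self._data:
            keep = True
            for kind, func in self._steps:
                if kind == "map":
                    x = func(x)
                elif not func(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []

def double(x):
    calls.append(x)  # record every time the map function actually runs
    return 2 * x

pipeline = LazyPipeline([1, 2, 3]).map(double).filter(lambda x: x > 2)
print(calls)               # [] : defining the transformations ran nothing
print(pipeline.collect())  # [4, 6]
print(calls)               # [1, 2, 3]
```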

### Transformation

* map(func) - Return a new distributed dataset formed by passing each element of the source through a function func.
* filter(func) - Return a new dataset formed by selecting those elements of the source on which func returns true.
* flatMap(func) - Similar to map, but each input item can be mapped to 0 or more output items (so func should return a sequence rather than a single item).
* mapPartitions(func) - Similar to map, but runs separately on each partition (block) of the RDD: func receives an iterator over the partition's elements and returns an iterator.
* mapPartitionsWithIndex(func) - Similar to mapPartitions, but also provides func with an integer value representing the index of the partition; in PySpark func is called as func(partitionIndex, iterator).
* sample(withReplacement, fraction, seed) - Sample the given fraction of the data, with or without replacement, using a given random number generator seed.
* union(otherDataset) - Return a new dataset that contains the union of the elements in the source dataset and the argument.
* intersection(otherDataset) - Return a new RDD that contains the intersection of elements in the source dataset and the argument.
* distinct([numTasks]) - Return a new dataset that contains the distinct elements of the source dataset.
* groupByKey([numTasks]) - When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. For per-key aggregations such as a sum or an average, reduceByKey or aggregateByKey perform much better.
* reduceByKey(func, [numTasks]) - When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
* aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) - When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional argument. In PySpark all arguments go in one call: aggregateByKey(zeroValue, seqOp, combOp).
* sortByKey([ascending], [numTasks]) - When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
* join(otherDataset, [numTasks]) - When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
* cogroup(otherDataset, [numTasks]) - When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
* cartesian(otherDataset) - When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
* pipe(command, [envVars]) - Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
* coalesce(numPartitions) - Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
* repartition(numPartitions) - Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
* repartitionAndSortWithinPartitions(partitioner) - Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.


#### Action

* reduce(func) - Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
* collect() - Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
* count() - Return the number of elements in the dataset.
* first() - Return the first element of the dataset (similar to take(1)).
* take(n) - Return an array with the first n elements of the dataset.
* takeSample(withReplacement, num, [seed]) - Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
* takeOrdered(n, [ordering]) - Return the first n elements of the RDD using either their natural order or a custom comparator.
* saveAsTextFile(path) - Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
* saveAsSequenceFile(path) (Java and Scala) - Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
* saveAsObjectFile(path) (Java and Scala) - Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
* countByKey() - Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
* foreach(func) - Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior; see "Understanding closures" in the Spark programming guide for details.


##### Note: We will look at many of these functions in more detail below

### Shuffle Operations
* Many Spark operations trigger a shuffle, i.e. data is moved between partitions, usually across nodes.
* Shuffles are expensive (disk I/O, serialization and network transfer), so keep them to a minimum.

In [22]:
# Create an RDD with 2 partitions
rdd = sc.parallelize(["hello","world","good","hello"],2)

In [23]:
# glom() turns each partition into a list, so collect() shows how the data is partitioned
rdd.glom().collect()

[['hello', 'world'], ['good', 'hello']]

In [24]:
rdd = rdd.map(lambda w:(w,1))

In [25]:
rdd.glom().collect()

[[('hello', 1), ('world', 1)], [('good', 1), ('hello', 1)]]

In [28]:
# reduceByKey merges the values of each key; this lambda just pairs them, to show which values met
rdd.reduceByKey(lambda a,b:(a,b)).collect()

[('world', 1), ('good', 1), ('hello', (1, 1))]

* The operation above brings all records with the same key to the same partition
* That movement of data between partitions is the shuffle

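#### Note - reduceByKey keeps the shuffle small by combining values inside each partition before sending them; groupByKey sends every value across the network, so prefer reduceByKey or aggregateByKey for aggregations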
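The difference can be made concrete by counting the records that leave each partition. This is a plain-Python model with hypothetical function names; it assumes that after local combining exactly one record per key leaves each partition (the MapReduce "combiner" idea), and it ignores record sizes and serialization, which also affect real shuffle cost.

```python
def records_shuffled_group_by_key(partitions):
    # groupByKey: every (key, value) record is sent across the network as-is
    return sum(len(part) for part in partitions)


def records_shuffled_reduce_by_key(partitions, func):
    # reduceByKey: values are first merged per key inside each partition,
    # so at most one record per key leaves each partition
    total = 0
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        total += len(local)
    return total


partitions = [
    [("hello", 1), ("hello", 1), ("hello", 1)],
    [("hello", 1), ("good", 1)],
]
print(records_shuffled_group_by_key(partitions))                        # 5
print(records_shuffled_reduce_by_key(partitions, lambda a, b: a + b))  # 3
```

The more often a key repeats within a partition, the bigger the saving.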

## RDD Operations
<hr/>

#### aggregate
* seqOp aggregates the elements within each partition
* combOp then aggregates the per-partition results
* zeroValue ((0, 0) below) is the initial value for both, and the result for an empty RDD

In [30]:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

print(sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp))

print(sc.parallelize([]).aggregate((0, 0), seqOp, combOp))


(10, 4)
(0, 0)
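The two-level structure of aggregate can be modelled in plain Python with explicit partitions. aggregate_model is a hypothetical helper, and the split of [1, 2, 3, 4] into [[1, 2], [3, 4]] is an assumption; with a neutral zero value it follows the documented semantics: seqOp folds each partition starting from zeroValue, then combOp folds the partial results.

```python
from functools import reduce


def aggregate_model(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of RDD.aggregate over explicit partitions (hypothetical helper)."""
    # seqOp folds the elements of each partition separately, starting from zero_value
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # combOp merges the per-partition results, again starting from zero_value
    return reduce(comb_op, partials, zero_value)


seqOp = lambda x, y: (x[0] + y, x[1] + 1)          # (running sum, running count)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])   # add sums and counts

total, count = aggregate_model([[1, 2], [3, 4]], (0, 0), seqOp, combOp)
print((total, count))  # (10, 4)
print(total / count)   # 2.5, the mean
print(aggregate_model([[], []], (0, 0), seqOp, combOp))  # (0, 0) when there is no data
```

Because zeroValue may be applied several times (once per partition and again when combining), it must be a neutral element for both functions; a non-neutral zero inflates the result by an amount that depends on the number of partitions.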


#### aggregateByKey
* seqOp merges each value into its key's accumulator within a partition
* combOp merges the accumulators of the same key from different partitions
* Unlike aggregate, the result is computed separately for each key

In [32]:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

print(sc.parallelize([('hello',1), ('good',2), ('hello',3), ('food',4)]).aggregateByKey((0, 0), seqOp, combOp).collect())

[('food', (4, 1)), ('good', (2, 1)), ('hello', (4, 2))]
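A plain-Python model makes the per-key bookkeeping visible and shows the usual follow-up step of turning (sum, count) into a mean. aggregate_by_key_model is a hypothetical helper, and splitting the four pairs into two partitions is an assumption.

```python
def aggregate_by_key_model(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of RDD.aggregateByKey over explicit partitions (hypothetical helper)."""
    result = {}
    for part in partitions:
        # Inside one partition: fold each key's values, starting from zero_value
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero_value), value)
        # Across partitions: merge the accumulators that share a key
        for key, acc in local.items():
            result[key] = comb_op(result[key], acc) if key in result else acc
    return result


seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

partitions = [[("hello", 1), ("good", 2)], [("hello", 3), ("food", 4)]]
sums = aggregate_by_key_model(partitions, (0, 0), seqOp, combOp)
print(sums)   # {'hello': (4, 2), 'good': (2, 1), 'food': (4, 1)}
means = {key: s / c for key, (s, c) in sums.items()}
print(means)  # {'hello': 2.0, 'good': 2.0, 'food': 4.0}
```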


#### cache
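* Keeps the RDD in memory once it has been computed, so later actions reuse it instead of recomputing it from its lineage
* The partitions are cached on the executors that computed them; cache() is persist() with the default MEMORY_ONLY level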

In [33]:
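# cache() only marks rdd for caching; the first action computes and stores it
rdd = sc.parallelize(range(10000))
rdd.cache()
rdd1 = rdd.map(lambda x: x+2)
rdd2 = rdd.map(lambda x:x+3)
print(rdd1.count())  # computes rdd and stores its partitions in memory
print(rdd2.count())  # reuses the cached partitions instead of recomputing rdd

# Remove the data from the cache
rdd.unpersist()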

10000
10000


ParallelCollectionRDD[71] at parallelize at PythonRDD.scala:480
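The effect of cache() on recomputation can be shown with a toy model that counts how often the source data is built. ToyRDD is a hypothetical class, not pyspark.RDD; it only mimics the rule that an uncached dataset is recomputed from its lineage on every action, while a cached one is computed by the first action and reused afterwards.

```python
class ToyRDD:
    """Toy model of lineage recomputation vs. caching (hypothetical, not pyspark.RDD)."""

    def __init__(self, compute):
        self._compute = compute  # rebuilds this dataset from its parent (the lineage)
        self._cache_enabled = False
        self._cached = None

    def cache(self):
        # Like RDD.cache(): only marks the dataset, nothing is computed yet
        self._cache_enabled = True
        return self

    def unpersist(self):
        self._cache_enabled = False
        self._cached = None
        return self

    def _materialize(self):
        if not self._cache_enabled:
            return self._compute()          # uncached: recompute on every use
        if self._cached is None:
            self._cached = self._compute()  # first action computes and stores the data
        return self._cached

    def map(self, func):
        # The child's lineage is "materialize the parent, then apply func"
        return ToyRDD(lambda: [func(x) for x in self._materialize()])

    def count(self):
        # Action
        return len(self._materialize())


builds = []

def build_source():
    builds.append(1)  # count how often the source is (re)built
    return list(range(10000))

base = ToyRDD(build_source)
base.map(lambda x: x + 2).count()
base.map(lambda x: x + 3).count()
print(len(builds))  # 2: each action rebuilt the source

builds.clear()
base.cache()
base.map(lambda x: x + 2).count()
base.map(lambda x: x + 3).count()
print(len(builds))  # 1: the second action reused the cached data
```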

### Set Operations

* cartesian
* union
* intersection


In [34]:
rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(11,20))
rdd3 = sc.parallelize(range(5,10))

In [37]:
rdd1.cartesian(rdd2).take(5)

[(1, 11), (1, 12), (1, 13), (1, 14), (1, 15)]

In [38]:
rdd1.intersection(rdd3).collect()

[8, 6, 9, 5, 7]

#### checkpoint
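* Saves the RDD's data to reliable storage and drops its lineage, so it does not have to be recomputed from the start
* The checkpoint directory must be set first with sc.setCheckpointDir (on a cluster, an HDFS path)
* checkpoint() is lazy: the data is written when the next action runs on the RDD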

In [84]:
sc.setCheckpointDir('ckpt')
rdd.checkpoint()  # only marks rdd; nothing is written yet
rdd.count()       # the next action writes the checkpoint files

#### coalesce
* Returns a new RDD with fewer partitions; by default it merges existing partitions without a full shuffle

In [40]:
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()

[[1], [2, 3], [4, 5]]

In [41]:
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()

[[1, 2, 3, 4, 5]]

#### cogroup
* For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.

In [43]:
x = sc.parallelize([("a", 1), ("b", 4),("a",9)])
y = sc.parallelize([("a",5)])

In [44]:
new_rdd = x.cogroup(y)

In [46]:
new_rdd.collect()

[('a',
  (<pyspark.resultiterable.ResultIterable at 0x7ff80f98f150>,
   <pyspark.resultiterable.ResultIterable at 0x7ff80f8e8e50>)),
 ('b',
  (<pyspark.resultiterable.ResultIterable at 0x7ff80f8e8050>,
   <pyspark.resultiterable.ResultIterable at 0x7ff80f8e8690>))]

In [47]:
[(k, [list(v) for v in vs]) for k, vs in new_rdd.collect()]

[('a', [[1, 9], [5]]), ('b', [[4], []])]
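The shape of cogroup's result can be reproduced on plain lists. cogroup_model is a hypothetical helper that returns lists where Spark returns ResultIterable objects; Spark also does not guarantee the key order.

```python
def cogroup_model(x, y):
    """Plain-Python model of RDD.cogroup on lists of pairs (hypothetical helper)."""
    keys = []
    for key, _ in x + y:
        if key not in keys:
            keys.append(key)  # every key that appears in either input
    # For each key: (values from x, values from y); a side without the key gives []
    return [
        (key, ([v for k, v in x if k == key], [v for k, v in y if k == key]))
        for key in keys
    ]


x = [("a", 1), ("b", 4), ("a", 9)]
y = [("a", 5)]
print(cogroup_model(x, y))  # [('a', ([1, 9], [5])), ('b', ([4], []))]
```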

#### collect
* Brings all elements from the executors back to the driver
* Avoid it on large RDDs: the whole result must fit in the driver's memory

In [49]:
rdd = sc.parallelize([1,2,3,4],2)
rdd.collect()

[1, 2, 3, 4]

#### collectAsMap
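* Works only on pair RDDs
* Returns the pairs to the driver as a dict; if a key occurs more than once, only one value survives (below, 6 overwrites 2 for key 1)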

In [56]:
m = sc.parallelize([(1, 2), (3, 4),(1,6)]).collectAsMap()

In [57]:
m

{1: 6, 3: 4}

#### countByValue
* Returns a dict mapping each distinct element to the number of times it occurs

In [58]:
sc.parallelize(['a','c','a','d','c']).countByValue()

defaultdict(int, {'a': 2, 'c': 2, 'd': 1})

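#### combineByKey
* Generic per-key aggregation, configured with three functions:
* createCombiner (mystr below) turns the first value V of a key in a partition into a combiner C
* mergeValue (myconcat) merges a further value of that key in the same partition into its combiner
* mergeCombiners (mypartConcat) merges the combiners of one key coming from different partitions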

In [73]:
# Console output of the prints (they run on the executors, so they appear in the terminal, not the notebook):
# In MyStr
# In MyStr
# In MyConcat
# In MyConcat
# In MyStr
# In MyStr
# In MyConcat
# In MyConcat
# In myPartConcat

# createCombiner: called the first time a key appears in a partition; d is that value
def mystr(d):
    print('In MyStr')
    return d

# mergeValue: called for every further value of the same key in the same partition
def myconcat(a,b):
    print('In MyConcat')
    return a + b

# mergeCombiners: merges the combiners of one key coming from different partitions
def mypartConcat(a,b):
    print('In myPartConcat')
    return a + b

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("a",8),("c",4), ("a", 12),("a",18),("c",14)],2)

# mystr turns the first value V of a key into the combiner type C (here C is simply V)
rdd.combineByKey(mystr, myconcat, mypartConcat).collect()

[('a', 41), ('c', 18), ('b', 1)]

In [74]:
# createCombiner: start a (sum, count) pair from the first value of a key in a partition
def mystr(d):
    print('In MyStr')
    return (d,1)

# mergeValue: add a further value of the same key in the same partition
def myconcat(a,b):
    print('In MyConcat')
    return (a[0] + b,a[1]+1)

# mergeCombiners: add up the (sum, count) pairs of one key from different partitions
def mypartConcat(a,b):
    print('In myPartConcat')
    return (a[0] + b[0],a[1] + b[1])

# Here the combiner type C is a (sum, count) tuple; dividing gives the mean per key
data = rdd.combineByKey(mystr, myconcat, mypartConcat)
data.map(lambda x: (x[0], x[1][0]/(x[1][1]*1.0))).collect()

[('a', 8.2), ('c', 9.0), ('b', 1.0)]
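The call order in the console log of the first combineByKey cell can be reproduced with a plain-Python model. combine_by_key_model is a hypothetical helper, and the split assumes parallelize(..., 2) put the first four pairs in one partition and the last four in the other, which is consistent with the printed log.

```python
def combine_by_key_model(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of RDD.combineByKey over explicit partitions (hypothetical helper)."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)  # key seen before in this partition
            else:
                combiners[key] = create_combiner(value)              # first value of key in this partition
        per_partition.append(combiners)
    # After the shuffle: merge the combiners of each key coming from different partitions
    result = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            result[key] = merge_combiners(result[key], combiner) if key in result else combiner
    return result


calls = []

def create_combiner(v):
    calls.append("createCombiner")
    return (v, 1)  # (sum, count)

def merge_value(acc, v):
    calls.append("mergeValue")
    return (acc[0] + v, acc[1] + 1)

def merge_combiners(a, b):
    calls.append("mergeCombiners")
    return (a[0] + b[0], a[1] + b[1])

partitions = [
    [("a", 1), ("b", 1), ("a", 2), ("a", 8)],
    [("c", 4), ("a", 12), ("a", 18), ("c", 14)],
]
sums = combine_by_key_model(partitions, create_combiner, merge_value, merge_combiners)
means = {key: s / c for key, (s, c) in sums.items()}
print(means)  # {'a': 8.2, 'b': 1.0, 'c': 9.0}
print(calls)  # same pattern as the MyStr / MyConcat / myPartConcat log
```

Only key "a" occurs in both partitions, which is why mergeCombiners runs exactly once.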

#### countByKey
* Counts the number of elements for each key and returns the result to the driver as a dictionary.

In [64]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 3)])

In [65]:
rdd.countByKey()

defaultdict(int, {'a': 2, 'b': 1})

#### distinct
* Return a new RDD containing the distinct elements in this RDD.

In [66]:
sc.parallelize([1, 1, 2, 3]).distinct().collect()

[1, 2, 3]

#### filter
* Keeps only the elements for which the function returns True

In [68]:
rdd = sc.parallelize(range(20))
rdd.filter(lambda x: x%2 != 0).map(lambda x: x*2).collect()

[2, 6, 10, 14, 18, 22, 26, 30, 34, 38]

#### first
* returns first element of rdd

In [69]:
rdd.first()

0

#### flatMap
* Maps each element to zero or more elements and flattens the results into one RDD

In [72]:
rdd = sc.parallelize([5,6,7,8])
rdd.flatMap(lambda d: (d,d+1,d+2)).collect()

[5, 6, 7, 6, 7, 8, 7, 8, 9, 8, 9, 10]

#### foreach
* Applies a function to all elements of this RDD.

In [75]:
def f(e):
    print(e)

# The print runs on the executors, not on the driver, so its output appears
# in the executor logs (the terminal in local mode), not in the notebook
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

#### foreachPartition
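* Applies a function once to each partition; the function receives an iterator over that partition's elements

In [80]:
def f(iterator):
    for x in iterator:
        print(x)
    print('\n Next Partition')

# As with foreach, the output appears on the executors, not in the notebook
sc.parallelize([11, 12, 13, 14, 15],2).foreachPartition(f)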

#### getNumPartitions
* Returns the number of partitions the RDD is split into

In [85]:
rdd = sc.parallelize([11, 12, 13, 14, 15],2)
rdd.getNumPartitions()

2

### StorageLevel

* MEMORY_ONLY - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level (used by cache()).
* MEMORY_AND_DISK - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
* DISK_ONLY - Store the RDD partitions only on disk.
* MEMORY_ONLY_2, MEMORY_AND_DISK_2 - Same as the levels above, but replicate each partition on two cluster nodes.
* Note: in PySpark, stored objects are always serialized with Pickle, so getStorageLevel reports "Serialized" even for these levels.

#### getStorageLevel
* Returns the storage level the RDD is currently persisted with

In [96]:
import pyspark

rdd1 = sc.parallelize([1,2])
rdd1.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)
print(rdd1.getStorageLevel())

Disk Memory Serialized 1x Replicated


#### randomSplit
* Randomly splits the RDD into one part per weight; weights that do not sum to 1 are normalized (here [2, 3] means roughly 40% / 60%)

In [99]:
rdd = sc.parallelize(range(500), 1)
rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
rdd1.count()

213

In [100]:
rdd2.count()

287
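In PySpark, randomSplit turns the normalized weights into cumulative ranges (here [0.0, 0.4) and [0.4, 1.0)) and, using the seed, gives each element one random number in [0, 1); the element lands in the split whose range contains that number. That is why the sizes are only close to 200 and 300. random_split_model is a hypothetical plain-Python sketch of that idea using Python's random module, so it will not reproduce the 213 / 287 counts above.

```python
import random
from bisect import bisect_right


def random_split_model(data, weights, seed):
    """Plain-Python sketch of weighted random splitting (hypothetical helper)."""
    total = float(sum(weights))
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)  # cumulative bounds, e.g. [0.0, 0.4, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for x in data:
        r = rng.random()  # one draw in [0, 1) per element
        # index of the range containing r (clamped in case of float rounding at the top)
        i = min(bisect_right(bounds, r) - 1, len(weights) - 1)
        splits[i].append(x)
    return splits


part1, part2 = random_split_model(range(500), [2, 3], seed=17)
print(len(part1), len(part2))  # roughly 200 and 300; exact sizes depend on the seed
```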

#### reduce
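* Combines the elements two at a time with a function that takes two values and returns one; the function must be commutative and associative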

In [102]:
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)

15

In [103]:
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda a,b:a+b)

15
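The commutative-and-associative requirement matters because reduce runs in two stages: each partition is reduced on its own, then the partial results are reduced on the driver. partitioned_reduce is a hypothetical plain-Python model of that scheme; with add the partitioning does not matter, with sub it does.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(partitions, func):
    """Plain-Python model of RDD.reduce: reduce each partition, then reduce the partials."""
    partials = [reduce(func, part) for part in partitions if part]  # empty partitions contribute nothing
    return reduce(func, partials)


one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2], [3, 4, 5]]

print(partitioned_reduce(one_partition, add), partitioned_reduce(two_partitions, add))  # 15 15
print(partitioned_reduce(one_partition, sub), partitioned_reduce(two_partitions, sub))  # -13 5
```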

#### reduceByKey
* Merge the values for each key using an associative and commutative reduce function.
* This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

In [104]:
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())

[('a', 2), ('b', 1)]

#### repartition
* Return a new RDD that has exactly numPartitions partitions.
* Can increase or decrease the level of parallelism in this RDD. 
* Internally, this uses a shuffle to redistribute data. 
* If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

In [105]:
rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
rdd.glom().collect()
rdd.repartition(2).glom().collect()

[[1, 2, 3, 4, 5, 6, 7], []]

In [106]:
rdd.repartition(10).glom().collect()

[[], [1], [], [], [], [6, 7], [2, 3], [], [], [4, 5]]

#### saveAsTextFile
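* Saves the RDD as text, one line per element (converted with str()), in the given directory

In [107]:
# 'abc.txt' is created as a directory holding one part-file per partition
sc.parallelize(range(10)).saveAsTextFile('abc.txt')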

#### sortBy
* Sorts this RDD by the given keyfunc

In [108]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

In [110]:
sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

#### sortByKey
* Sort based on keys

In [111]:
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])

In [117]:
sc.parallelize(tmp2).sortByKey(ascending=True, numPartitions=3, keyfunc=lambda k: k.lower()).glom().collect()

[[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5)],
 [('little', 4), ('Mary', 1), ('was', 8)],
 [('white', 9), ('whose', 6)]]

#### subtract
* Returns the elements of this RDD that are not in the other one; for pair RDDs the whole (key, value) pair must match

In [120]:
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
x.subtract(y).collect()

[('b', 5), ('a', 1), ('b', 4)]

#### subtractByKey
* Removes from the first RDD every pair whose key appears in the second one

In [122]:
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
y = sc.parallelize([("a", 3), ("c", None)])
x.subtractByKey(y).collect()

[('b', 4), ('b', 5)]
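The two matching rules can be compared side by side on the same input with plain list comprehensions. This only models which elements survive; Spark computes the result with a shuffle and returns it in no guaranteed order.

```python
x = [("a", 1), ("b", 4), ("b", 5), ("a", 3)]
y = [("a", 3), ("c", None)]

# subtract: drop elements of x that appear in y as a whole (key AND value must match)
y_pairs = set(y)
subtracted = [pair for pair in x if pair not in y_pairs]
print(subtracted)         # [('a', 1), ('b', 4), ('b', 5)]

# subtractByKey: drop elements of x whose key appears anywhere in y
y_keys = {key for key, _ in y}
subtracted_by_key = [pair for pair in x if pair[0] not in y_keys]
print(subtracted_by_key)  # [('b', 4), ('b', 5)]
```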

#### take
* return first n elements

In [123]:
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)

[2, 3]

#### takeOrdered
* Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.

In [124]:
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)

[10, 9, 7, 6, 5, 4]
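takeOrdered does not sort the whole RDD. The PySpark implementation is built on heapq.nsmallest: each partition keeps only its n smallest elements (by key), and those short lists are merged. take_ordered_model is a hypothetical plain-Python model of that idea; the split of the nine numbers into two partitions is an assumption.

```python
import heapq


def take_ordered_model(partitions, n, key=None):
    """Plain-Python model of RDD.takeOrdered over explicit partitions (hypothetical helper)."""
    # Each partition keeps only its own n smallest elements (by key)
    partials = [heapq.nsmallest(n, part, key=key) for part in partitions]
    # The short per-partition lists are merged and trimmed to n again
    result = []
    for partial in partials:
        result = heapq.nsmallest(n, result + partial, key=key)
    return result


partitions = [[10, 1, 2, 9], [3, 4, 5, 6, 7]]
print(take_ordered_model(partitions, 6, key=lambda x: -x))  # [10, 9, 7, 6, 5, 4]
```

Only n elements per partition travel to the driver, which is why takeOrdered is much cheaper than sorting and then taking n.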

#### toDebugString
* A description of this RDD and its recursive dependencies for debugging.

In [125]:
rdd.toDebugString()

'(4) ParallelCollectionRDD[189] at parallelize at PythonRDD.scala:480 []'

#### values
* Return value of each key-value pair

In [126]:
m = sc.parallelize([(1, 2), (3, 4)]).values()

In [127]:
m.collect()

[2, 4]

#### zip
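* Pairs the i-th element of one RDD with the i-th element of another; both RDDs must have the same number of partitions and the same number of elements in each partition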

In [128]:
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))

In [129]:
x.zip(y).collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

#### zipWithIndex
* Pairs each element with its index, ordered first by partition and then by position within each partition

In [130]:
sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
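Because indices must continue across partitions, zipWithIndex first needs the size of every partition; Spark runs an extra job for this when the RDD has more than one partition. zip_with_index_model is a hypothetical plain-Python model, and [["a"], ["b"], ["c", "d"]] is one possible split of the four letters into three partitions.

```python
def zip_with_index_model(partitions):
    """Plain-Python model of RDD.zipWithIndex over explicit partitions (hypothetical helper)."""
    # Pass 1: the size of every partition
    sizes = [len(part) for part in partitions]
    # Each partition's first index is the number of elements in all earlier partitions
    starts = [sum(sizes[:i]) for i in range(len(sizes))]
    # Pass 2: every partition numbers its own elements from its start offset
    return [(x, start + i) for part, start in zip(partitions, starts) for i, x in enumerate(part)]


print(zip_with_index_model([["a"], ["b"], ["c", "d"]]))  # [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
```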