![Spark Logo](http://sameerf-dbc-labs.s3-website-us-west-2.amazonaws.com/ta_Spark-logo-small.png)
## A Visual Guide to Spark's API
### Time to complete: 30 minutes
#### This lab will introduce you to using Apache Spark 1.4 with the Python API. We will explore common transformations and actions, including:
* Actions: Collect, Count, GetNumPartitions, Reduce, Aggregate, Max, Min, Sum, Mean, Variance, Stdev, CountByKey, SaveAsTextFile
* Transformations + MISC operations: Map, Filter, FlatMap, GroupBy, GroupByKey, MapPartitions, MapPartitionsWithIndex, Sample, Union, Join, Distinct, Coalesce, KeyBy, PartitionBy, Zip


Note that these images were inspired by Jeff Thomson's 67 "PySpark images".

### Collect

Action / To Driver: Return all items in the RDD to the driver in a single list

Start with this action, since it is used in all of the examples.

![](http://i.imgur.com/DUO6ygB.png)

In [3]:
x = sc.parallelize([1,2,3], 2)
y = x.collect()
print(x.glom().collect()) # glom() groups the elements of each partition into a list
print(y)
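
To make the difference between `glom().collect()` and `collect()` concrete without a cluster, here is a minimal plain-Python sketch. It models an RDD as a list of partitions; the `partitions` layout and the helper names `glom_collect` and `collect` are assumptions for illustration, not Spark APIs.

```python
# Plain-Python model: an RDD is represented as a list of partitions.
# The layout below is an assumed example, not something Spark guarantees.
partitions = [[1], [2, 3]]

def glom_collect(parts):
    # Models x.glom().collect(): one list per partition, boundaries preserved.
    return [list(p) for p in parts]

def collect(parts):
    # Models x.collect(): every element in one flat list, partition order kept.
    return [item for p in parts for item in p]

print(glom_collect(partitions))  # [[1], [2, 3]]
print(collect(partitions))       # [1, 2, 3]
```

In both cases the full data set ends up in driver memory, so these calls are best kept for small RDDs like the ones in this lab.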

## Transformations

Create a new RDD from one or more RDDs

### Map

Transformation / Narrow: Return a new RDD by applying a function to each element of this RDD

![](http://i.imgur.com/PxNJf0U.png)

In [6]:
x = sc.parallelize(["b", "a", "c"])
y = x.map(lambda z: (z, 1))
print(x.collect())
print(y.collect())


#### **Try it!** Change the indicated line to produce squares of the original numbers

In [8]:
#Lab exercise:

x = sc.parallelize([1,2,3,4])
y = x.map(lambda n: n) #CHANGE the lambda to take a number and return its square
print(x.collect())
print(y.collect())

### Filter

Transformation / Narrow: Return a new RDD containing only the elements that satisfy a predicate

![](http://i.imgur.com/GFyji4U.png)

In [10]:
x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1) #keep odd values 
print(x.collect())
print(y.collect())

#### **Try it!** Change the sample to keep even numbers

In [12]:
#Lab exercise:
x = sc.parallelize([1,2,3])
y = x.filter(  ) #add a lambda parameter to keep only even numbers
print(x.collect())
print(y.collect())

### FlatMap

Transformation / Narrow: Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results

![](http://i.imgur.com/TsSUex8.png)

In [14]:
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*100, 42))
print(x.collect())
print(y.collect())
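
The "flattening" step is what separates `flatMap` from `map`. As a rough illustration, the plain-Python sketch below applies the same function both ways; the names `data`, `f`, `mapped`, and `flat_mapped` are hypothetical, and `itertools.chain` stands in for Spark's flattening.

```python
from itertools import chain

data = [1, 2, 3]
f = lambda n: (n, n * 100, 42)

# map: exactly one output item (here, a tuple) per input item
mapped = [f(n) for n in data]

# flatMap: the per-item results are flattened by one level
flat_mapped = list(chain.from_iterable(f(n) for n in data))

print(mapped)       # [(1, 100, 42), (2, 200, 42), (3, 300, 42)]
print(flat_mapped)  # [1, 100, 42, 2, 200, 42, 3, 300, 42]
```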


### GroupBy

Transformation / Wide: Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

![](http://i.imgur.com/gdj0Ey8.png)

In [16]:
x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.groupBy(lambda w: w[0])
print([(k, list(v)) for (k, v) in y.collect()])
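
The grouping logic itself can be sketched in plain Python. The helper `group_by` below is hypothetical and runs on a single machine; in Spark the same grouping requires a shuffle, and the order of the resulting groups is not guaranteed, which is why the sketch sorts before printing.

```python
from collections import defaultdict

def group_by(items, key_func):
    # Collect every item under the key that key_func computes for it.
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)

names = ['John', 'Fred', 'Anna', 'James']
grouped = group_by(names, lambda w: w[0])

print(sorted(grouped.items()))
# [('A', ['Anna']), ('F', ['Fred']), ('J', ['John', 'James'])]
```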


### GroupByKey

Transformation / Wide: Group the values for each key in the original RDD. Create a new pair where the original key corresponds to this collected group of values.

![](http://i.imgur.com/TlWRGr2.png)

In [18]:
x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
print(x.collect())
print(list((j[0], list(j[1])) for j in y.collect()))


### MapPartitions

Transformation / Narrow: Return a new RDD by applying a function to each partition of this RDD

![](http://i.imgur.com/dw8QOLX.png)

In [20]:
x = sc.parallelize([1,2,3], 2)

def f(iterator): yield sum(iterator); yield 42

y = x.mapPartitions(f)

print(x.glom().collect())
print(y.glom().collect())
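
To see why the function receives an iterator rather than a single element, here is a plain-Python sketch that calls the same generator once per partition. The `partitions` layout is an assumed example and `result` is a hypothetical name.

```python
def f(iterator):
    # Called once per partition; may yield any number of output items.
    yield sum(iterator)
    yield 42

# Assumed partition layout for illustration.
partitions = [[1], [2, 3]]

# One call to f per partition, each producing that partition's new contents.
result = [list(f(iter(p))) for p in partitions]

print(result)  # [[1, 42], [5, 42]]
```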


### MapPartitionsWithIndex

Transformation / Narrow: Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition

![](http://i.imgur.com/3cGvAF7.png)

In [22]:
x = sc.parallelize([1,2,3], 2)

def f(partitionIndex, iterator): yield (partitionIndex, sum(iterator))

y = x.mapPartitionsWithIndex(f)

print(x.glom().collect())
print(y.glom().collect())
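
A plain-Python sketch of the same idea: `enumerate` supplies the partition index that Spark would pass as the first argument. The `partitions` layout is an assumed example, and `result` is a hypothetical name.

```python
def f(partition_index, iterator):
    # Called once per partition, with that partition's index.
    yield (partition_index, sum(iterator))

# Assumed partition layout for illustration.
partitions = [[1], [2, 3]]

result = [list(f(i, iter(p))) for i, p in enumerate(partitions)]

print(result)  # [[(0, 1)], [(1, 5)]]
```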


### Sample

Transformation / Narrow: Return a new RDD containing a statistical sample of the original RDD

![](http://i.imgur.com/LJ56nQq.png)

In [24]:
x = sc.parallelize([1, 2, 3, 4, 5])
y = x.sample(False, 0.4, 42)
print(x.collect())
print(y.collect())


### Union

Transformation / Narrow: Return a new RDD containing all items from two original RDDs. Duplicates are not culled.

![](http://i.imgur.com/XFpbqZ8.png)

In [26]:
x = sc.parallelize([1,2,3], 2)
y = sc.parallelize([3,4], 1)
z = x.union(y)
print(z.glom().collect())


### Join

Transformation / Wide: Return a new RDD containing all pairs of elements having the same key in the original RDDs

![](http://i.imgur.com/YXL42Nl.png)

In [28]:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
z = x.join(y)
print(z.collect())
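
The pairing rule of an inner join can be sketched in plain Python. The helper `inner_join` is hypothetical and single-machine; Spark performs the same matching after shuffling both RDDs by key, and the order of its output pairs may differ from this sketch.

```python
from collections import defaultdict

def inner_join(left, right):
    # Index the right-hand values by key, then pair each left value
    # with every right value that shares its key.
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]

left = [("a", 1), ("b", 2)]
right = [("a", 3), ("a", 4), ("b", 5)]
joined = inner_join(left, right)

print(joined)  # [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]
```

Keys that appear in only one of the inputs produce no output, which is what makes this an inner join.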


#### **Try it!** Join the RDDs so that each company's name and stock price are collected into a tuple value, whose key is the company ticker symbol.

In [30]:
x = sc.parallelize([("TWTR", "Twitter"), ("GOOG", "Google"), ("AAPL", "Apple")])
y = sc.parallelize([("TWTR", 36), ("GOOG", 532), ("AAPL", 127)])

#Add code here to perform a join and print the result

### Distinct

Transformation / Wide: Return a new RDD containing distinct items from the original RDD (omitting all duplicates)

![](http://i.imgur.com/Vqgy2a4.png)

In [32]:
x = sc.parallelize([1,2,3,3,4])
y = x.distinct()

print(y.collect())


### Coalesce

Transformation / Narrow or Wide: Return a new RDD which is reduced to a smaller number of partitions

![](http://i.imgur.com/woQiM7E.png)

In [34]:
x = sc.parallelize([1, 2, 3, 4, 5], 3)
y = x.coalesce(2)
print(x.glom().collect())
print(y.glom().collect())


### KeyBy

Transformation / Narrow: Create a Pair RDD, forming one pair for each item in the original RDD. The pair's key is calculated from the value via a user-supplied function.

![](http://i.imgur.com/nqYhDW5.png)

In [36]:
x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.keyBy(lambda w: w[0])
print(y.collect())

#### **Try it!** Create an RDD from this list, and then use .keyBy to create a pair RDD where the state abbreviation is the key and the city + state is the value (e.g., ("NY", "New York, NY")) ... For extra credit, add a .map that strips out the redundant state abbreviation to yield pairs like ("NY", "New York").

In [38]:
data = ["New York, NY", "Philadelphia, PA", "Denver, CO", "San Francisco, CA"]
# Add code to parallelize the list to an RDD
# call .keyBy on the RDD to create an RDD of pairs



### PartitionBy

Transformation / Wide: Return a new RDD with the specified number of partitions, placing each original item into the partition returned by a user-supplied function applied to the item's key

![](http://i.imgur.com/QHDWwYv.png)

In [40]:
x = sc.parallelize([('J','James'),('F','Fred'), ('A','Anna'),('J','John')], 3)

y = x.partitionBy(2, lambda w: 0 if w[0] < 'H' else 1)

print(x.glom().collect())
print(y.glom().collect())
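
As a rough model of how items are routed, the plain-Python sketch below places each pair into a bucket chosen by applying the partition function to the pair's key. The helper `partition_by` is hypothetical; the modulo step mirrors how an out-of-range index would be wrapped into the available partitions, and the order of items within a partition in real Spark output may differ.

```python
def partition_by(pairs, num_partitions, partition_func):
    # One bucket per target partition.
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # The partition function sees only the key, not the whole pair.
        index = partition_func(key) % num_partitions
        buckets[index].append((key, value))
    return buckets

pairs = [('J', 'James'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'John')]
result = partition_by(pairs, 2, lambda k: 0 if k < 'H' else 1)

print(result)
# [[('F', 'Fred'), ('A', 'Anna')], [('J', 'James'), ('J', 'John')]]
```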

### Zip

Transformation / Narrow: Return a new RDD containing pairs whose key is the item in the original RDD, and whose value is that item's corresponding element (same partition, same index) in a second RDD

![](http://i.imgur.com/5J0lg6g.png)

In [42]:
x = sc.parallelize([1, 2, 3])
y = x.map(lambda n:n*n)
z = x.zip(y)

print(z.collect())

## Actions

Calculate a result (e.g., a numeric value or a non-RDD data structure), or produce a side effect, such as writing output to disk

### GetNumPartitions

Action / To Driver: Return the number of partitions in the RDD

![](http://i.imgur.com/9yhDsVX.png)

In [45]:
x = sc.parallelize([1,2,3], 2)
y = x.getNumPartitions()

print(x.glom().collect())
print(y)

### Reduce

Action / To Driver: Aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results, and return a result to the driver

![](http://i.imgur.com/R72uzwX.png)

In [47]:
x = sc.parallelize([1,2,3,4])
y = x.reduce(lambda a,b: a+b)

print(x.collect())
print(y)
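
The "elements and partial results" wording can be illustrated with a plain-Python sketch: each partition is reduced on its own, and the partial results are then reduced again. The `partitions` layout is an assumed example, and `add`, `partials`, and `total` are hypothetical names.

```python
from functools import reduce

add = lambda a, b: a + b

# Assumed partition layout for illustration.
partitions = [[1, 2], [3, 4]]

# Step 1: reduce each non-empty partition independently.
partials = [reduce(add, p) for p in partitions if p]

# Step 2: reduce the per-partition results into one value for the driver.
total = reduce(add, partials)

print(partials)  # [3, 7]
print(total)     # 10
```

Because partial results can be combined in any grouping and order, the function passed to `reduce` should be commutative and associative.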

### Aggregate

Action / To Driver: Aggregate all the elements of the RDD by: 
  - applying a user function to combine elements with user-supplied objects, 
  - then combining those user-defined results via a second user function, 
  - and finally returning a result to the driver.
  
![](http://i.imgur.com/7MLnYeh.png)

In [49]:
seqOp = lambda data, item: (data[0] + [item], data[1] + item)
combOp = lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1])

x = sc.parallelize([1,2,3,4])

y = x.aggregate(([], 0), seqOp, combOp)

print(y)
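
The three steps listed above can be sketched in plain Python, using the same two functions as the example. The `partitions` layout is an assumed example, and `zero`, `partials`, and `result` are hypothetical names; `functools.reduce` stands in for the work Spark does on each partition and on the driver.

```python
from functools import reduce

seq_op = lambda data, item: (data[0] + [item], data[1] + item)
comb_op = lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1])
zero = ([], 0)

# Assumed partition layout for illustration.
partitions = [[1, 2], [3, 4]]

# Step 1: fold each partition's items into the zero value with seq_op.
partials = [reduce(seq_op, p, zero) for p in partitions]

# Step 2: merge the per-partition results with comb_op.
result = reduce(comb_op, partials, zero)

print(partials)  # [([1, 2], 3), ([3, 4], 7)]
print(result)    # ([1, 2, 3, 4], 10)
```

Unlike `reduce`, the result type here (a tuple of a list and a number) differs from the element type, which is the main reason to reach for `aggregate`.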

#### **Try it!** Can you use .aggregate to collect the inputs into a plain list -- so that the output of your .aggregate is just like that of .collect? How about producing a plain total, just like .sum? What does that tell you about the amount of data returned from .aggregate?

In [51]:
x = sc.parallelize([1,2,3,4])

#define appropriate seqOp and combOp
seqOp = lambda #...
combOp = lambda #... 

y = x.aggregate(  ) #add correct parameters

# these two lines should produce the same thing
print(x.collect())
print(y)


### Max, Min, Sum, Mean, Variance, Stdev

Action / To Driver: Compute the respective function (maximum value, minimum value, sum, mean, variance, or standard deviation) from a numeric RDD

![](http://i.imgur.com/HUCtib1.png)

In [53]:
x = sc.parallelize([2,4,1])
print(x.collect())
print(x.max(), x.min(), x.sum(), x.mean(), x.variance(), x.stdev())
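
One detail worth knowing: `variance()` and `stdev()` compute population statistics (dividing by N); the sample versions are `sampleVariance()` and `sampleStdev()`. The sketch below cross-checks the same numbers locally with Python's `statistics` module; the variable names are hypothetical.

```python
import statistics

data = [2, 4, 1]

# Local equivalents of max(), min(), sum(), and mean()
print(max(data), min(data), sum(data), statistics.mean(data))

# Population variance and standard deviation (divide by N)
pop_var = statistics.pvariance(data)
pop_std = statistics.pstdev(data)

# Sample variance (divide by N - 1), shown for contrast
sample_var = statistics.variance(data)

print(pop_var, pop_std, sample_var)
```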

### CountByKey

Action / To Driver: Return a map of keys and counts of their occurrences in the RDD

![](http://i.imgur.com/jvQTGv6.png)

In [55]:
x = sc.parallelize([('J', 'James'), ('F','Fred'), 
                    ('A','Anna'), ('J','John')])

y = x.countByKey()
print(y)
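
The counting logic corresponds closely to Python's `collections.Counter` applied to the keys alone, as in this plain-Python sketch (the names `pairs` and `counts` are hypothetical). The values are ignored entirely; only the keys are tallied.

```python
from collections import Counter

pairs = [('J', 'James'), ('F', 'Fred'),
         ('A', 'Anna'), ('J', 'John')]

# Tally how many times each key occurs, ignoring the values.
counts = Counter(key for key, _ in pairs)

print(dict(counts))  # {'J': 2, 'F': 1, 'A': 1}
```

Because the result is returned to the driver as a dictionary, this action suits RDDs with a modest number of distinct keys.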

### SaveAsTextFile

Action / Distributed: Save the RDD to the filesystem indicated in the path

![](http://i.imgur.com/Tb2Q9mG.png)

In [57]:
dbutils.fs.rm("/temp/demo", True)
x = sc.parallelize([2,4,1])
x.saveAsTextFile("/temp/demo")

y = sc.textFile("/temp/demo")
print(y.collect())
