# Spark in Motion

## Unit 1: An Introduction to Apache Spark

### Module 6: Using RDDs Part 2: Transformations and Actions

<br>
#### with Jason Kolter

## Transformations 
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);height:550px; width:auto" src="images/1.6/transformers-1333083_1920.jpg">

### Filter
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/filter.png">
##### def filter(f: (T) ⇒ Boolean): RDD[T] 
##### Return a new RDD containing only the elements that satisfy a predicate.

```scala
val exampleRDD = sc.parallelize(1 to 20)
exampleRDD.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
```

```scala
val filteredRDD = exampleRDD.filter(value => value % 2 == 0)
filteredRDD.collect
```

```
Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
```
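Like every transformation, `filter` is lazy: Spark only records it, and the predicate actually runs when an action such as `collect` requests results. It runs again for each later action unless the RDD has been cached. As a rough, Spark-free analogy, a Scala collection `view` behaves the same way. In the sketch below, plain Scala collections stand in for an RDD, and the counter `predicateCalls` is a hypothetical instrument added only to make evaluation visible.

```scala
object LazyFilterSketch {
  // Hypothetical counter, used only to observe when the predicate actually runs.
  var predicateCalls = 0

  // Analogous to exampleRDD.filter(...): building the view evaluates nothing yet.
  val evens = (1 to 20).view.filter { value =>
    predicateCalls += 1
    value % 2 == 0
  }

  // Analogous to an action such as collect: forces evaluation of every element.
  def collectEvens(): List[Int] = evens.toList
}
```

Reading `LazyFilterSketch.predicateCalls` straight away gives `0`; the first `collectEvens()` raises it to `20`, and a second call to `40`. In Spark, calling `filteredRDD.cache()` before the first action is what avoids that repeated work.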

### Set Operations

### Union
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);height:250px; width:auto" src="images/1.6/Venn0111.svg">
##### def union(other: RDD[T]): RDD[T] 
##### Return the union of this RDD and another one.

```scala
val a = sc.parallelize(1 to 10)
a.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```

```scala
val b = sc.parallelize(11 to 20)
b.collect
```

```
Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
```

```scala
val aUnionB = a.union(b)
aUnionB.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
```
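Despite its name, `union` is not a set union: it concatenates the two RDDs without a shuffle, so any element present in both appears twice. Adding `distinct` removes the duplicates, at the cost of a shuffle. The ranges above are disjoint, so the difference does not show there. The sketch below uses plain Scala collections as stand-ins for RDDs, with overlapping ranges chosen only for illustration.

```scala
object UnionSketch {
  val a: Seq[Int] = 1 to 10
  val b: Seq[Int] = 5 to 15 // overlaps a on 5..10 (illustrative data)

  // Like a.union(b): plain concatenation, duplicates kept.
  val aUnionB: Seq[Int] = a ++ b

  // Like a.union(b).distinct: a true set-style union.
  val aUnionBDistinct: Seq[Int] = aUnionB.distinct
}
```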

### Intersection
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);height:250px; width:auto" src="images/1.6/Venn0001.svg">
##### def intersection(other: RDD[T]): RDD[T] 
##### Return the intersection of this RDD and another one.

```scala
val a = sc.parallelize(1 to 10)
a.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```

```scala
val b = sc.parallelize(1 to 10 by 2)
b.collect
```

```
Array(1, 3, 5, 7, 9)
```

```scala
val aIntersectB = a.intersection(b)
aIntersectB.collect
```

```
Array(1, 9, 5, 3, 7)
```
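Two details explain the output above. `intersection` removes duplicates, and because it shuffles data between partitions, its output order is arbitrary, hence `1, 9, 5, 3, 7`. Scala's own `Seq.intersect` behaves differently: it follows multiset semantics and keeps an element as many times as it occurs in both inputs. The sketch below contrasts the two with plain Scala collections; `rddStyleIntersection` is a hypothetical helper that models only the deduplicated result, not Spark's implementation.

```scala
object IntersectionSketch {
  // Hypothetical helper modelling RDD.intersection: each common element appears once,
  // in no guaranteed order (hence a Set).
  def rddStyleIntersection[T](a: Seq[T], b: Seq[T]): Set[T] = a.toSet intersect b.toSet

  val left  = Seq(1, 1, 2, 3, 3)
  val right = Seq(1, 1, 3, 5)

  // Scala's multiset intersect keeps a duplicate as often as it occurs in both inputs.
  val multisetResult: Seq[Int] = left.intersect(right)   // List(1, 1, 3)
  val rddStyleResult: Set[Int] = rddStyleIntersection(left, right)
}
```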

### Subtraction
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);height:250px; width:auto" src="images/1.6/Venn0010.svg">
##### def subtract(other: RDD[T]): RDD[T] 
##### Return an RDD with the elements from <b>this</b> that are not in <b>other</b>.

```scala
val a = sc.parallelize(1 to 10)
a.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```

```scala
val b = sc.parallelize(1 to 20)
b.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
```

```scala
val bSubtractA = b.subtract(a)
bSubtractA.collect
```

```
Array(16, 20, 12, 17, 13, 18, 14, 19, 11, 15)
```
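`subtract` also shuffles, so its output order is arbitrary too. It treats duplicates differently from Scala's `diff`: `diff` removes one occurrence per matching element, whereas `subtract` drops every copy of any element that appears in `other` and keeps repeated elements that do not. A sketch with plain Scala collections standing in for RDDs; `rddStyleSubtract` is a hypothetical helper name.

```scala
object SubtractSketch {
  // Hypothetical helper modelling RDD.subtract: drop every element that occurs in `other`,
  // keep all copies of everything else.
  def rddStyleSubtract[T](elems: Seq[T], other: Seq[T]): Seq[T] = {
    val toRemove = other.toSet
    elems.filterNot(toRemove)
  }

  val b = Seq(1, 1, 2, 2, 3)
  val a = Seq(1, 3)

  val multisetResult: Seq[Int] = b.diff(a)               // List(1, 2, 2)
  val rddStyleResult: Seq[Int] = rddStyleSubtract(b, a)  // List(2, 2)
}
```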

### Map
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/map.png">
##### def map[U]\(f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
##### Return a new RDD by applying a function to all elements of this RDD.

```scala
val text = sc.textFile("file:///data/war_and_peace.txt")
text.collect.slice(1200,1202)
```

```
Array(guests, first the vicomte and then the abbé, as peculiarly choice, morsels. The group about Mortemart immediately began discussing the)
```

```scala
val upperText = text.map(line => line.toUpperCase())
upperText.collect.slice(1200,1202)
```

```
Array(GUESTS, FIRST THE VICOMTE AND THEN THE ABBÉ, AS PECULIARLY CHOICE, MORSELS. THE GROUP ABOUT MORTEMART IMMEDIATELY BEGAN DISCUSSING THE)
```
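`map` is strictly one-to-one: the result always has exactly as many elements as the input, but, as the type parameter `U` in the signature indicates, the element type may change. A small sketch with a plain Scala `Seq` standing in for the RDD; the two sample strings are taken from the output above.

```scala
object MapSketch {
  val lines: Seq[String] = Seq(
    "guests, first the vicomte and then the abbé, as peculiarly choice",
    "morsels. The group about Mortemart immediately began discussing the"
  )

  // T = String, U = String: same type, new values (as in text.map(line => line.toUpperCase())).
  val upper: Seq[String] = lines.map(line => line.toUpperCase())

  // T = String, U = Int: map may also change the element type.
  val lengths: Seq[Int] = lines.map(line => line.length)
}
```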

### Flat Map
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/flatMap.png">
##### def flatMap\[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
##### Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

```scala
val words = text.flatMap(line => line.split(" "))
words.collect().slice(5000,5025)
```

```
Array(personally,", whispered, Anna, Pávlovna, to, one, of, the, guests., "The, vicomte, is, a, wonderful, raconteur,", said, she, to, another., "How, evidently, he, belongs, to, the)
```
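The difference between `map` and `flatMap` is clearest side by side: mapping each line to `line.split(" ")` yields one array per line, while `flatMap` concatenates those arrays into a single collection of words. Splitting on a single space has a side effect worth knowing about: an empty line yields one empty string, and two consecutive spaces yield an empty string between them, so a text with blank lines produces `""` entries in the word counts that follow. The sketch below uses plain Scala collections and made-up lines; splitting on the regex `\\s+` and dropping empty tokens is one possible refinement, not what this module itself does.

```scala
object FlatMapSketch {
  // Made-up input: a normal line, an empty line, and a line with a double space.
  val lines: Seq[String] = Seq("The vicomte", "", "is  a")

  // Like text.map(line => line.split(" ")): one nested collection per line.
  val nested: Seq[List[String]] = lines.map(line => line.split(" ").toList)

  // Like text.flatMap(line => line.split(" ")): one flat collection of tokens.
  val words: Seq[String] = lines.flatMap(line => line.split(" ").toList)

  // One possible refinement: split on runs of whitespace and drop empty tokens.
  val cleanWords: Seq[String] =
    lines.flatMap(line => line.split("\\s+").filter(_.nonEmpty).toList)
}
```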

## Pair RDDs
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);height:450px; width:auto" src="images/1.6/pair_rdd.png">

### Reduce By Key
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/reduce_by_key.png">
##### def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
##### Merge the values for each key using an associative and commutative reduce function.

```scala
val text = sc.textFile("file:///data/war_and_peace.txt")
val words = text.flatMap(line => line.split(" "))
val wordsPairs = words.map(word => (word,1))
wordsPairs.collect.slice(5000,5025)
```

```
Array((personally,",1), (whispered,1), (Anna,1), (Pávlovna,1), (to,1), (one,1), (of,1), (the,1), (guests.,1), ("The,1), (vicomte,1), (is,1), (a,1), (wonderful,1), (raconteur,",1), (said,1), (she,1), (to,1), (another.,1), ("How,1), (evidently,1), (he,1), (belongs,1), (to,1), (the,1))
```

```scala
val counts = wordsPairs.reduceByKey((value1,value2) => value1 + value2)
counts.collect.slice(5000,5025)
```

```
Array((service;,1), (populated,1), (moved;,1), (guided,,1), (Tsarevo-Zaymishche,2), (seriously;,1), (Dukes,1), (Arakcheev,,15), (Dnieper—which,1), (endless,10), (profundity,4), (Postulating,1), (diffused,,1), (safely.,1), (Kamenka,,1), (realize,30), (Already,,1), (marvels.",1), (crown—an,1), (colloquies,1), (booked,1), (welfare,,2), (eyebrows,22), (expiring,,1), (be),,2))
```
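`reduceByKey` first merges the values for each key within every partition, much like a MapReduce combiner, so only one partial result per key and partition crosses the network before the final merge. Because that shuffle groups keys by hash, the output above is in no particular order. The sketch below models the two steps with plain Scala collections; partitions are given explicitly as nested sequences, and `reduceByKeyLocal` is a hypothetical helper, not part of Spark.

```scala
object ReduceByKeySketch {
  // Folds one (key, value) pair into an accumulator, combining with f on a key collision.
  private def add[K, V](acc: Map[K, V], key: K, value: V, f: (V, V) => V): Map[K, V] =
    acc.updated(key, acc.get(key).fold(value)(old => f(old, value)))

  // Hypothetical model of reduceByKey over explicitly given partitions.
  def reduceByKeyLocal[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] = {
    // Step 1: merge values per key inside each partition (the map-side combine).
    val partials: Seq[Map[K, V]] = partitions.map { partition =>
      partition.foldLeft(Map.empty[K, V]) { case (acc, (key, value)) => add(acc, key, value, f) }
    }
    // Step 2: merge the per-partition results for each key (the post-shuffle reduce).
    partials.foldLeft(Map.empty[K, V]) { (acc, partial) =>
      partial.foldLeft(acc) { case (merged, (key, value)) => add(merged, key, value, f) }
    }
  }
}
```

Since the partial results for a key may be merged in any order, the function has to be associative and commutative, which `_ + _` is.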

## Actions 
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);height:550px; width:auto" src="images/1.6/action.jpg">

### Count
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/count.png">
##### def count(): Long
##### Return the number of elements in the RDD.

```scala
val a = sc.parallelize(1 to 10)
a.count
```

```
10
```

### Collect
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/collect.png">
##### def collect(): Array[T]
##### Return an array that contains all of the elements in this RDD.

```scala
val a = sc.parallelize(1 to 10)
a.collect
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```

### Take
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/take.png">
##### def take(num: Int): Array[T]
##### Take the first num elements of the RDD.

```scala
val a = sc.parallelize(1 to 10)
a.take(2)
```

```
Array(1, 2)
```
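`collect` ships every element to the driver, which is fine for ten numbers but risky for a large dataset such as the full text of War and Peace, whereas `take(n)` can stop as soon as it has `n` elements. Spark's real `take` first scans one partition and then estimates how many more partitions it needs; the local sketch below models only the early stop. Partitions are given as nested Scala sequences, and `takeLocal` and `collectLocal` are hypothetical helpers that also report how many partitions they touched.

```scala
object TakeSketch {
  // Hypothetical model of take(n): read partitions in order and stop once n elements
  // have been found. Returns the elements and how many partitions were touched.
  def takeLocal[T](partitions: Seq[Seq[T]], n: Int): (List[T], Int) = {
    var scanned = 0
    val result = partitions.iterator
      .map { partition => scanned += 1; partition } // counts a partition when it is opened
      .flatMap(_.iterator)
      .take(n)
      .toList
    (result, scanned)
  }

  // Hypothetical model of collect(): every partition is read and everything is returned.
  def collectLocal[T](partitions: Seq[Seq[T]]): (List[T], Int) =
    (partitions.flatten.toList, partitions.size)
}
```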

### Take Sample
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/take_sample2.png">
##### def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
##### Return a fixed-size sampled subset of this RDD in an array.

```scala
val a = sc.parallelize(1 to 10)
a.takeSample(false,2)
```

```
Array(6, 8)
```
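Because `seed` defaults to a random value, `takeSample` returns a different sample each time; passing an explicit seed, for example `a.takeSample(false, 2, 42L)`, makes the result repeatable for the same data and partitioning. The local sketch below illustrates that contract with `scala.util.Random` on a plain `Seq`. Spark's own implementation works differently (roughly, it samples the partitions and then shuffles the collected sample), and `takeSampleLocal` is a hypothetical name.

```scala
import scala.util.Random

object TakeSampleSketch {
  // Hypothetical local model of takeSample: the seed fully determines the result.
  // Assumes data is non-empty when sampling with replacement.
  def takeSampleLocal[T](data: Seq[T], withReplacement: Boolean, num: Int, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    if (withReplacement)
      Seq.fill(num)(data(rng.nextInt(data.size))) // each draw may repeat an element
    else
      rng.shuffle(data).take(num)                 // distinct positions, so no repeats
  }
}
```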

### Reduce
<img style="margin:0px auto;display:block;background: rgba(238, 238, 238, .5);" src="images/1.6/reduce.png">
##### def reduce(f: (T, T) ⇒ T): T
##### Reduces the elements of this RDD using the specified commutative and associative binary operator.

```scala
val a = sc.parallelize(1 to 100)
a.reduce((v1,v2) => v1 + v2)
```

```
5050
```
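Spark evaluates `reduce` by reducing each partition separately and then merging the partial results on the driver in whatever order the tasks finish. That is why the operator must be associative and commutative: addition gives 5050 however the data is split, while subtraction does not. The local sketch below models the two steps with explicit partitions given as nested Scala sequences; `reduceLocal` is a hypothetical helper.

```scala
object ReduceSketch {
  // Hypothetical model of RDD.reduce: reduce inside each non-empty partition,
  // then combine the per-partition results.
  def reduceLocal[T](partitions: Seq[Seq[T]])(f: (T, T) => T): T =
    partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)
}
```

With `_ - _`, a single partition `Seq(1, 2, 3, 4)` yields `((1 - 2) - 3) - 4 = -8`, but the split `Seq(1, 2), Seq(3, 4)` yields `(1 - 2) - (3 - 4) = 0`.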

## RDD Save Actions

Save an RDD to disk as a text file, one element per line. Note that the path names an output directory, not a file: each partition is written to a separate file inside it.

```scala
counts.saveAsTextFile("file:///data/word_counts")
```

Save an RDD to disk as a SequenceFile of Java-serialized objects. The path again names an output directory, with one file per partition, and the data can be loaded back with `sc.objectFile`.

```scala
counts.saveAsObjectFile("file:///data/word_counts_binary")
```
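Both save actions write a directory rather than a single file because each partition is written independently by its own task, producing files named `part-00000`, `part-00001`, and so on; by default Spark also adds an empty `_SUCCESS` marker once the job completes, and it fails rather than overwrite an existing output directory. The local sketch below imitates that text-file layout with `java.nio.file`. It is not how Spark writes data, and `saveAsTextFileLocal` is a hypothetical name.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object SaveLayoutSketch {
  // Hypothetical local imitation of saveAsTextFile's output layout:
  // one part-NNNNN file per partition, one element (via toString) per line.
  def saveAsTextFileLocal[T](partitions: Seq[Seq[T]], dir: Path): Seq[Path] = {
    if (Files.exists(dir))
      throw new IllegalStateException(s"Output directory $dir already exists")
    Files.createDirectories(dir)
    val partFiles = partitions.zipWithIndex.map { case (partition, i) =>
      val file = dir.resolve(f"part-$i%05d")
      val text = partition.map(element => element.toString + "\n").mkString
      Files.write(file, text.getBytes(StandardCharsets.UTF_8))
      file
    }
    Files.createFile(dir.resolve("_SUCCESS")) // empty marker written after all parts
    partFiles
  }
}
```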

## Summary

##### Scala: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
##### Python: http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
##### Java: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html