In [1]:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

In [2]:
// The master URL tells Spark how to connect to a cluster; "local" runs Spark on this
// machine without connecting to a cluster.
val conf = new SparkConf().setMaster("local").setAppName("Learning Spark Transformations App")
// Only one SparkContext may be active per JVM, so `new SparkContext(conf)` fails when this
// cell is re-run. getOrCreate returns the already-active context in that case (the settings
// in `conf` are then not re-applied) and only builds a new one on the first run.
val sc = SparkContext.getOrCreate(conf)

conf = org.apache.spark.SparkConf@b7a489c
sc = org.apache.spark.SparkContext@dd5223c


org.apache.spark.SparkContext@dd5223c

### Simple RDD

In [3]:
// Load the text file as an RDD with one element per line.
val lines = sc.textFile(path = "simple_file.txt")

lines = simple_file.txt MapPartitionsRDD[1] at textFile at <console>:32


simple_file.txt MapPartitionsRDD[1] at textFile at <console>:32

In [4]:
// Keep lines shorter than 150 characters and return the first five of them.
lines.filter(_.length < 150).take(5)

Array("Lorem ipsum dolor sit amet, consectetur adipiscing elit. ", "", "Vestibulum eu sollicitudin ipsum, ut rutrum leo. ", "Proin sodales tincidunt mi, at sodales augue cursus sit amet. ", "")

### Pair RDD

In [5]:
// Build a pair RDD keyed by the first space-separated token of every line; the value is the
// whole line and a blank line gets the key "".
// split(" ") drops trailing empty strings, so a line made only of spaces yields an empty
// array: headOption avoids the ArrayIndexOutOfBoundsException that `(0)` would throw there.
val pairs = lines.map(line => (line.split(" ").headOption.getOrElse(""), line))

pairs = MapPartitionsRDD[3] at map at <console>:32


MapPartitionsRDD[3] at map at <console>:32

In [6]:
// Peek at the first five (first token, line) pairs.
pairs.take(num = 5)

Array((Lorem,"Lorem ipsum dolor sit amet, consectetur adipiscing elit. "), ("",""), (Vestibulum,"Vestibulum eu sollicitudin ipsum, ut rutrum leo. "), (Proin,"Proin sodales tincidunt mi, at sodales augue cursus sit amet. "), ("",""))

In [7]:
// A pair RDD is still an RDD of tuples, so plain transformations such as filter apply;
// only the value (the full line) is inspected here, the key is ignored.
pairs.filter { case (_, text) => text.length < 150 }.take(8)

Array((Lorem,"Lorem ipsum dolor sit amet, consectetur adipiscing elit. "), ("",""), (Vestibulum,"Vestibulum eu sollicitudin ipsum, ut rutrum leo. "), (Proin,"Proin sodales tincidunt mi, at sodales augue cursus sit amet. "), ("",""), (Duis,"Duis tincidunt mollis sem ac fermentum. "), (Vestibulum,"Vestibulum cursus lacinia mauris id viverra. "), (Curabitur,Curabitur rutrum, tellus vitae ullamcorper faucibus, tortor purus varius urna, vitae rhoncus elit dui id risus.))

### Another Pair RDD

In [8]:
// A pair RDD is an RDD whose elements are (key, value) tuples; key 3 appears twice.
val simpleRDD = sc.parallelize(List(1 -> 2, 3 -> 6, 3 -> 4, 2 -> 8))

simpleRDD = ParallelCollectionRDD[5] at parallelize at <console>:33


ParallelCollectionRDD[5] at parallelize at <console>:33

### Transformations on one Pair RDD

In [9]:
// Merge all values that share a key with the given function (here: addition); Spark runs
// these reductions in parallel, one per key.
simpleRDD.reduceByKey(_ + _).take(3): Array[(Int, Int)]

Array((1,2), (3,10), (2,8))

In [10]:
// Collect every value of a key into a single Iterable.
simpleRDD.groupByKey().take(3): Array[(Int, Iterable[Int])]

Array((1,CompactBuffer(2)), (3,CompactBuffer(6, 4)), (2,CompactBuffer(8)))

In [11]:
// Project each pair onto its key (same result as simpleRDD.keys); duplicates are kept.
simpleRDD.map { case (k, _) => k }.collect()

Array(1, 3, 3, 2)

In [12]:
// Project each pair onto its value (same result as simpleRDD.values).
simpleRDD.map { case (_, v) => v }.collect()

Array(2, 6, 4, 8)

In [13]:
// Sort the pairs by key, smallest key first.
simpleRDD.sortByKey(ascending = true).collect()

Array((1,2), (2,8), (3,6), (3,4))

In [14]:
// Sort the pairs by key, largest key first.
simpleRDD.sortByKey(ascending = false).collect()

Array((3,6), (3,4), (2,8), (1,2))

In [15]:
// Transform every value while leaving its key untouched.
simpleRDD.mapValues(_ + 1).take(4): Array[(Int, Int)]

Array((1,3), (3,7), (3,5), (2,9))

In [16]:
// Same elements as the mapValues call above, written with a plain map over the tuples.
// Per the Spark API docs, map does not retain the parent's partitioner, whereas mapValues does.
simpleRDD.map { case (k, v) => k -> (v + 1) }.take(4)

Array((1,3), (3,7), (3,5), (2,9))

In [17]:
// The function maps each value to a collection (here the Range `v to 5`); the collections are
// flattened and every element is paired with the original key. Values above 5 give an empty
// range, so (3,6) and (2,8) contribute nothing.
simpleRDD.flatMapValues(v => v to 5).take(10): Array[(Int, Int)]

Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

#### Applying map and then reduce function

In [18]:
// (fruit, quantity) pairs; "apple" and "banana" each occur twice.
val fruits = sc.parallelize(List("apple" -> 3, "banana" -> 5, "papaya" -> 4,
                                 "banana" -> 3, "apple" -> 2))

fruits = ParallelCollectionRDD[15] at parallelize at <console>:32


ParallelCollectionRDD[15] at parallelize at <console>:32

In [19]:
// Pair each value with a count of 1, then add both components per key. Each result is
// (key, (sum of the values, number of times the key occurs)).
fruits.mapValues(qty => (qty, 1)).reduceByKey {
  case ((sumA, cntA), (sumB, cntB)) => (sumA + sumB, cntA + cntB)
}.collect(): Array[(String, (Int, Int))]

Array((banana,(8,2)), (papaya,(4,1)), (apple,(5,2)))

#### Word Count using flatMap, map and reduceByKey

In [20]:
// Reload the same text file (one element per line) as input for the word counts.
val content = sc.textFile(path = "simple_file.txt")

content = simple_file.txt MapPartitionsRDD[19] at textFile at <console>:32


simple_file.txt MapPartitionsRDD[19] at textFile at <console>:32

In [21]:
// Split every line on single spaces and flatten into one RDD of tokens. Punctuation stays
// attached ("ipsum," differs from "ipsum") and a blank line contributes the token "".
val words = content.flatMap(_.split(" "))
// Word count: emit (token, 1) for every token, then add up the 1s per token.
words.map(w => (w, 1)).reduceByKey(_ + _).take(15)

words = MapPartitionsRDD[20] at flatMap at <console>:32


Array((ac,5), (ipsum,,1), (erat,1), (justo,1), (augue,,1), (turpis.,1), (fames,1), (congue.,1), (urna,1), (leo,1), (vel,,1), (Lorem,1), (auctor,1), (rutrum,,1), (malesuada,2))

In [22]:
// Number of partitions the token RDD is split into.
words.partitions.length

1

In [23]:
// The second argument of reduceByKey sets the level of parallelism: the shuffled result
// is split into 6 partitions.
val temp = words.map(w => (w, 1)).reduceByKey(_ + _, 6)
temp.take(15)

temp = ShuffledRDD[24] at reduceByKey at <console>:34


Array((faucibus,,1), (pretium,1), (metus,1), (urna,1), (ultricies.,1), (nec,,1), (tortor,1), (dictum,1), (egestas,1), ("",3), (quis,1), (dapibus,1), (mattis,1), (mi,,1), (lobortis.,1))

In [24]:
// Confirms the partition count requested in reduceByKey.
temp.partitions.length

6

#### Word Count using flatMap and countByValue

In [25]:
// countByValue returns a local Map from each distinct token to its count; show 15 entries.
content.flatMap(_.split(" ")).countByValue().take(15)

Map(elit -> 2, mauris. -> 1, augue. -> 2, "" -> 3, tortor -> 1, diam, -> 1, consectetur -> 1, sed -> 1, dictum -> 1, in -> 1, fames -> 1, ipsum, -> 1, sagittis -> 2, vel, -> 1, volutpat -> 1)

### Transformations on two Pair RDDs

In [26]:
// Two pair RDDs sharing keys 2 and 3; key 1 exists only in firstRDD.
val firstRDD = sc.parallelize(List(1 -> 2, 3 -> 6, 3 -> 4, 2 -> 8))
val secondRDD = sc.parallelize(List(3 -> 9, 2 -> 3))

firstRDD = ParallelCollectionRDD[29] at parallelize at <console>:32
secondRDD = ParallelCollectionRDD[30] at parallelize at <console>:33


ParallelCollectionRDD[30] at parallelize at <console>:33

In [27]:
// Keep only the pairs of firstRDD whose key does not appear in secondRDD.
firstRDD.subtractByKey(secondRDD).collect(): Array[(Int, Int)]

Array((1,2))

In [28]:
// Inner join: one (key, (left, right)) row per matching pair of values; keys missing from
// either side are dropped.
firstRDD.join(secondRDD).collect(): Array[(Int, (Int, Int))]

Array((3,(6,9)), (3,(4,9)), (2,(8,3)))

In [29]:
// Right outer join: every key of secondRDD is kept; the firstRDD side is an Option, None
// when firstRDD lacks that key.
firstRDD.rightOuterJoin(secondRDD).take(4): Array[(Int, (Option[Int], Int))]

Array((3,(Some(6),9)), (3,(Some(4),9)), (2,(Some(8),3)))

In [30]:
// Left outer join: every key of firstRDD is kept; the secondRDD side is an Option, None
// when secondRDD lacks that key.
firstRDD.leftOuterJoin(secondRDD).take(4): Array[(Int, (Int, Option[Int]))]

Array((1,(2,None)), (3,(6,Some(9))), (3,(4,Some(9))), (2,(8,Some(3))))

In [31]:
// For each key present in either RDD, gather its values from each side into separate
// Iterables (empty for the side that lacks the key).
firstRDD.cogroup(secondRDD).collect(): Array[(Int, (Iterable[Int], Iterable[Int]))]

Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(6, 4),CompactBuffer(9))), (2,(CompactBuffer(8),CompactBuffer(3))))

### Transformations on three Pair RDDs

In [32]:
// A third pair RDD; key 4 appears in neither of the other two.
val thirdRDD = sc.parallelize(List(1 -> 4, 3 -> 5, 4 -> 8))

thirdRDD = ParallelCollectionRDD[43] at parallelize at <console>:32


ParallelCollectionRDD[43] at parallelize at <console>:32

In [33]:
// Cogrouping an already cogrouped RDD nests the result: the left Iterable holds the
// (firstRDD, secondRDD) buffers from the first cogroup. For a flat three-way grouping use
// firstRDD.cogroup(secondRDD, thirdRDD) instead.
firstRDD.cogroup(secondRDD).cogroup(thirdRDD).take(4): Array[(Int, (Iterable[(Iterable[Int], Iterable[Int])], Iterable[Int]))]

Array((4,(CompactBuffer(),CompactBuffer(8))), (1,(CompactBuffer((CompactBuffer(2),CompactBuffer())),CompactBuffer(4))), (3,(CompactBuffer((CompactBuffer(6, 4),CompactBuffer(9))),CompactBuffer(5))), (2,(CompactBuffer((CompactBuffer(8),CompactBuffer(3))),CompactBuffer())))