# 301 Spark basics

The goal of this lab is to get familiar with Spark programming.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

## 301-2 Running a sample Spark job

Goal: calculate the average temperature for every month; dataset is ```weather-sample1```.

In [1]:
val bucketname = "unibo-bd2122-egallinucci"

val rddWeather = sc.textFile("s3a://"+bucketname+"/datasets/weather-sample1.txt")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1647187270179_0002,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = eg-myfirstbucket
rddWeather: org.apache.spark.rdd.RDD[String] = s3a://eg-myfirstbucket/datasets/weather-sample1.txt MapPartitionsRDD[1] at textFile at <console>:28


In [2]:
def parseWeatherLine(line:String):(String,Double) = {
  val year = line.substring(15,19)
  val month = line.substring(19,21)
  val day = line.substring(21,23)
  var temp = line.substring(87,92).toInt
  (month, temp/10)
}

// Parse records
val rddWeatherKv = rddWeather.map(x => parseWeatherLine(x))
// Aggregate by key (i.e., month) to compute the sum and the count of temperature values
val rddTempDataPerMonth = rddWeatherKv.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
// Calculate the average temperature in each record
val rddAvgTempPerMonth = rddTempDataPerMonth.map({case(k,v) => (k, v._1/v._2)})
// Sort, coalesce and cache the result (because it is used twice)
val rddCached = rddAvgTempPerMonth.sortByKey().coalesce(1).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

parseWeatherLine: (line: String)(String, Double)
rddWeatherKv: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[2] at map at <console>:29
rddTempDataPerMonth: org.apache.spark.rdd.RDD[(String, (Double, Double))] = ShuffledRDD[3] at aggregateByKey at <console>:26
rddAvgTempPerMonth: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[4] at map at <console>:26
rddCached: org.apache.spark.rdd.RDD[(String, Double)] = CoalescedRDD[8] at coalesce at <console>:26


In [4]:
// Show all the records
rddCached.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res9: Array[(String, Double)] = Array((01,29.764781644286497), (02,52.831468961278425), (03,49.43499927074724), (04,61.3592872169286), (05,55.82656), (06,55.45816479125297), (07,86.90952392350223), (08,79.250958082407), (09,80.51662117371808), (10,106.26454490168254), (11,113.49704495968224), (12,63.9184413544602))


In [None]:
rddCached.saveAsTextFile("s3a://"+bucketname+"/spark/301-2")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 301-3 Spark warm-up

Load the ```capra``` and ```divinacommedia``` datasets and try the following actions:
- Show their content (```collect```)
- Count their rows (```count```)
- Split phrases into words (```map``` or ```flatMap```; what’s the difference?)
- Check the results (remember: evaluation is lazy)
- Try the ```toDebugString``` function to check the execution plan

In [3]:
val rddCapra = sc.textFile("s3a://"+bucketname+"/datasets/capra.txt")
val rddDC = sc.textFile("s3a://"+bucketname+"/datasets/divinacommedia.txt")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapra: org.apache.spark.rdd.RDD[String] = s3a://eg-myfirstbucket/datasets/capra.txt MapPartitionsRDD[10] at textFile at <console>:27
rddDC: org.apache.spark.rdd.RDD[String] = s3a://eg-myfirstbucket/datasets/divinacommedia.txt MapPartitionsRDD[12] at textFile at <console>:27


In [11]:
val rddCapraWords1 = rddCapra.map( x => x.split(" ") )
rddCapraWords1.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapraWords1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[22] at map at <console>:25
res13: Array[Array[String]] = Array(Array(sopra, la, panca, la, capra, campa), Array(sotto, la, panca, la, capra, crepa))


In [12]:
val rddDcWords2 = rddDC.flatMap( x => x.split(" ") )
rddDcWords2.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddDC: org.apache.spark.rdd.RDD[String] = s3a://eg-myfirstbucket/first-datasets/divinacommedia.txt MapPartitionsRDD[24] at textFile at <console>:25
rddDcWords2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at flatMap at <console>:25
res14: Array[String] = Array(LA, DIVINA, COMMEDIA, di, Dante, Alighieri, INFERNO, "", "", "", Inferno:, Canto, I, "", "", "", Nel, mezzo, del, cammin, di, nostra, vita, mi, ritrovai, per, una, selva, oscura, ch?, la, diritta, via, era, smarrita., "", "", Ahi, quanto, a, dir, qual, era, ?, cosa, dura, esta, selva, selvaggia, e, aspra, e, forte, che, nel, pensier, rinova, la, paura!, "", "", Tant'?, amara, che, poco, ?, pi?, morte;, ma, per, trattar, del, ben, ch'i', vi, trovai,, dir?, de, l'altre, cose, ch'i', v'ho, scorte., "", "", Io, non, so, ben, ridir, com'i', v'intrai,, tant'era, pien, di, sonno, a, quel, punto, che, la, verace, via, abbandonai., "", "", Ma, poi, ch'i', fui, al, pi?, d'un, colle, giunto,, l?, dove, terminava, quella, valle, 

In [9]:
val rddL = rddCapra.
   flatMap( x => x.split(" ") ).
   map(x => (x,1)).
   reduceByKey((x,y)=>x+y)
rddL.toDebugString

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddL: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:28
res12: String =
(2) ShuffledRDD[19] at reduceByKey at <console>:28 []
 +-(2) MapPartitionsRDD[18] at map at <console>:27 []
    |  MapPartitionsRDD[17] at flatMap at <console>:26 []
    |  s3a://eg-myfirstbucket/first-datasets/capra.txt MapPartitionsRDD[12] at textFile at <console>:25 []
    |  s3a://eg-myfirstbucket/first-datasets/capra.txt HadoopRDD[11] at textFile at <console>:25 []


## 301-4 From MapReduce to Spark

Reproduce on Spark the exercises seen on Hadoop MapReduce on the capra and divinacommedia datasets.

- Jobs:
  - Count the number of occurrences of each word
    - Result: (sopra, 1), (la, 4), …
  - Count the number of occurrences of words of given lengths
    - Result: (2, 4), (5, 8)
  - Count the average length of words given their first letter (hint: check the example in 301-1)
    - Result: (s, 5), (l, 2), …
  - Return the inverted index of words
    - Result: (sopra, (0)), (la, (0, 1)), ...
- How does Spark compare with respect to MapReduce? (performance, ease of use)
- How is the output sorted? How can you sort by value?

In [14]:
// Word count

val rddMap = rddCapra.flatMap( x => x.split(" ") ).map( x => (x,1))
rddMap.take(5)

val rddReduce = rddMap.reduceByKey((x,y) => x + y)
rddReduce.collect()

val rddMap = rddDC.flatMap( x => x.split(" ") ).map( x => (x,1))
rddMap.take(5)

val rddReduce = rddMap.reduceByKey((x,y) => x + y)
rddReduce.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:27
res19: Array[(String, Int)] = Array((sopra,1), (la,1), (panca,1), (la,1), (capra,1))
rddReduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:26
res21: Array[(String, Int)] = Array((campa,1), (la,4), (panca,2), (sotto,1), (crepa,1), (sopra,1), (capra,2))
rddMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26
res23: Array[(String, Int)] = Array((LA,1), (DIVINA,1), (COMMEDIA,1), (di,1), (Dante,1))
rddReduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[30] at reduceByKey at <console>:26
res25: Array[(String, Int)] = Array((grand'avello,,1), (diseta,1), (vane.,1), (tonda,3), (blandimenti;,1), (sapore,1), (dando,3), (Verrucchio,,1), (Mantua,1), (m'apparvero,1), (disiderate,1), (dole,1), (moventi,1), (rincalzi,1), (freni,,1), (Voglia,1), (focina,1), (tormento,5), (s?:,2), (marino,,1), (scalz?,1), (pensassi,1), (

In [15]:
// Word length count

val rddMap = rddDC.flatMap( x => x.split(" ") ).map( x => (x.length,1))
rddMap.collect()

val rddReduce = rddMap.reduceByKey((x,y) => x + y)
rddReduce.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddMap: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[32] at map at <console>:27
res28: Array[(Int, Int)] = Array((2,1), (6,1), (8,1), (2,1), (5,1), (9,1), (7,1), (0,1), (0,1), (0,1), (8,1), (5,1), (1,1), (0,1), (0,1), (0,1), (3,1), (5,1), (3,1), (6,1), (2,1), (6,1), (4,1), (2,1), (8,1), (3,1), (3,1), (5,1), (6,1), (3,1), (2,1), (7,1), (3,1), (3,1), (9,1), (0,1), (0,1), (3,1), (6,1), (1,1), (3,1), (4,1), (3,1), (1,1), (4,1), (4,1), (4,1), (5,1), (9,1), (1,1), (5,1), (1,1), (5,1), (3,1), (3,1), (7,1), (6,1), (2,1), (6,1), (0,1), (0,1), (6,1), (5,1), (3,1), (4,1), (1,1), (3,1), (6,1), (2,1), (3,1), (7,1), (3,1), (3,1), (5,1), (2,1), (7,1), (4,1), (2,1), (7,1), (4,1), (5,1), (4,1), (7,1), (0,1), (0,1), (2,1), (3,1), (2,1), (3,1), (5,1), (6,1), (9,1), (8,1), (4,1), (2,1), (5,1), (1,1), (4,1), (5,1), (3,1), (2,1), (6,1), (3,1), (11,1), (0,1), (0,1), (2,1), (3,1), (5,1), ...
rddReduce: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[33] at reduceByKey at <console>:26
res30: Arra

In [17]:
// Average word length by initial

val rddMap = rddDC.flatMap( x => x.split(" ") ).
  filter( _.length>0 ).
  map( x => (x.substring(0,1).toLowerCase, x.length))
rddMap.collect()

val rddReduce = rddMap.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
rddReduce.collect()

val rddFinal = rddReduce.mapValues(v => v._1/v._2)
rddFinal.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[41] at map at <console>:29
res40: Array[(String, Int)] = Array((l,2), (d,6), (c,8), (d,2), (d,5), (a,9), (i,7), (i,8), (c,5), (i,1), (n,3), (m,5), (d,3), (c,6), (d,2), (n,6), (v,4), (m,2), (r,8), (p,3), (u,3), (s,5), (o,6), (c,3), (l,2), (d,7), (v,3), (e,3), (s,9), (a,3), (q,6), (a,1), (d,3), (q,4), (e,3), (?,1), (c,4), (d,4), (e,4), (s,5), (s,9), (e,1), (a,5), (e,1), (f,5), (c,3), (n,3), (p,7), (r,6), (l,2), (p,6), (t,6), (a,5), (c,3), (p,4), (?,1), (p,3), (m,6), (m,2), (p,3), (t,7), (d,3), (b,3), (c,5), (v,2), (t,7), (d,4), (d,2), (l,7), (c,4), (c,5), (v,4), (s,7), (i,2), (n,3), (s,2), (b,3), (r,5), (c,6), (v,9), (t,8), (p,4), (d,2), (s,5), (a,1), (q,4), (p,5), (c,3), (l,2), (v,6), (v,3), (a,11), (m,2), (p,3), (c,5), (f,3), (a,2), (p,3), (d,4), (c,5), (g,7), (l,2), (d,4), (t,9), (q,6), (v,5), (c,3), (m,6), (d,2...
rddReduce: org.apache.spark.rdd.RDD[(String, (Double, Double))] = ShuffledRDD[42] at aggregateByKey at <c

In [5]:
// Inverted index (word-based offset)

val rddMap = rddCapra.flatMap( x => x.split(" ") ).zipWithIndex()
rddMap.collect()

val rddGroup = rddMap.groupByKey()
rddGroup.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddMap: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[17] at zipWithIndex at <console>:27
res14: Array[(String, Long)] = Array((sopra,0), (la,1), (panca,2), (la,3), (capra,4), (campa,5), (sotto,6), (la,7), (panca,8), (la,9), (capra,10), (crepa,11))
rddGroup: org.apache.spark.rdd.RDD[(String, Iterable[Long])] = ShuffledRDD[18] at groupByKey at <console>:26
res16: Array[(String, Iterable[Long])] = Array((campa,CompactBuffer(5)), (la,CompactBuffer(1, 3, 7, 9)), (panca,CompactBuffer(2, 8)), (sotto,CompactBuffer(6)), (crepa,CompactBuffer(11)), (sopra,CompactBuffer(0)), (capra,CompactBuffer(4, 10)))


In [18]:
// Inverted index (sentence-based offset)
val rddMap = rddCapra.zipWithIndex().
    flatMap({case (k,v)=> k.split(" ").map(x=>(x,v))}).
    distinct()
rddMap.collect()

val rddGroup = rddMap.groupByKey()
rddGroup.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddMap: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[82] at distinct at <console>:28
res72: Array[(String, Long)] = Array((la,0), (capra,0), (la,1), (sotto,1), (sopra,0), (capra,1), (panca,1), (crepa,1), (campa,0), (panca,0))
rddGroup: org.apache.spark.rdd.RDD[(String, Iterable[Long])] = ShuffledRDD[83] at groupByKey at <console>:27
res74: Array[(String, Iterable[Long])] = Array((campa,CompactBuffer(0)), (la,CompactBuffer(0, 1)), (panca,CompactBuffer(1, 0)), (sotto,CompactBuffer(1)), (crepa,CompactBuffer(1)), (sopra,CompactBuffer(0)), (capra,CompactBuffer(0, 1)))


In [14]:
// Inverted index (sentence-based offset) alternative
val rddMap = rddCapra.zipWithIndex().
    map({case (k,v)=>(v,k)}).
    flatMapValues( x => x.split(" ") ).
    map({case (k,v)=>(v,k)}).
    distinct()
rddMap.collect()

val rddGroup = rddMap.groupByKey()
rddGroup.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.HashPartitioner
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddMap: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[63] at distinct at <console>:32
res59: Array[(String, Long)] = Array((chiuder,4459), (che,6723), (era,,12427), ("",9109), (ragionando,8297), (Naiade,,9726), (vidi,13635), (Cancro,13513), ("",5314), (noi,3707), ("",3986), (lui,2878), (occhi,9695), ("",4588), (volta,12193), (voi,11677), ("",14088), ("",2483), (china',702), (per?,8677), ("",12522), (terra,14121), ("Aspetta,3180), (che,12145), ("",9439), (nomava,12175), (che,6064), ("Gentucca",8332), (Catria,,12912), ("",4395), (sogna.,9710), (dicendo:,1896), (occhi,264), ("",9475), ("",1808), (sembianti,,11387), ("",10138), (col,3183), (vidi,2114), ("",11963), (tutti,7587), (l'ombra,5271), ("",12807), (terra,5735), ("",4144), (ritornaro,9587), ("",3938), (figliuoli,6569), ("",12414), (disciolto,,7566), ("Con,1031), ("",8494), (mutua,11518), ("",10063), (che,126

In [None]:
// Sort an RDD by key

rdd.sortByKey()

// Sort an RDD by value

rdd.map({case(k,v) => (v,k)}).sortByKey()