# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://LAPTOP-T2P39KLE:4042
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1730276518081)
SparkSession available as 'spark'


import org.apache.spark


In [None]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

In [2]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    // Empty fields are mapped to 0.0
    def getDouble(str:String) : Double = {
        if (str.isEmpty) 0.0 else str.toDouble
    }
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

parseWeather: (row: String)(String, String, String, String, String, Int, Boolean)
parseStation: (row: String)(String, String, String, String, String, Double, Double, Double, String, String)


In [3]:
val rddWeather = sc.
  textFile("../../../../datasets/big/weather-sample1.txt").
  map(x => parseWeather(x))
val rddStation = sc.
  textFile("../../../../datasets/weather-stations.csv").
  map(x => parseStation(x))

rddWeather: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[2] at map at <console>:29
rddStation: org.apache.spark.rdd.RDD[(String, String, String, String, String, Double, Double, Double, String, String)] = MapPartitionsRDD[5] at map at <console>:32


## 103-1 Simple job optimization

Optimize the two jobs (average temperature and maximum temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
  - To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)``` where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
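
The difference between the three repartitioning methods can be checked on a toy RDD. The sketch below is only illustrative: it assumes the notebook's `sc` is available, and `toyTemps` and the other `toy...` names are hypothetical and not part of the lab.

```scala
import org.apache.spark.HashPartitioner

// Toy (month, temperature) pairs spread over 4 partitions (hypothetical data)
val toyTemps = sc.parallelize(Seq(("01", 3), ("02", 5), ("01", 7), ("03", 1)), 4)

// coalesce: merges existing partitions without a shuffle; records are not grouped by key
val toyCoalesced = toyTemps.coalesce(2)
// repartition: full shuffle, but records are spread evenly rather than by key
val toyRepartitioned = toyTemps.repartition(2)
// partitionBy: shuffle that sends equal keys to the same partition and records the partitioner
val toyHashed = toyTemps.partitionBy(new HashPartitioner(2))

println(toyCoalesced.partitioner.isDefined)     // false
println(toyRepartitioned.partitioner.isDefined) // false
println(toyHashed.partitioner.isDefined)        // true
println(toyHashed.toDebugString)                // lineage, including the shuffle
```

Only `partitionBy` leaves a partitioner on the RDD, which is what later key-based operations can reuse.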

In [4]:
import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner


In [5]:
// Average temperature for every month
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  coalesce(10).
  aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

res0: Array[(String, Double)] = Array((04,4.69), (11,4.17), (08,13.5), (09,10.35), (12,1.6), (05,9.37), (01,0.18), (06,11.67), (02,0.29), (03,1.75), (07,13.81), (10,8.33))


In [6]:
// Maximum temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  partitionBy(new HashPartitioner(10)).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

res1: Array[(String, Int)] = Array((04,23), (05,34), (06,31), (07,29), (08,23), (09,30), (01,12), (10,20), (02,13), (11,14), (03,15), (12,14))


In [7]:
// PROFESSOR'S SOLUTION
val rddCached = rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  partitionBy(new HashPartitioner(4)).  
  cache()

rddCached: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at partitionBy at <console>:30


In [8]:
rddCached.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

res2: Array[(String, Double)] = Array((04,4.69), (11,4.17), (08,13.5), (09,10.35), (12,1.6), (05,9.37), (01,0.18), (06,11.67), (02,0.29), (03,1.75), (07,13.81), (10,8.33))


In [9]:
rddCached.reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

res3: Array[(String, Int)] = Array((04,23), (11,14), (08,23), (09,30), (12,14), (05,34), (01,12), (06,31), (02,13), (03,15), (07,29), (10,20))
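
Why the cached, pre-partitioned RDD helps can be seen on a small example. This is a minimal sketch under the assumption that the notebook's `sc` is available; the `toy...` names and data are hypothetical. When the input already carries a partitioner, `reduceByKey` is expected to reuse it, so the reduction can run inside each partition instead of shuffling again.

```scala
import org.apache.spark.HashPartitioner

// Hypothetical (month, temperature) pairs, partitioned by key and cached once
val toyPartitioned = sc.
  parallelize(Seq(("01", 3), ("02", 5), ("01", 7), ("03", 1))).
  partitionBy(new HashPartitioner(2)).
  cache()

val toyMax = toyPartitioned.reduceByKey((x, y) => if (x < y) y else x)

// true when the existing partitioner is reused by reduceByKey
println(toyMax.partitioner == toyPartitioned.partitioner)
println(toyMax.collect().sortBy(_._1).mkString(", ")) // (01,7), (02,5), (03,1)
```

The same check can be done on `rddCached` by comparing the output of `toDebugString` before and after the aggregation.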


## 103-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent processing and identify the best one.

In [10]:
import org.apache.spark.HashPartitioner
val p2 = new HashPartitioner(8)

// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))})
// Option 1 is unsuitable: it does all the work we need, but the map runs after cache(), so what gets cached is the result of keyBy and partitionBy, not of map. In addition, calling map after partitionBy discards the partitioner, so the desired partitioning is NOT preserved.
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2)
// Option 2: the problem is that cache() is called before partitionBy, so the partitioned RDD is not the one being cached
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache()
// Option 3: the problem is that the partitioning criterion is not preserved, because map is called after partitionBy
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2).
  cache()
// Option 4 works, but it can be optimized, for example by folding keyBy and map into a single map
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2).
  cache()
// Option 5: best solution!

import org.apache.spark.HashPartitioner
p2: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddS1: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[23] at map at <console>:34
rddS2: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[26] at partitionBy at <console>:39
rddS3: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[29] at map at <console>:43
rddS4: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[32] at partitionBy at <console>:48
rddS5: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[34] at partitionBy at <console>:52
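
The effect of calling `map` after `partitionBy` can be verified directly through the `partitioner` field. The following sketch assumes the notebook's `sc`; the `toy...` names and the two station records are hypothetical. It also shows `mapValues`, which is not used in the five options above but keeps the partitioner because it cannot change the key.

```scala
import org.apache.spark.HashPartitioner

val toyP = new HashPartitioner(2)
// Hypothetical (key, (country, elevation)) records
val toyStations = sc.
  parallelize(Seq(("A1", ("UK", 10.0)), ("B2", ("FI", 25.0)))).
  partitionBy(toyP)

// map may change the key, so Spark discards the partitioner
val toyAfterMap = toyStations.map({case (k, v) => (k, v._1)})
// mapValues cannot touch the key, so the partitioner survives
val toyAfterMapValues = toyStations.mapValues(_._1)

println(toyAfterMap.partitioner)                      // None
println(toyAfterMapValues.partitioner == Some(toyP))  // true
```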


## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` swaps key and value

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
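
The hints above can be sketched on toy data as follows. This is only an illustration, assuming the notebook's `sc`; all `toy...` names and records are hypothetical. Both sides are partitioned with the same `HashPartitioner` instance, so the join result is expected to keep that partitioner.

```scala
import org.apache.spark.HashPartitioner

val toyJoinP = new HashPartitioner(4)

// Hypothetical station and weather records keyed by usaf + wban
val toyStationsByKey = sc.
  parallelize(Seq(("0001A", "LERWICK"), ("0002B", "OULU"))).
  partitionBy(toyJoinP).
  cache()
val toyWeatherByKey = sc.
  parallelize(Seq(("0001A", 17), ("0002B", 31), ("0001A", 12))).
  partitionBy(toyJoinP)

// Both inputs share the same partitioner, so the join can reuse it
val toyJoined = toyWeatherByKey.join(toyStationsByKey)
println(toyJoined.partitioner == Some(toyJoinP))

// Max temperature per city, sorted by descending temperature
val toyMaxByCity = toyJoined.
  map({case (_, (temp, city)) => (city, temp)}).
  reduceByKey((x, y) => if (x < y) y else x).
  map({case (k, v) => (v, k)}).
  sortByKey(false)
println(toyMaxByCity.collect().mkString(", ")) // (31,OULU), (17,LERWICK)
```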

In [None]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 

In [27]:
// STARTING CACHED WORK TO BE USED FOR THE OTHER JOBS
val partitioner = new HashPartitioner(8)

val cachedWeather = rddWeather.filter(_._6<999).keyBy(y => y._1 + y._2).cache()

val rddJoinCached = rddStation.keyBy(x => x._1 + x._2)
  .join(cachedWeather)
  .reduceByKey((a,b) => if (a._2._6 > b._2._6) a else b)
  .partitionBy(partitioner)
  .cache()

partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
cachedWeather: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, Int, Boolean))] = MapPartitionsRDD[79] at keyBy at <console>:37
rddJoinCached: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String, Double, Double, Double, String, String), (String, String, String, String, String, Int, Boolean)))] = ShuffledRDD[85] at partitionBy at <console>:42


In [36]:
// MAX TEMPERATURE OF EVERY CITY
val maxTemperature = rddJoinCached.map({case (k,v) => (v._1._3, v._2._6)}).cache()

maxTemperature.collect()

maxTemperature: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[91] at map at <console>:33
res15: Array[(String, Int)] = Array((TAMPERE PIRKKALA,29), (BARRA ISLAND,21), (ISOSAARI,21), (LOSSIEMOUTH,23), (SCATSTA,20), (FOYERS,23), (BALTASOUND NO.2,18), (LAPPEENRANTA HIEKKAPAKKA,23), (UTTI,29), (VAASA,27), (LEMLAND NYHAMN,19), (FOULA,15), (INVERGORDON HARBOUR,20), (SUOMUSSALMI,29), (TULLOCH BRIDGE,26), (VAASA,25), (MARIEHAMN,23), (FOULA NO2,15), (PARAINEN UTO,20), (BUTT OF LEWIS (LH),8), (HALLI,29), (SUMBURGH,30), (AHTARI MYLLYMAKI,28), (INVERNESS,23), (SKYE/LUSA,23), (KAUHAVA,29), (STORNOWAY,20), (JOMALA,22), (JYVASKYLA,29), (KUUSAMO,29), (HANKO RUSSARO,22), (TAIN RANGE (SAWS),22), (KILMORY,21), (AVIEMORE,23), (TURKU,27), (MIKKELI,28), (AONACH MOR,17), (LAPPEENRANTA,29), (LERW...


In [37]:
// MAX TEMPERATURE OF EVERY CITY IN UK
val maxTemperatureUK = rddJoinCached.filter(_._2._1._4 == "UK").map({case (k,v) => (v._1._3, v._2._6)})

maxTemperatureUK.collect()

maxTemperatureUK: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[93] at map at <console>:33
res16: Array[(String, Int)] = Array((BARRA ISLAND,21), (LOSSIEMOUTH,23), (SCATSTA,20), (FOYERS,23), (BALTASOUND NO.2,18), (FOULA,15), (INVERGORDON HARBOUR,20), (TULLOCH BRIDGE,26), (FOULA NO2,15), (BUTT OF LEWIS (LH),8), (SUMBURGH,30), (INVERNESS,23), (SKYE/LUSA,23), (STORNOWAY,20), (TAIN RANGE (SAWS),22), (KILMORY,21), (AVIEMORE,23), (AONACH MOR,17), (LERWICK,17), (WATERSTEIN,21), (LOCH GLASCARNOCH,22), (KIRKWALL,19), (SELLA NESS,19), (CAIRNGORM SUMMIT,14), (MUCKLE HOLM,16), (SULE SKERRY,17), (LOCHBOISDALE,21), (ALTNAHARRA NO2,24), (KINLOSS,24), (RACKWICK,19), (BENBECULA,20), (NORTH RONA ISLAND,21), (FAIR ISLE,15), (NORTH RONALDSAY ISL,6), (SOUTH UIST RANGE,34), (GLENLIVET,23), (AUL...


In [41]:
// SORT TEMPERATURE BY DESCENDING
// NOTE: use cache() only if the resulting RDD will be reused later by other expensive jobs (sortBy, reduceByKey, etc.)
// Do not scatter cache() calls at random!
// Swap to (temperature, city) and sort by key in descending order
val sortedMaxTemperature = maxTemperature.map({case (k,v) => (v,k)}).sortByKey(false)

sortedMaxTemperature.collect()

sortedMaxTemperature: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[107] at sortByKey at <console>:33
res20: Array[(Int, String)] = Array((34,SOUTH UIST RANGE), (31,OULU), (30,SUMBURGH), (30,SAVONLINNA), (30,KAJAANI), (30,KUOPIO), (30,JOENSUU), (29,TAMPERE PIRKKALA), (29,UTTI), (29,SUOMUSSALMI), (29,HALLI), (29,KAUHAVA), (29,JYVASKYLA), (29,KUUSAMO), (29,LAPPEENRANTA), (29,PORI), (29,KANKAANPAA NIINISALO PUOLVOIM), (29,LAHTI LAUNE), (29,VARKAUS), (28,AHTARI MYLLYMAKI), (28,MIKKELI), (28,VIITASAARI), (28,HAILUOTO ISLAND), (28,HELSINKI VANTAA), (28,SEINAJOKI), (28,NIVALA), (28,KRUUNUPYY), (28,HELSINKI MALMI), (27,VAASA), (27,TURKU), (27,JOKIOINEN), (26,TULLOCH BRIDGE), (26,SUOMUSJARVI), (25,VAASA), (25,KOTKA RANKKI), (24,ALTNAHARRA NO2), (24,KINLOSS), (23,LOSSIEMOUTH), (23,FOYERS...


## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [11]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)

import org.apache.spark.storage.StorageLevel._
memRdd: rddWeather.type = MapPartitionsRDD[2] at map at <console>:29
memSerRdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[35] at map at <console>:34
diskRdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[36] at map at <console>:35
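
Besides the Storage tab of the web UI, the occupied space can also be read programmatically. The sketch below assumes the notebook's `sc` and uses hypothetical `toy...` RDDs; `getRDDStorageInfo` is marked as a developer API in Spark, so treat this as a convenience check rather than a stable interface. Since persistence is lazy, an action must run before anything shows up, both here and in the web UI.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical data set, persisted with three different storage levels
val toyNumbers = sc.parallelize(1 to 100000).map(i => (i.toString, i))
val toyMem = toyNumbers.persist(StorageLevel.MEMORY_ONLY).setName("toyMem")
val toyMemSer = toyNumbers.map(x => x).persist(StorageLevel.MEMORY_ONLY_SER).setName("toyMemSer")
val toyDisk = toyNumbers.map(x => x).persist(StorageLevel.DISK_ONLY).setName("toyDisk")

// Persistence is lazy: run an action on each RDD so that it is actually stored
Seq(toyMem, toyMemSer, toyDisk).foreach(_.count())

// One line per cached RDD with its memory and disk footprint
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: memory=${info.memSize} B, disk=${info.diskSize} B, " +
    s"cached partitions=${info.numCachedPartitions}/${info.numPartitions}")
}
```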


## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner of ```rddS``` (and then join)
- Exploit broadcast variables

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect

In [None]:
// ..to enforce on rddW the same partitioner of rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
// ..or to exploit broadcast variables?
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
val rddJ = rddW.
  map({case (k,v) => (bRddS.value.get(k),v._6)}).
  filter(_._1!=None)
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()
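
A variant of the broadcast approach is sketched below on toy data. It assumes the notebook's `sc`; the `toy...` names and records are hypothetical. Instead of keeping `Option` keys and filtering out `None` afterwards, it uses `flatMap` so that readings without a matching station are dropped during the lookup itself.

```scala
// Hypothetical small lookup (station key -> city), shipped once to every executor
val toyLookup = sc.broadcast(Map("0001A" -> "LERWICK", "0002B" -> "OULU"))
// Hypothetical weather readings; "9999Z" has no matching station
val toyReadings = sc.parallelize(Seq(("0001A", 17), ("0002B", 31), ("0001A", 12), ("9999Z", 5)))

val toyBroadcastMax = toyReadings.
  // Map-side join: the Option returned by get() is flattened, dropping unmatched keys
  flatMap({case (k, temp) => toyLookup.value.get(k).map(city => (city, temp))}).
  reduceByKey((x, y) => if (x < y) y else x)

println(toyBroadcastMax.collect().sortBy(_._1).mkString(", ")) // (LERWICK,17), (OULU,31)
```

This only pays off when the broadcast side is small enough to fit comfortably in the memory of each executor.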

## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country

In [None]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()
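
One possible direction suggested by the hint is sketched below on toy data; it is not necessarily the intended solution. It assumes the notebook's `sc`, and every `toy...` name and record is hypothetical. The idea is to shrink both inputs before the join: weather is first reduced to one record per station, and stations are filtered to the UK before joining rather than after.

```scala
import org.apache.spark.HashPartitioner

val toyP6 = new HashPartitioner(4)

// Hypothetical data: key -> (city, country) and key -> temperature
val toySt = sc.
  parallelize(Seq(("0001A", ("LERWICK", "UK")), ("0002B", ("OULU", "FI")), ("0003C", ("KIRKWALL", "UK")))).
  partitionBy(toyP6)
val toyWe = sc.
  parallelize(Seq(("0001A", 17), ("0001A", 12), ("0002B", 31), ("0003C", 19), ("0003C", 9))).
  partitionBy(toyP6)

val toyUkMax = toyWe.
  reduceByKey((x, y) => if (x < y) y else x).   // one record per station before the join
  join(toySt.filter(_._2._2 == "UK")).          // filter keeps the partitioner
  map({case (_, (temp, (city, _))) => (city, temp)}).
  reduceByKey((x, y) => if (x < y) y else x).   // a city may host several stations
  map({case (k, v) => (v, k)}).
  sortByKey(false)

println(toyUkMax.collect().mkString(", ")) // (19,KIRKWALL), (17,LERWICK)
```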