# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [None]:
import org.apache.spark

In [None]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

In [None]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    def getDouble(str:String) : Double = {
        if (str.isEmpty)
            return 0
        else
            return str.toDouble
    }
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

In [None]:
val rddWeather = sc.
  textFile("../../../../datasets/big/weather-sample10.txt").
  map(x => parseWeather(x))
val rddStation = sc.
  textFile("../../../../datasets/weather-stations.csv").
  map(x => parseStation(x))

## 103-1 Simple job optimization

Optimize the two jobs (avg temperature and max temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criteria.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods on the documentation and choose the best one.
  - To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)``` where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI

In [None]:
// Average temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

In [None]:
// Maximum temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

## 103-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent processing and identify the best one.

In [None]:
import org.apache.spark.HashPartitioner
val p2 = new HashPartitioner(8)

// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))})
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2)
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache()
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2).
  cache()
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2).
  cache()

## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` to invert key with value and vice versa

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI

## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [None]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)

## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()```is called to enforce caching)

We want to join the two RDDS. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW1``` the same partitioner of ```rddS``` (and then join)
- Exploit broadcast variables

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect

In [None]:
// ..to enforce on rddW1 the same partitioner of rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
// ..or to exploit broadcast variables?
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
val rddJ = rddW.
  map({case (k,v) => (bRddS.value.get(k),v._6)}).
  filter(_._1!=None)
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country

In [None]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()