# 302 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

Let's start by setting the proper cluster configuration. With the default setup (i.e., 2 [m5.xlarge](https://aws.amazon.com/it/ec2/instance-types/m5/) machines with 4 cores and 16 GB of RAM each), use:

- 2 executors with 3 cores each (leave 1 core per machine for the OS and Hadoop daemons; the YARN ApplicationMaster (AM) also needs resources)
- 8G of memory per executor (the calculation from the slides would recommend 11G, but that exceeds the maximum YARN allows by default on this EMR cluster)

Syntax details are available [here](https://aws.amazon.com/it/premiumsupport/knowledge-center/modify-spark-configuration-emr-notebook/).

In [None]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [None]:
val bucketname = "unibo-bd2122-egallinucci"

val path_weather = "s3a://"+bucketname+"/datasets/weather-sample1.txt"
val path_weather_full = "s3a://"+bucketname+"/datasets/weather-sample10.txt"
val path_stations = "s3a://"+bucketname+"/datasets/weather-stations.csv"

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

In [None]:
//Initialize the following objects to give some structure to the weather RDDs.

case class WeatherData(
  usaf:String,
  wban:String,
  year:String,
  month:String,
  day:String,
  temperature:Double,
  validTemperature:Boolean
)

object WeatherData {
    def extract(row:String) = {
        val usaf = row.substring(4,10)
        val wban = row.substring(10,15)
        val year = row.substring(15,19)
        val month = row.substring(19,21)
        val day = row.substring(21,23)
        val airTemperature = row.substring(87,92)
        val airTemperatureQuality = row.charAt(92)

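        // The raw temperature is in tenths of a degree: divide by 10.0 (not 10) to keep the decimal digit
        new WeatherData(usaf,wban,year,month,day,airTemperature.toInt/10.0,airTemperatureQuality == '1')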
    }
}

case class StationData(
  usaf:String,
  wban:String,
  name:String,
  country:String,
  state:String,
  call:String,
  latitude:Double,
  longitude:Double,
  elevation:Double,
  date_begin:String,
  date_end:String
)

object StationData {
  def extract(row:String) = {
    // Empty numeric fields default to 0
    def getDouble(str:String) : Double =
      if (str.isEmpty) 0.0 else str.toDouble
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    new StationData(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),latitude,longitude,elevation,columns(9),columns(10))
  }
}
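
The `extract` method of `WeatherData` reads fixed-width fields by character offset. The offsets suggest the files follow the NCDC fixed-width layout. If so, the air temperature is a signed integer in tenths of a degree Celsius, and `+9999` marks a missing value. The sketch below applies two hypothetical helpers, `parseTemp` and `parseTempTruncated`, to a few made-up field values. It shows why the divisor should be a `Double`: dividing the `Int` by `10` silently drops the decimal digit.

```scala
// Hypothetical helper mirroring the conversion in WeatherData.extract.
// Assumption: the raw field is a signed integer in tenths of a degree (e.g. "+0123" = 12.3).
def parseTemp(raw: String): Double = raw.toInt / 10.0

// Same conversion with an Int divisor: integer division truncates the decimal digit.
def parseTempTruncated(raw: String): Double = raw.toInt / 10

// Made-up field values: positive, negative, and the assumed "missing" sentinel
val samples = Seq("+0123", "-0057", "+9999")
samples.foreach { s =>
  println(s"$s -> ${parseTemp(s)} (Int division: ${parseTempTruncated(s)})")
}
```

With either divisor, the missing-value sentinel ends up at 999 or above. The `filter(_.temperature<999)` used throughout the lab therefore still discards it.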



## 302-1 Simple job optimization

Optimize the two jobs (average temperature and maximum temperature) by avoiding repeated computations and by choosing an appropriate number of partitions.

Hints:
- Verify your persisted data in the web UI
- Use either ```repartition()``` or ```coalesce()``` to define the number of partitions
  - ```repartition()``` shuffles all the data
  - ```coalesce()``` minimizes data shuffling by exploiting the existing partitioning
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
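Here is a minimal sketch of the difference between the two partitioning hints above. It uses a small synthetic RDD created with `parallelize`; the application name and partition counts are arbitrary. `coalesce()` merges existing partitions and, without a shuffle, cannot increase their number. `repartition()` always goes through a shuffle, which shows up as a `ShuffledRDD` in `toDebugString`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("coalesce-vs-repartition"))

// Synthetic data split into 8 partitions
val base = ctx.parallelize(1 to 1000, 8)

val coalesced = base.coalesce(4)        // narrow: merges partitions, no shuffle
val grown = base.coalesce(16)           // without a shuffle, the count cannot grow
val repartitioned = base.repartition(4) // wide: full shuffle

println(coalesced.getNumPartitions)     // 4
println(grown.getNumPartitions)         // 8 (unchanged)
println(repartitioned.getNumPartitions) // 4

// The lineage contains a ShuffledRDD only for repartition()
println(coalesced.toDebugString)
println(repartitioned.toDebugString)
```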

In [None]:
val rddWeather = sc.textFile(path_weather_full).map(WeatherData.extract)

// Average temperature for every month
rddWeather.
    filter(_.temperature<999).
    map(x => (x.month, x.temperature)).
    aggregateByKey((0.0,0.0),1)((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
    map({case(k,v)=>(k,v._1/v._2)}).
    collect()

// Maximum temperature for every month
rddWeather.
    filter(_.temperature<999).
    map(x => (x.month, x.temperature)).
    reduceByKey((x,y)=>{if(x<y) y else x},1).
    collect()

### Solution

Partitioning:
- Consider 6 cores available in total
- By default there are 42 partitions (i.e., 7 partitions per core), 32MB each
  - No skewness -> no need to increase the number of partitions
  - Task execution times are above 100-200ms -> no need to reduce the number of partitions
  - Reducing to 12 partitions (i.e., 2 partitions per core, each merging 3-4 of the original ones) doesn't make much of a difference in this example
- After the shuffle, 42 partitions remain (the shuffled RDD inherits the number of partitions of its parent when ```spark.default.parallelism``` is not set)
  - Since the aggregations greatly reduce the number of records, fewer partitions can be specified in the ```aggregateByKey``` and ```reduceByKey``` transformations
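The last point can be checked directly. `reduceByKey` and `aggregateByKey` accept an optional number of output partitions, and the shuffled result gets a `HashPartitioner` with that many partitions. The sketch below uses made-up (month, temperature) pairs in local mode.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("agg-partitions"))

// Synthetic (month, temperature) pairs spread over 12 partitions
val pairs = ctx.parallelize(
  Seq(("01", 3.5), ("01", 7.0), ("02", 10.0), ("02", 12.0), ("03", 15.5)), 12)

// No explicit argument: the output inherits the parent's partition count
// (unless spark.default.parallelism is set)
val maxDefault = pairs.reduceByKey((x, y) => math.max(x, y))

// Few distinct keys: ask for a small number of output partitions
val maxFew = pairs.reduceByKey((x, y) => math.max(x, y), 2)

// Average via (sum, count) accumulators, also with 2 output partitions
val avgFew = pairs
  .aggregateByKey((0.0, 0L), 2)(
    (acc, v) => (acc._1 + v, acc._2 + 1),  // add one value within a partition
    (a, b) => (a._1 + b._1, a._2 + b._2))  // merge partial results across partitions
  .mapValues { case (sum, n) => sum / n }  // mapValues keeps the partitioner

println(maxDefault.getNumPartitions)
println(maxFew.getNumPartitions)                       // 2
println(maxFew.partitioner == Some(new HashPartitioner(2))) // true
println(avgFew.collect().toMap)
```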

Caching:
- It is good to cache the RDD **after** the common transformations (including the coalescing) have been carried out
- Then, remember to reference the cached RDD in both jobs

Check the cached RDD in the Storage tab of the Spark UI
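Besides the Storage tab, the storage level of an RDD can be inspected programmatically. The sketch below uses synthetic data in local mode to show three things. `cache()` is shorthand for `persist(MEMORY_ONLY)`. It only registers the RDD for persistence; nothing is stored until an action runs. An RDD derived from the cached one is not itself cached, which is why both jobs must reference the cached RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("cache-check"))

val source = ctx.parallelize(1 to 100000, 4)
val cached = source.filter(_ % 2 == 0).cache() // marks the RDD for caching; nothing is stored yet
val derived = cached.map(_ * 10)               // a new RDD: reads from the cache, but is not cached itself

println(cached.getStorageLevel == StorageLevel.MEMORY_ONLY) // true
println(derived.getStorageLevel == StorageLevel.NONE)       // true
println(ctx.getPersistentRDDs.contains(cached.id))          // true: registered for persistence

cached.count() // the first action materializes the cache (then visible in the Storage tab)
cached.sum()   // later actions read the cached partitions instead of re-running filter()
```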

Coalesce vs repartition:
- Coalesce aggregates partitions without shuffling; repartition forces a shuffle
- Neither of them assigns a partitioner (i.e., a partitioning criterion) to the RDD
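The last point can be checked on synthetic key-value pairs. After `coalesce()` or `repartition()` the `partitioner` is `None`, so Spark cannot assume that equal keys are co-located. `partitionBy()` is the operation that attaches a partitioner.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("partitioner-check"))

// Synthetic pairs with 10 distinct keys
val kv = ctx.parallelize((1 to 100).map(i => (i % 10, i)), 8)

println(kv.coalesce(4).partitioner)                         // None
println(kv.repartition(4).partitioner)                      // None
println(kv.partitionBy(new HashPartitioner(4)).partitioner) // Some(...): a HashPartitioner
```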

In [None]:
val rddWeather = sc.textFile(path_weather_full).map(WeatherData.extract)

val cachedRdd = rddWeather.filter(_.temperature<999).coalesce(12).map(x => (x.month, x.temperature)).cache()

// Average temperature for every month
cachedRdd.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).map({case(k,v)=>(k,v._1/v._2)}).collect()
// Maximum temperature for every month
cachedRdd.reduceByKey((x,y)=>{if(x<y) y else x}).collect()

// Neither coalesce() nor repartition() assigns a partitioner: both expressions return None
rddWeather.coalesce(12).partitioner
rddWeather.repartition(12).partitioner

## 302-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent (multiple) processing and identify the best one.

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)

val rddStation = sc.textFile(path_stations).map(StationData.extract)

val rddS1 = rddStation.
  keyBy(x => x.usaf + x.wban).
  partitionBy(p).
  cache().
  map({case (k,v) => (k,(v.country,v.elevation))})
val rddS2 = rddStation.
  keyBy(x => x.usaf + x.wban).
  map({case (k,v) => (k,(v.country,v.elevation))}).
  cache().
  partitionBy(p)
val rddS3 = rddStation.
  keyBy(x => x.usaf + x.wban).
  partitionBy(p).
  map({case (k,v) => (k,(v.country,v.elevation))}).
  cache()
val rddS4 = rddStation.
  keyBy(x => x.usaf + x.wban).
  map({case (k,v) => (k,(v.country,v.elevation))}).
  partitionBy(p).
  cache()
val rddS5 = rddStation.
  map(x => (x.usaf + x.wban, (x.country,x.elevation))).
  partitionBy(p).
  cache()

### Solution

- keyBy() and map() drop the partitioner (they could change the keys), thus they must be issued before partitionBy()
- Anything that happens after cache() is not saved and must be recomputed each time;
  thus, it is good practice to cache() as late as possible
- rddS4 and rddS5 are the best options (the latter being less verbose)
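The first point can be checked through the `partitioner` field. The sketch below uses made-up station-like tuples in local mode. It also shows `mapValues()`, which the cells above do not use. Since `mapValues()` cannot change the keys, Spark keeps the partitioner, so `partitionBy()` followed by `mapValues()` would be another valid ordering. That ordering is an alternative introduced here, not one of the five options.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("mapvalues-check"))
val p = new HashPartitioner(8)

// Synthetic records: (stationId, (country, elevation, name))
val stations = ctx.parallelize(Seq(
  ("A1", ("UK", 10.0, "Alpha")),
  ("B2", ("IT", 250.0, "Beta")),
  ("C3", ("UK", 35.0, "Gamma"))))

val partitioned = stations.partitionBy(p)

// map() may change keys, so Spark drops the partitioner
val viaMap = partitioned.map { case (k, (country, elev, _)) => (k, (country, elev)) }
// mapValues() cannot change keys, so the partitioner is kept
val viaMapValues = partitioned.mapValues { case (country, elev, _) => (country, elev) }

println(viaMap.partitioner)                 // None
println(viaMapValues.partitioner == Some(p)) // true
```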

## 302-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` to invert key with value and vice versa

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
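The last two hints can also be checked without the web UI. In this sketch (made-up data, local mode) both inputs share the same `HashPartitioner`. The join inherits that partitioner, and both inputs feed it through narrow one-to-one dependencies. An input that merely has the same number of partitions (here via `repartition`) is shuffled again. The helper `narrowInputs` is hypothetical. It relies on the implementation detail that `join` is built on a `CoGroupedRDD`, so treat it as an inspection sketch rather than a stable API.

```scala
import org.apache.spark.{HashPartitioner, OneToOneDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.{CoGroupedRDD, RDD}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("copartitioned-join"))
val p = new HashPartitioner(4)

// Synthetic (stationId, temperature) and (stationId, name) pairs
val weather = ctx.parallelize(Seq(("A1", 12.3), ("A1", 15.0), ("B2", 21.5), ("C3", 8.0)))
val stations = ctx.parallelize(Seq(("A1", "Alpha"), ("B2", "Beta"), ("C3", "Gamma")))

// Hypothetical helper: walk back the lineage to the CoGroupedRDD behind join()
// and report, for each input, whether it is read through a one-to-one (narrow) dependency
def narrowInputs(rdd: RDD[_]): Seq[Boolean] = rdd match {
  case cg: CoGroupedRDD[_] => cg.dependencies.map(_.isInstanceOf[OneToOneDependency[_]])
  case other => narrowInputs(other.dependencies.head.rdd)
}

// Same partitioner on both sides
val sameP = weather.partitionBy(p).join(stations.partitionBy(p))
// Same number of partitions, but the left side has no partitioner
val sameCount = weather.repartition(4).join(stations.partitionBy(p))

println(sameP.partitioner == Some(p)) // true
println(narrowInputs(sameP))          // both true: no shuffle inside the join
println(narrowInputs(sameCount))      // false for weather: it is shuffled again
```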

In [None]:
val rddWeather = sc.textFile(path_weather).map(WeatherData.extract)
val rddStation = sc.textFile(path_stations).map(StationData.extract)

### Solution

In [None]:
import org.apache.spark.HashPartitioner

val p = new HashPartitioner(14)

val rddWeather = sc.textFile(path_weather).map(WeatherData.extract)
val rddStation = sc.textFile(path_stations).map(StationData.extract)

val rddS = rddStation.keyBy(x => x.usaf + x.wban).partitionBy(p)
val rddW = rddWeather.filter(_.temperature<999).keyBy(x => x.usaf + x.wban).partitionBy(p)

val rddJoin = rddW.join(rddS).cache()
rddJoin.toDebugString

rddJoin.map({case(k,v)=>(v._2.name,v._1.temperature)}).reduceByKey((x,y)=>{if(x<y) y else x}).collect()
rddJoin.filter(_._2._2.country=="UK").map({case(k,v)=>(v._2.name,v._1.temperature)}).reduceByKey((x,y)=>{if(x<y) y else x}).collect()
rddJoin.filter(_._2._2.country=="UK").map({case(k,v)=>(v._2.name,v._1.temperature)}).reduceByKey((x,y)=>{if(x<y) y else x}).map({case(k,v)=>(v,k)}).sortByKey(false).collect()
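
The key/value swap followed by `sortByKey(false)` in the last line can also be written with `sortBy`. `sortBy` sorts by a function of each record and takes an `ascending` flag. Here is a sketch on made-up (station name, max temperature) pairs in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("sort-by"))

// Synthetic (stationName, maxTemperature) pairs
val maxByName = ctx.parallelize(Seq(("ALPHA", 21.5), ("BETA", 30.2), ("GAMMA", 17.0)))

// Pattern used in the lab: swap key and value, then sort by key (descending)
val swapped = maxByName.map { case (k, v) => (v, k) }.sortByKey(ascending = false).collect()

// Same ordering without swapping: sort by the value, descending
val direct = maxByName.sortBy(_._2, ascending = false).collect()

println(swapped.mkString(", ")) // (30.2,BETA), (21.5,ALPHA), (17.0,GAMMA)
println(direct.mkString(", "))  // (BETA,30.2), (ALPHA,21.5), (GAMMA,17.0)
```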

## 302-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [None]:
import org.apache.spark.storage.StorageLevel._
val rddWeather = sc.textFile(path_weather_full).map(WeatherData.extract)

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.repartition(14).cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)

### Solution

- An action (e.g., collecting or saving to file) is required to trigger the (lazy) evaluation and populate the cache
- memSerRdd's size will be less than half of memRdd's
- diskRdd's size will be approximately the same as memSerRdd's (data written to disk is always serialized)

In [None]:
memRdd.saveAsTextFile("s3a://"+bucketname+"/tmp/rddW-m")
memSerRdd.saveAsTextFile("s3a://"+bucketname+"/tmp/rddW-md")
diskRdd.saveAsTextFile("s3a://"+bucketname+"/tmp/rddW-d")
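
The three storage levels used above differ in where the blocks live and whether they are kept as deserialized Java objects. The flags of `StorageLevel` make this explicit. Note that `DISK_ONLY` is not deserialized, which matches the remark that data written to disk is always serialized. The sketch only needs the Spark library on the classpath, not a running context.

```scala
import org.apache.spark.storage.StorageLevel

// Storage levels used in this exercise (cache() corresponds to MEMORY_ONLY)
val levels = Seq(
  "MEMORY_ONLY" -> StorageLevel.MEMORY_ONLY,
  "MEMORY_ONLY_SER" -> StorageLevel.MEMORY_ONLY_SER,
  "DISK_ONLY" -> StorageLevel.DISK_ONLY)

// Print where each level stores blocks and whether they are kept as deserialized objects
levels.foreach { case (name, l) =>
  println(s"$name: disk=${l.useDisk} memory=${l.useMemory} deserialized=${l.deserialized}")
}
```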

## 302-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (an action, here ```saveAsTextFile()```, is called to materialize the cache)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner as ```rddS``` (and then join)
- Exploit broadcast variables

In [None]:
import org.apache.spark.HashPartitioner

val rddWeather = sc.textFile(path_weather_full).map(WeatherData.extract)
val rddStation = sc.textFile(path_stations).map(StationData.extract)

val p = new HashPartitioner(14)

val rddW = rddWeather.
  filter(_.temperature<999).
  keyBy(x => x.usaf + x.wban).
  cache()
val rddS = rddStation.
  keyBy(x => x.usaf + x.wban).
  partitionBy(p).
  cache()

// Trigger an action to materialize the cache
rddW.saveAsTextFile("s3a://"+bucketname+"/tmp/rddW")
rddS.saveAsTextFile("s3a://"+bucketname+"/tmp/rddS")

// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2.name,v._1.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

// ..to enforce on rddW the same partitioner as rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2.name,v._1.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

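// ..or to exploit broadcast variables?
// Ship the (small) station-name lookup table to every executor
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2.name)).collectAsMap())
// Look up each station id locally; records without a matching station are dropped (as in an inner join)
val rddJ = rddW.
  flatMap({case (k,v) => bRddS.value.get(k).map(name => (name, v.temperature))})
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()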

### Solution

The first and second options both take approximately the same time
  - The first one shuffles less data
  - The second one allows skipping unnecessary partitions from rddS
  
The third option is the fastest
  - rddW data is not shuffled
  - The downside is the increased memory occupation: the station lookup table is collected to the driver, and every executor keeps a full copy of it
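Here is a minimal sketch of the broadcast (map-side) join on made-up data in local mode. It checks that the broadcast join yields the same per-name maxima as a regular `join`. It assumes the station lookup table is small enough to be collected to the driver and copied to every executor. Calling `flatMap` on the `Option` returned by the map lookup drops weather records without a matching station (`Z9` here), which mimics the inner join.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("broadcast-join"))

// Synthetic data: (stationId, temperature) and (stationId, stationName)
val weather = ctx.parallelize(Seq(("A1", 12.3), ("A1", 15.0), ("B2", 21.5), ("Z9", 40.0)))
val stations = ctx.parallelize(Seq(("A1", "ALPHA"), ("B2", "BETA"), ("C3", "GAMMA")))

// Regular (shuffle-based) join
val viaJoin = weather.join(stations)
  .map { case (_, (temp, name)) => (name, temp) }
  .reduceByKey((x, y) => math.max(x, y))
  .collect().toMap

// Broadcast join: the small lookup table is shipped to each executor once
val stationNames = ctx.broadcast(stations.collectAsMap())
val viaBroadcast = weather
  .flatMap { case (id, temp) => stationNames.value.get(id).map(name => (name, temp)) } // no match -> dropped
  .reduceByKey((x, y) => math.max(x, y))
  .collect().toMap

println(viaJoin)
println(viaJoin == viaBroadcast) // true
```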

## 302-6 Optimizing Exercise 3

Start from the result of Exercise 3; is there a more efficient way to compute the same result?

In [None]:
val rddWeather = sc.textFile(path_weather_full).map(WeatherData.extract)
val rddStation = sc.textFile(path_stations).map(StationData.extract)

val rddW = rddWeather.filter(_.temperature<999).keyBy(x => x.usaf + x.wban).cache()
val rddS = rddStation.keyBy(x => x.usaf + x.wban).cache()

val rdd6a = rddW.
  join(rddS).
  filter(_._2._2.country=="UK").
  map({case(k,v)=>(v._2.name,v._1.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()

### Solution

In [None]:
// First version
val rdd6a = rddW.
  join(rddS).filter(_._2._2.country=="UK").
  map({case(k,v)=>(v._2.name,v._1.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect

In [None]:
// Pushing down filters is generally a good move: fewer records enter the join
val rdd6b = rddW.
  join(rddS.filter(_._2.country=="UK")).
  map({case(k,v)=>(v._2.name,v._1.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect

In [None]:
// Pushing down aggregations is even better
val rdd6c = rddW.
  map({case(k,v)=>(k,v.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  join(rddS).filter(_._2._2.country=="UK").
  map({case(k,v)=>(v._1,v._2.name)}).
  sortByKey(false).
  collect

In [None]:
// Best option: pushing down both filters and aggregations
val rdd6d = rddW.
  map({case(k,v)=>(k,v.temperature)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  join(rddS.filter(_._2.country=="UK")).
  map({case(k,v)=>(v._1,v._2.name)}).
  sortByKey(false).
  collect
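
One caveat on rdd6c and rdd6d: pushing the aggregation below the join changes the grouping key from the station name to the station identifier (usaf + wban). If two stations share a name, the earlier versions merge them, while the pushed-down versions return one row per station. The sketch below runs on made-up data with a deliberately repeated name, in local mode. A second, cheap `reduceByKey` on the name after the join restores exactly the same result.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the active SparkContext (e.g. the notebook's sc) if there is one; otherwise starts a local one
val ctx = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("pushdown-check"))

// Synthetic data: two different stations (A1, A2) share the name "SAME NAME"
val w = ctx.parallelize(Seq(("A1", 10.0), ("A1", 14.0), ("A2", 18.0), ("B1", 9.0), ("C1", 30.0)))
val s = ctx.parallelize(Seq(
  ("A1", ("SAME NAME", "UK")), ("A2", ("SAME NAME", "UK")),
  ("B1", ("OTHER", "UK")), ("C1", ("ELSEWHERE", "IT"))))

val maxOf = (x: Double, y: Double) => math.max(x, y)
val ukStations = s.filter(_._2._2 == "UK")

// Join first, then aggregate by name (as in rdd6b)
val joinFirst = w.join(ukStations)
  .map { case (_, (temp, (name, _))) => (name, temp) }
  .reduceByKey(maxOf)
  .collect().toMap

// Without a second reduceByKey: one row per station, not per name
val perStation = w.reduceByKey(maxOf).join(ukStations)
  .map { case (_, (temp, (name, _))) => (name, temp) }
  .collect()
println(perStation.length) // 3 rows, but only 2 distinct names

// Aggregate by station first, join, then re-aggregate by name
val aggFirst = w.reduceByKey(maxOf)   // at most one record per station
  .join(ukStations)
  .map { case (_, (temp, (name, _))) => (name, temp) }
  .reduceByKey(maxOf)                 // merges stations sharing a name
  .collect().toMap

println(joinFirst == aggFirst) // true
```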