# What you already know: HOFs and Scala collections

I assume you already know the basics of Scala and higher-order functions, at least as they are used in the Scala Collections library. 

In [None]:
// example of List processing with map, filter, etc. 

List(1,2,3,4)
    .map(_ + 1)
    .filter(_ % 2 == 0)
    .reduce(_ + _)

# Spark: standalone setup

Spark is a distributed processing framework for transforming big data sets using the computational power of a dedicated cluster. But we can use Spark in an standalone mode (i.e. with no cluster at all), for testing or pedagogical purposes. In that case, we just exploit the cores of your local processor.

### Create the Spark session

The Spark session is the entry point to the Spark interpreter. We need it for running Spark programs.

In [None]:
// Create a Spark session in standalone mode

import $ivy.`org.apache.spark::spark-sql:2.4.5` 
import $ivy.`sh.almond::almond-spark:0.6.0`

import org.apache.spark.sql.{NotebookSparkSession, SparkSession}

val spark: SparkSession = 
    NotebookSparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()


### Logging configuration

This is convenient to minimize the amount of info displayed in the terminal.

In [None]:
import org.slf4j.LoggerFactory
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger().setLevel(Level.ERROR)

### Useful imports

In [None]:
import spark.implicits._
import org.apache.spark.sql.{functions => func, _}
import org.apache.spark.sql.types._

# Your first Spark program 

The following program is an example of a _Dataset_ program. The Dataset API is one of the languages for distributed data processing that the Spark framework provides. We will omit reference in this notebook to other APIs such as RDDs, DataFrames, etc.

In [None]:
Logger.getRootLogger().setLevel(Level.ERROR)

In [None]:
List(1,2,3,4).toDS
    .map(_ + 1)
    .filter(i => i % 2 == 0)
    .reduce(_ + _)

In principle, this Dataset program does not differ significantly from the Scala collection program. Syntactically, the only difference appears to be the `.toDS` expression: 

In [None]:
List(1,2,3,4).toDS

But, of course, there are several major differences.

# First difference: performance

Let's define a heavy computation:

In [None]:
def heavyComp(ms: Int = 1000)(x: Int): Int = {
  Thread.sleep(ms)
  x+1
}

and a way to measure execution time:

In [None]:
def run[A](code: => A): A = {
    val start = System.currentTimeMillis()
    val res = code
    println(s"Took ${System.currentTimeMillis() - start}")
    res
}

In [None]:
run(println("hola"))

The following Scala Collection program takes some time to execute:

In [None]:
run{
    List(1,2,3,4).map(heavyComp(): Int => Int).reduce(_ + _)
}

However, the equivalent Dataset program takes half time (or less time depending on the number of cores of your processor)!

In [None]:
run(
    List(1,2,3,4).toDS.map(heavyComp()).reduce(_ + _)
)

The Dataset program run faster because the Spark framework is designed to take advantage of the parallel and distributed architecture of your computing infrastructure. In our case, it simply exploits the number of cores of your processor.

However, note that using Spark to parallelize your code is overkill. If you are not in a truly distributed setting, you can get along the same benefits more simply using the parallel collections framework of the Scala standard library: 

In [None]:
run(List(1,2,3,4).par.map(heavyComp()).reduce(_ + _))

# Second difference: _laziness_

Let's compare this Scala collection transformation:

In [None]:
val result: List[Int] = List(1,2,3,4).map(_ + 1)

with the following Dataset one:

In [None]:
List(1,2,3,4).toDS.map(_ + 1).filter(_ % 2 == 0).reduce(_ + _)

In [None]:
val program: Dataset[Int] = List(1,2,3,4).toDS.map(_ + 1)

We obtain no transformation at all! Dataset programs are that: _programs_. We won't find any data in an instance of `Dataset`, just a program or _generator_ of a data set. A `Dataset` declares a number of _transformations_ that will be eventually enacted with specific _actions_. For instance, using `collect`:

In [None]:
val result: Array[Int] = program.collect
val result2: Int = program.reduce(_ + _)

or `reduce`:

In [None]:
val result2: Int = program.reduce(_ + _)

Spark Datasets are said to be *lazy*, because we don't inmediately obtain an answer. Rather, we _declaratively_ specify a number of transformations to be applied, and wait until a specific action interprets the transformation program to obtain the desired result. And the same program may be interpreted differently: we may simply want to execute the transformations using `collect`, or may want to perform some calculation using `reduct`. This difference between _transformations_ and _actions_ is reflected very precisely in the [Dataset API](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset).  

### Inspecting the structure of a Spark program

A `Dataset` is a program that it's compiled into a lower-level program before it can be actually executed. The compiler of datasets is called _catalyst_. We can inspect the execution plan that is generated for a particular dataset using `explain`:

In [None]:
List(1,2,3,4).toDS.map(_ + 1).explain

The execution plan of a dataset is in turn compiled into an `RDD`, i.e. a lower-level abstraction program:

In [None]:
// RDD = Resilient Distributed Dataset
val rdd: org.apache.spark.rdd.RDD[Int] = List(1,2,3,4).toDS.map(_ + 1).rdd
rdd.toDebugString

### Spark and functional programming

This distinction lies also at the heart of functional programming. On the one hand, there are _programs_ written in a DSL. On the other, there are _interpreters_ that run this program according to different semantics. This is also reflected in the Scala Collections API, particularly, in the notion of _views_. For instance, the following transformation is similarly _lazy_: 

In [None]:
// List(1,2,3,4).map(_ + 1).filter(_ % 2 == 0)

In [None]:
List(1,2,3,4).view.map(heavyComp())
    .filter{ i => 
        println("hola");
        i % 2 == 0
    }

The transformation is only executed when we execute the view using, e.g. `toList`: 

In [None]:
List(1,2,3,4).view.map(heavyComp()).toList

`SeqView`s are *programs*, much in the same way as `Dataset` objects, whereas `toList` is an *action*, equivalent to the `Dataset` actions `collect` and `reduce`.

Similarly, Scala _iterators_ are also good examples of *lazyness*. When we create an iterator from a collection, as in:

In [None]:
val it: Iterator[Int] = List(1,2,3,4).iterator

and specify a number of transformations:

In [None]:
val it2: Iterator[Int] = List(1,2,3,4).iterator
    .map{i => println("it"); i + 1}
    .filter{ i => println("it2"); i % 2 == 0 }
    //.toList

we don't inmediately obtain those transformations. We are just creating a new iterator that will generate the correspoding data when we ask for it:

In [None]:
it2.toList

Actually, there is a close relationship between Spark RDDs (the transformation language in which `Dataset`s are actually translated into), and iterators. 

# Third difference: the execution framework

When an action is applied on a `Dataset` program a _job_ is executed by the distributed platform of Spark through a sequence of *stages*; in each stage, the work to be done is performed concurrently by a number of _tasks_. 

The so-called [Spark UI](http://localhost:4040/) allows us to debug the execution process of all the jobs that are submitted for execution through a given Spark session. For instace, the following action launches a job that can be inspected through the Spark UI: 

In [None]:
val ds: Dataset[Int] = List(1,2,3,4).toDS.map(heavyComp())

In [None]:
ds.explain

In [None]:
ds.collect

Each bar in the notebook execution corresponds to one stage of the job exectuion; the X/Y label in each bar indicates the number of tasks already executed (X) and the total number of tasks of that stage (Y). 

We can get the work performed by tasks in each partition through `foreachPartition`:

In [None]:
ds.foreachPartition{ it : Iterator[Int] => 
    println(s"Task output: " + it.toList)
}

In [None]:
def collectPartition[T](ds: Dataset[T]) : Unit =
    ds.foreachPartition{ it: Iterator[T] =>
        println("Task output: " + it.toList)
    }

In [None]:
collectPartition(ds)

We will use this action quite frequently, so let's define an *extension method* for the `Dataset` type:

In [None]:
// Exxtension de metodos, solo un argumento: el objeto que quieres transformar
implicit class DatasetOps[T](ds: Dataset[T]){
    def collectPartitions: Unit = 
        ds.foreachPartition{it : Iterator[T] => println(it.toList)}
}

The number of tasks scheduled for each stage depends on the number of partitions associated to the dataset. When the dataset is first created from a Scala collection, the number of partitions defaults to the number of cores specified when the Spark context was created. 

In [None]:
List(1,2,3,4).toDS.rdd.getNumPartitions

The number of partitions can be set to a specific value using `repartition`: 

In [None]:
List(1,2,3,4,5,6,7,8,9,10,11,12).toDS
    .repartition(24)
    .map(heavyComp(2000))
    .collectPartitions

# Shuffling: narrow vs. wide transformations
//narrow: particiones de datos independientes
//wide transformations: particiones con datos dependiente a datos de otras particiones

Note that a new stage is created when the dataset is repartitioned. More commonly, new stages are created when so-called _wide_ transformations are interpreted. _Narrow_ transformations are those transformations which are not wide: `map`, `filter`, etc. For instance, this program will execute in one stage: 

In [None]:
List(("a", 1), ("b", 2), ("a", 3), ("d", 3), ("b", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .collect

and the following one as well:

In [None]:
List(("a", 1), ("b", 2), ("a", 3), ("d", 3), ("b", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .filter{ t => t._1 == "a" }
    .collect

However, this one introduces a new stage:

In [None]:
List(("a", 1), ("b", 2), ("a", 3), ("d", 3), ("b", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .groupByKey(_._1)
    .mapGroups((key, values) => (key, values.toList.map(_._2)))
    .collect

Why? Which difference between `filter` and `groupBy` creates such a need for a new stage? And why the next stage generates a dataset with 200 partitions? Let's answer these questions: 
* First, a new stage is created when data needs to be moved, or *shuffled*, between partitions. 
* Indeed, this is the case for `groupBy`.
* Last, 200 is the default number of partitions created when a shuffled is needed.

This value can be customised as follows:

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [None]:
List(("a", 1), ("b", 2), ("a", 3), ("d", 3), ("b", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .groupByKey(_._1)
    .mapGroups((key, values) => (key, values.toList.map(_._2)))
    .collect

We can inspect the contents of the different partitions after each transformation:

In [None]:
List(("a", 1), ("e", 2), ("f", 3), ("d", 3), 
     ("z", 3), ("k", 3), ("i", 3), ("o", 3), 
     ("l",2), ("b", 2), ("a", 3), ("d", 3), 
     ("b", 4), ("e", 4)).toDS
    .collectPartitions

In [None]:
List(("a", 1), ("e", 2), ("f", 3), ("d", 3), ("z", 3), ("k", 3), ("i", 3), ("o", 3), ("l",2), ("b", 2), ("a", 3), ("d", 3), ("b", 4), ("e", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .collectPartitions

In [None]:
List(("a", 1), ("e", 2), ("f", 3), ("d", 3), ("z", 3), ("k", 3), ("i", 3), ("o", 3), ("l",2), ("b", 2), ("a", 3), ("d", 3), ("b", 4), ("e", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .groupByKey(_._1)
    .mapGroups((key, values: Iterator[(String,Int)]) => (key, values.toList.map(_._2)))
    .collectPartitions

How do Spark decides where to move the data?

In [None]:
List(("a", 1), ("e", 2), ("f", 3), ("d", 3), ("z", 3), ("k", 3), ("i", 3), ("o", 3), ("l",2), ("b", 2), ("a", 3), ("d", 3), ("b", 4), ("e", 4)).toDS
    .map{ case (key, value) => (key, value + 1) }
    .groupByKey(_._1)
    .mapGroups((key, values) => (key, values.toList.map(_._2)))
    .explain

# Narrow or wide?

The transformations in the Dataset API can be classified into narrow and wide transformations, systematically. We have already mentioned that `map` and `filter` belong to the former, and `groupByKey` to the latter. What about the following ones?

### `coalesce`
//Reducir numero de particiones

In [None]:
val ds = List(1,2,2,3,4,4,5,6,4,7,3,8).toDS

In [None]:
ds.collectPartitions

In [None]:
ds.coalesce(2).collectPartitions

### `dropDuplicates` (eliminar elementos duplicados)
//wide

In [None]:
ds.dropDuplicates.collectPartitions

### `flatMap`
//Narrow

In [None]:
ds.flatMap(i => List(i, -i)).collectPartitions

### `limit` (limitar el nº de elementos de salida)
//Narrow

In [None]:
ds.limit(6).collectPartitions

### `mapPartitions`

In [None]:
ds.mapPartitions{ it: Iterator[Int] => it.map(_ + 1) }.collectPartitions

In [None]:
implicit class DMap[T](da:Dataset[T])

### `repartition`

In [None]:
ds.repartition(2).collectPartitions

Compare it with `coalesce`:

In [None]:
ds.coalesce(2).collectPartitions

Differences can be "explained":

//Todo elemento de una particion se ha ido a otra particion (una) -> coalesce, por eso narrow

//Elementos de una particion se van a distintos particiones -> repartition, por eso Wide

In [None]:
ds.repartition(2).explain

In [None]:
ds.coalesce(2).explain

### `sort`

In [None]:
ds.sort("value").collectPartitions

In [None]:
ds.sort("value").collect

# What about transformations that relate several datasets?

Commonly, information is spread across several datasets, and the Spark Dataset API includes transformations to deal with this situation.

### `Union`

In [None]:
val ds1: Dataset[Int] = List(1,2,3,4).toDS.repartition(3)
val ds2: Dataset[Int] = List(5,6,7,8).toDS.repartition(6)

In [None]:
ds1.collectPartitions

In [None]:
ds2.collectPartitions

In [None]:
ds1.union(ds2).collectPartitions

No shuffle involved, just a single stage which makes the union of the different partitions.

### `Join`

In [None]:
object DS{
    case class Person(name: String, age: Int)
    case class Student(name: String, degree: String, year: Int)
}

import DS._

In [None]:
val people: Dataset[Person] = List(
    Person("Yihui", 20),
    Person("Noelia", 19),
    Person("Gabriel", 22),
    Person("Javier", 21)).toDS

In [None]:
val students: Dataset[Student] = List(
    Student("Yihui", "II", 2000),
    Student("Yihui", "M", 2001),
    Student("Noelia", "II", 2000),
    Student("Noelia", "IS", 2000),
    Student("Gabriel", "II", 2004),
    Student("Javier", "II", 2005),
    Student("Javier", "M", 2005)).toDS

In [None]:
people.collectPartitions

In [None]:
students.collectPartitions

In [None]:
people.join(students, "name").collectPartitions

Somewhat unexpectedly, there is no shuffle! This is because Spark performs the join following a "broadcast" strategy:

In [None]:
people.join(students, "name").explain

This happens when one of the datasets is small enough to be copied for each partition. We can force Spark to avoid broadcast as follows (just for testing purposes): 

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",0)


In [None]:
people.join(students, "name").collectPartitions

# Caching

One of the most distinctive features of Spark is its ability to cache computations of datasets.

In [None]:
val ds5: Dataset[Int] = (0 to 1000).toDS.map(heavyComp(20))

In [None]:
run(ds5.count)
run(ds5.count)

Now with caching:

In [None]:
ds5.cache

In [None]:
run(ds5.count)
run(ds5.count)

We may instruct the Spark interpreter to not use cached data:

In [None]:
ds5.unpersist

In [None]:
run(ds5.count)

The method `cache` is not a pure transformation but a side-effectful operation. It just instructs the Spark interpreter to cache the dataset as soon as it's executed.  

In [None]:
val ds1: Dataset[Int] = List(1,2,3,4).toDS.map(heavyComp())
val ds1_cached: Dataset[Int] = ds1.cache

We may expect that the only cached dataset is `ds1_cached`, but that's not true:

In [None]:
run(ds1.count)
run(ds1.count)