# What you already know: HOFs and Scala collections

I assume you already know the basics of Scala and higher-order functions, at least as they are used in the Scala Collections library. 

In [None]:
// example of List processing with map, filter and reduce

List(1,2,3,4)
    .map(_ + 1)          // List(2, 3, 4, 5)
    .filter(_ % 2 == 0)  // List(2, 4)
    .reduce(_ + _)       // 6
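The placeholder syntax used above (`_ + 1`, `_ + _`) is shorthand for ordinary function literals. `map`, `filter` and `reduce` are higher-order functions precisely because they take such functions as arguments. A minimal sketch that spells this out with named function values (the names `inc`, `isEven` and `add` are just illustrative):

```scala
object HofDesugared {
  // Function values equivalent to the placeholders `_ + 1`, `_ % 2 == 0` and `_ + _`
  val inc: Int => Int = x => x + 1
  val isEven: Int => Boolean = x => x % 2 == 0
  val add: (Int, Int) => Int = (a, b) => a + b

  // The same pipeline as above, passing the functions explicitly
  val result: Int = List(1, 2, 3, 4).map(inc).filter(isEven).reduce(add)

  def main(args: Array[String]): Unit =
    println(result) // prints 6
}
```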

# Spark: standalone setup

Spark is a distributed processing framework for transforming big data sets using the computational power of a dedicated cluster. But we can also run Spark on its own in _local_ mode (i.e. with no cluster at all), for testing or teaching purposes. In that case, Spark simply exploits the cores of your local processor.

### Create the Spark session

The Spark session is the entry point to Spark: we need it to create and run Spark programs.

In [None]:
// Create a Spark session in local mode (no cluster)

import $ivy.`org.apache.spark::spark-sql:2.4.3` 
import $ivy.`sh.almond::almond-spark:0.6.0`

import org.apache.spark.sql.{NotebookSparkSession, SparkSession}

val spark: SparkSession = 
    NotebookSparkSession
      .builder()
      .master("local[*]") // run locally with as many worker threads as logical cores
      .getOrCreate()


### Logging configuration

Raising the root logger's level to `ERROR` keeps Spark's informational and warning messages from cluttering the notebook output.

In [None]:
import org.apache.log4j.{Level, Logger}

// Only show log messages of level ERROR or above
Logger.getRootLogger().setLevel(Level.ERROR)

### Useful imports

In [None]:
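import spark.implicits._                             // encoders and conversions such as `.toDS`
import org.apache.spark.sql.{functions => func, _}   // SQL functions, available as `func.xxx`
import org.apache.spark.sql.types._                  // schema types (IntegerType, StructType, ...)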

# Your first Spark program 

The following program is an example of a _Dataset_ program. The Dataset API is one of the languages for distributed data processing that the Spark framework provides. In this notebook we will not discuss the other APIs, such as RDDs or DataFrames, in detail.

In [None]:
List(1,2,3,4).toDS
    .map(_ + 1)
    .filter(i => i % 2 == 0)
    .reduce(_ + _)

In principle, this Dataset program does not differ significantly from the Scala collection program. Syntactically, the only difference is the call to `.toDS` (made available by `import spark.implicits._`), which turns the list into a `Dataset`: 

In [None]:
List(1,2,3,4).toDS

But, of course, there are several major differences.

# First difference: performance

Let's define a computationally heavy function; here the cost is simulated by making the thread sleep for `ms` milliseconds:

In [None]:
// Simulates an expensive computation: waits `ms` milliseconds, then increments `x`
def heavyComp(ms: Int = 1000)(x: Int): Int = {
  Thread.sleep(ms)
  x + 1
}

and a way to measure execution time:

In [None]:
// Evaluates `code` (passed by name), prints the elapsed wall-clock time and returns the result
def run[A](code: => A): A = {
    val start = System.currentTimeMillis()
    val res = code
    println(s"Took ${System.currentTimeMillis() - start} ms")
    res
}

The following Scala collection program takes about four seconds to execute, since `heavyComp` is applied sequentially to each of the four elements, one second each:

In [None]:
run(List(1,2,3,4).map(heavyComp()).reduce(_ + _))

However, the equivalent Dataset program takes considerably less time; how much less depends on the number of cores of your processor (plus some overhead that Spark adds for scheduling the job):

In [None]:
run(List(1,2,3,4).toDS.map(heavyComp()).reduce(_ + _))

The Dataset program runs faster because the Spark framework is designed to take advantage of the parallel and distributed architecture of your computing infrastructure. In our case, it simply exploits the multiple cores of your processor.
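To make the idea more concrete, here is a rough plain-Scala sketch, not Spark's actual implementation: the input is split into partitions, one "task" per partition runs concurrently, and the partial results are combined at the end. Standard-library `Future`s on the global execution context stand in for Spark's worker threads (an assumption of this sketch), and the helpers `slowInc` and `parallelSum` are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PartitionedSum {
  // Stand-in for heavyComp: sleep, then increment
  def slowInc(ms: Int)(x: Int): Int = { Thread.sleep(ms); x + 1 }

  // Split the input into (at most) `numPartitions` chunks, process each chunk
  // in its own Future (one "task" per partition), then reduce the partial sums
  def parallelSum(xs: List[Int], numPartitions: Int, ms: Int): Int = {
    val chunkSize = math.max(1, math.ceil(xs.size.toDouble / numPartitions).toInt)
    val tasks: List[Future[Int]] =
      xs.grouped(chunkSize).toList.map { chunk =>
        Future(chunk.map(slowInc(ms)).sum)
      }
    Await.result(Future.sequence(tasks), 1.minute).sum
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val res = parallelSum(List(1, 2, 3, 4), numPartitions = 4, ms = 1000)
    println(s"Result $res, took ${System.currentTimeMillis() - start} ms")
  }
}
```

On a machine with at least four cores, the four sleeps overlap, so the call takes roughly one second instead of four.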

Note, however, that using Spark just to parallelize your code on a single machine is overkill. If you are not in a truly distributed setting, you can get the same benefits more simply with the parallel collections of the Scala standard library (since Scala 2.13, they are distributed as the separate `scala-parallel-collections` module): 

In [None]:
run(List(1,2,3,4).par.map(heavyComp()).reduce(_ + _))

# Second difference: _laziness_

Let's compare this Scala collection transformation:

In [None]:
val result: List[Int] = List(1,2,3,4).map(_ + 1)

with the following Dataset one:

In [None]:
val program: Dataset[Int] = List(1,2,3,4).toDS.map(_ + 1)

This time, no data is transformed at all! Dataset programs are just that: _programs_. We won't find any results in an instance of `Dataset`, just a program, or _generator_, of a data set. A `Dataset` declares a number of _transformations_ that will eventually be enacted by specific _actions_. For instance, using `collect`:

In [None]:
val result: Array[Int] = program.collect

or `reduce`:

In [None]:
val result2: Int = program.reduce(_ + _)

Spark Datasets are said to be *lazy*, because we don't immediately obtain an answer. Rather, we _declaratively_ specify a number of transformations to be applied, and wait until a specific action interprets the transformation program to obtain the desired result. The same program may be interpreted differently: we may simply want the transformed elements, using `collect`, or we may want to compute some aggregate, using `reduce`. This difference between _transformations_ and _actions_ is reflected very precisely in the [Dataset API](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset).  
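The split between transformations and actions can be mimicked with the standard library alone. In the following sketch (the names `LazyProgram`, `program` and `evaluations` are illustrative, not part of any API), the "program" is a method that builds a lazy `Iterator` pipeline. Nothing is computed until an "action" such as `toList` or `reduce` consumes it, and each action re-runs the program from scratch, much as each Spark action launches a new job.

```scala
object LazyProgram {
  // Counts how many times the mapping function has actually been applied
  var evaluations = 0

  // A "program": describes the transformation, but calling it computes nothing yet
  def program(): Iterator[Int] =
    List(1, 2, 3, 4).iterator.map { x => evaluations += 1; x + 1 }

  def main(args: Array[String]): Unit = {
    evaluations = 0
    val pipeline = program()
    println(s"after building: $evaluations evaluations")    // 0 evaluations
    val collected = pipeline.toList                        // first "action", like collect
    println(s"collected $collected, $evaluations evaluations")
    val total = program().reduce(_ + _)                    // second "action", like reduce
    println(s"total $total, $evaluations evaluations")
  }
}
```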

### Inspecting the structure of a Spark program

A `Dataset` is a program that is compiled into a lower-level program before it can actually be executed. The Dataset compiler is called _Catalyst_ (the query optimizer of Spark SQL). We can inspect the execution plan generated for a particular dataset using `explain`:

In [None]:
List(1,2,3,4).toDS.map(_ + 1).explain

The execution plan of a dataset is in turn compiled into an `RDD`, a lower-level program abstraction whose structure (its _lineage_) can be printed with `toDebugString`:

In [None]:
List(1,2,3,4).toDS.map(_ + 1).rdd.toDebugString

### Spark and functional programming

This distinction also lies at the heart of functional programming. On the one hand, there are _programs_ written in a DSL; on the other, there are _interpreters_ that run these programs according to different semantics. This is also reflected in the Scala Collections API, particularly in the notion of _views_. For instance, the following transformation is similarly _lazy_: 

In [None]:
List(1,2,3,4).view.map(heavyComp())

The transformation is only executed when we force the view, e.g. with `toList`: 

In [None]:
List(1,2,3,4).view.map(heavyComp()).toList

`SeqView`s are *programs*, much in the same way as `Dataset` objects, whereas `toList` is an *action*, equivalent to the `Dataset` actions `collect` and `reduce`.
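The programs-versus-interpreters idea can be made fully explicit with a tiny DSL. The following is a hypothetical sketch (the types `Prog`, `Source`, `MapP`, `FilterP` and the interpreter functions are invented for illustration and are not Spark's or Scala's API): a program is plain data describing transformations, and different interpreters give it different meanings. One collects the elements, one reduces them, and one merely describes the program, a toy counterpart of `explain`.

```scala
object MiniDsl {
  // A program is just a description of transformations over Ints
  sealed trait Prog
  final case class Source(xs: List[Int]) extends Prog
  final case class MapP(p: Prog, f: Int => Int) extends Prog
  final case class FilterP(p: Prog, pred: Int => Boolean) extends Prog

  // Interpreter 1: run the transformations and collect the elements
  def collect(p: Prog): List[Int] = p match {
    case Source(xs)      => xs
    case MapP(q, f)      => collect(q).map(f)
    case FilterP(q, prd) => collect(q).filter(prd)
  }

  // Interpreter 2: run the program and reduce its elements
  def reduce(p: Prog)(op: (Int, Int) => Int): Int = collect(p).reduce(op)

  // Interpreter 3: describe the program without running it (a toy "explain")
  def explain(p: Prog): String = p match {
    case Source(xs)    => s"Source(${xs.size} elements)"
    case MapP(q, _)    => s"Map <- ${explain(q)}"
    case FilterP(q, _) => s"Filter <- ${explain(q)}"
  }

  def main(args: Array[String]): Unit = {
    val prog = FilterP(MapP(Source(List(1, 2, 3, 4)), _ + 1), _ % 2 == 0)
    println(explain(prog))       // Filter <- Map <- Source(4 elements)
    println(collect(prog))       // List(2, 4)
    println(reduce(prog)(_ + _)) // 6
  }
}
```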

Similarly, Scala _iterators_ are good examples of *laziness*. When we create an iterator from a collection, as in:

In [None]:
val it: Iterator[Int] = List(1,2,3,4).iterator

and specify a number of transformations:

In [None]:
val it2: Iterator[Int] = 
    List(1,2,3,4).iterator.map(_ + 1).filter(_ % 2 == 0)

the transformations are not applied immediately: we are just creating a new iterator that will generate the corresponding data when we ask for it:

In [None]:
it2.toList

Actually, there is a close relationship between iterators and Spark RDDs (the lower-level language into which `Dataset`s are ultimately translated). 
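As a rough illustration of that relationship: in Spark, an RDD computes the data of each of its partitions as an `Iterator`, and narrow transformations such as `map` wrap the parent's iterator. The sketch below mimics this with a hypothetical `ToyRDD` class (not Spark's API) whose partitions are functions producing iterators; `map` and `filter` only compose iterator transformations, and nothing runs until `collect` pulls every partition's iterator, one "task" per partition.

```scala
// A toy model of an RDD: a list of partitions, each able to (re)produce an Iterator
final class ToyRDD[A](val partitions: List[() => Iterator[A]]) {
  // Narrow transformations: wrap each partition's iterator, compute nothing yet
  def map[B](f: A => B): ToyRDD[B] =
    new ToyRDD(partitions.map(part => () => part().map(f)))

  def filter(p: A => Boolean): ToyRDD[A] =
    new ToyRDD(partitions.map(part => () => part().filter(p)))

  // Action: run every partition's iterator and concatenate the results
  def collect(): List[A] = partitions.flatMap(part => part().toList)
}

object ToyRDD {
  // Split a local collection into at most `n` contiguous partitions
  def parallelize[A](xs: List[A], n: Int): ToyRDD[A] = {
    val size = math.max(1, math.ceil(xs.size.toDouble / n).toInt)
    new ToyRDD(xs.grouped(size).toList.map(chunk => () => chunk.iterator))
  }
}

object ToyRDDDemo {
  def main(args: Array[String]): Unit = {
    val rdd = ToyRDD.parallelize(List(1, 2, 3, 4), 2).map(_ + 1).filter(_ % 2 == 0)
    println(rdd.collect()) // List(2, 4)
  }
}
```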

# Third difference: the execution framework

When an action is applied to a `Dataset` program, a _job_ is executed by Spark's distributed platform as a sequence of _stages_; in each stage, the work is performed concurrently by a number of _tasks_. 

The so-called [Spark UI](http://localhost:4040/) allows us to inspect the execution of all the jobs submitted through a given Spark session. For instance, the `collect` action below launches a job that can be inspected through the Spark UI: 

In [None]:
val ds: Dataset[Int] = List(1,2,3,4).toDS.map(heavyComp())

In [None]:
ds.collect

In the notebook, each progress bar corresponds to one stage of the job execution; the X/Y label in each bar indicates the number of tasks already completed (X) and the total number of tasks in that stage (Y). 

We can see which elements the task of each partition processes with `foreachPartition`:

In [None]:
ds.foreachPartition{ it: Iterator[Int] =>
    // `it` iterates over the elements of a single partition
    println(s"Task output: ${it.toList}")
}

The number of tasks scheduled for each stage equals the number of partitions of the dataset processed in that stage. When the dataset is first created from a Scala collection, the number of partitions is derived from the default parallelism, which in `local[*]` mode equals the number of cores of your machine (Spark may use fewer partitions for very small collections).

In [None]:
List(1,2,3,4).toDS.rdd.getNumPartitions
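To get a feel for how a local collection can be spread over a given number of partitions, here is a simplified sketch, loosely modelled on the integer arithmetic Spark uses to slice a local collection (the `Slicing.slice` helper is hypothetical, and Spark's real logic may differ in detail). Partition `i` of `n` receives the elements with indices in `[i * len / n, (i + 1) * len / n)`. When there are more partitions than elements, some partitions are simply empty, yet each one would still be processed by its own (trivial) task.

```scala
object Slicing {
  // Split xs into exactly n contiguous partitions:
  // partition i covers indices [i * len / n, (i + 1) * len / n)
  def slice[A](xs: Vector[A], n: Int): Vector[Vector[A]] = {
    require(n >= 1, "need at least one partition")
    val len = xs.length.toLong
    (0 until n).toVector.map { i =>
      val start = ((i * len) / n).toInt
      val end = (((i + 1) * len) / n).toInt
      xs.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // 12 elements over 4 partitions: 3 elements each
    println(slice((1 to 12).toVector, 4))
    // 4 elements over 8 partitions: half of the partitions are empty
    println(slice((1 to 4).toVector, 8))
  }
}
```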

The number of partitions can be set to a specific value using `repartition`: 

In [None]:
List(1,2,3,4,5,6,7,8,9,10,11,12).toDS
    .repartition(12)
    .map(heavyComp(2000))
    .collect
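A back-of-the-envelope way to predict how long the previous cell takes: if every task takes the same time and the machine runs one task per core at a time, a stage needs ceil(tasks / cores) successive "waves" of tasks. The helper `estimatedMillis` below is hypothetical; it assumes that each of the 12 partitions ends up with exactly one element (repartitioning does not strictly guarantee such an even spread) and it ignores Spark's scheduling and shuffle overhead.

```scala
object WaveEstimate {
  // Rough wall-clock estimate for a stage whose tasks all take the same time:
  // with `cores` task slots, the tasks run in ceil(tasks / cores) successive waves
  def estimatedMillis(tasks: Int, cores: Int, millisPerTask: Long): Long = {
    require(tasks >= 0 && cores >= 1)
    val waves = (tasks + cores - 1) / cores // integer ceiling division
    waves * millisPerTask
  }

  def main(args: Array[String]): Unit =
    for (cores <- List(4, 8, 12))
      println(s"$cores cores: ~${estimatedMillis(12, cores, 2000)} ms")
}
```

For instance, 12 tasks of 2 seconds each need 3 waves (about 6 s) on 4 cores, 2 waves (about 4 s) on 8 cores, and a single wave (about 2 s) with 12 or more cores.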

# Shuffling: narrow vs. wide transformations

Note that a new stage is created when the dataset is repartitioned, since repartitioning redistributes the data across partitions (a _shuffle_). More commonly, new stages are created when so-called _wide_ transformations, such as grouping by key, are interpreted; these also require a shuffle. _Narrow_ transformations are those which are not wide: `map`, `filter`, etc. For instance, this program executes in a single stage: 

In [None]:
List(1,2,3,4).toDS.map(_ + 1).collect

and the following one as well:

In [None]:
List(1,2,3,4).toDS.map(_ + 1).filter(_ % 2 == 0).collect

However, this one introduces a new stage:

In [None]:
// groupByKey is a wide transformation: grouping by key requires a shuffle
List(1,2,3,4).toDS.groupByKey(_ % 2).count().collect()

Why? Where is the difference? What does a stage mean? 

In [None]:
// TBD
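One way to see the difference, sketched in plain Scala rather than Spark (the helpers `narrowMap` and `shuffleByKey` are hypothetical): a narrow transformation like `map` can be applied to each partition independently, so consecutive narrow transformations can be pipelined within a single stage, one task per partition. A wide transformation like a grouping needs, for each output partition, records coming from *all* input partitions, so the data must first be redistributed (shuffled) by key. That data exchange is where Spark draws the boundary between one stage and the next.

```scala
object NarrowVsWide {
  type Partitions[A] = Vector[Vector[A]]

  // Narrow: each output partition depends only on the matching input partition
  def narrowMap[A, B](parts: Partitions[A])(f: A => B): Partitions[B] =
    parts.map(_.map(f))

  // Wide: every record is routed to the output partition chosen by hashing its key,
  // so each output partition may receive records from every input partition
  def shuffleByKey[A, K](parts: Partitions[A], numOut: Int)(key: A => K): Partitions[A] = {
    val routed = for {
      part <- parts
      x    <- part
    } yield (java.lang.Math.floorMod(key(x).hashCode, numOut), x)
    Vector.tabulate(numOut)(i => routed.collect { case (`i`, x) => x })
  }

  def main(args: Array[String]): Unit = {
    val input: Partitions[Int] = Vector(Vector(1, 2), Vector(3, 4))
    val mapped = narrowMap(input)(_ + 1)          // Vector(Vector(2, 3), Vector(4, 5))
    val shuffled = shuffleByKey(mapped, 2)(_ % 2) // evens together, odds together
    val grouped = shuffled.map(_.groupBy(_ % 2))  // grouping is now local to each partition
    println(mapped)
    println(shuffled)
    println(grouped)
  }
}
```

After the shuffle, output partition 0 holds `2` (from input partition 0) and `4` (from input partition 1), which is exactly the cross-partition dependency that a narrow transformation never has.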

# Dataset API: narrow transformations

# Dataset API: wide transformations