<style>
  h1, h2, h3, h4, h5, p, ul, li {
    color: #2C475C;
  }
  .output_html {
    color: skyblue;
  }
  hr { height: 2px; color: lightblue; }
</style>

# Spark 101

In [ ]:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._


### First create a dataset using a local file

We will:

*  load the file
*  map each line to its size (number of characters)
*  remove the duplicates

For that, we'll use the `sparkContext`, which

* lives in the driver program
* defines jobs (read inputs, transform, group, etc.)
* constructs the DAG
* schedules tasks on the cluster

In [ ]:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._

val dta:RDD[Int] = sparkContext.textFile("/Users/wtnull/Downloads/analysis/beijinghouse.csv")
                               .map(_.size)
                               .distinct

<console>:42: error: not found: value sparkContext
       val dta:RDD[Int] = sparkContext.textFile("/Users/wtnull/Downloads/analysis/beijinghouse.csv")
                          ^
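
The `not found: value sparkContext` error above is what the compiler reports when no `sparkContext` value is in scope, presumably because the cell was evaluated in an environment that does not predefine it. A minimal sketch of creating one by hand, assuming Spark is on the classpath and a local master is acceptable (the application name `spark-101` is an arbitrary choice, not something from this notebook):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: run everything locally, using all available cores
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("spark-101") // arbitrary name, shown in the UI

// Reuses an already running context if there is one, otherwise creates it
val sparkContext = SparkContext.getOrCreate(conf)
```

With such a value in scope, the `textFile` cell above should compile as written.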


**MappedRDD** is actually an instance of `RDD[Int]` that will contain the distinct sizes of the lines.

_Note_: NO computation has happened yet! → [see UI](http://localhost:4040/stages/)
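
To get a feel for this laziness without a cluster, here is a plain Scala analogy (no Spark involved, and the sample lines are made up): an `Iterator` also records transformations and only runs them when results are requested.

```scala
// Plain Scala analogy: an Iterator is lazy, much like an RDD transformation
var evaluated = 0 // counts how many lines were actually processed

val lines = Iterator("a", "bb", "ccc")
val sizes = lines.map { line => evaluated += 1; line.size }

// Defining the transformation did not process anything yet
val evaluatedBeforeAction = evaluated

// Asking for results (the "action") triggers the work, and only as much as needed
val firstTwo = sizes.take(2).toList
```

Only the two requested elements go through the mapping function; the third line is never touched.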

-----

### Now we can use the sizes for fancy operations like grouping by last digit

In [ ]:
val rdd1:RDD[(Int, Iterable[Int])] = dta.groupBy(_ % 10)

<console>:67: error: not found: value dta
       val rdd1:RDD[(Int, Iterable[Int])] = dta.groupBy(_ % 10)
                                            ^
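
To see what this grouping produces, the same function can be tried on a local collection. This is only a sketch on made-up sizes, not the content of `dta`:

```scala
// Made-up line sizes, standing in for the content of `dta`
val sampleSizes = List(12, 22, 35, 40)

// Same grouping function as above, applied to a local collection
val byLastDigit: Map[Int, List[Int]] = sampleSizes.groupBy(_ % 10)
```

Each key is a last digit and each value holds the sizes ending with that digit.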


### But we can also get rid of even sizes (... non-trivially ...), then _tuple_ the results with some other computations

In [ ]:
val rdd2 = dta.map(_ + 1)
              .filter(_ % 2 == 0)
              .map(x => (x%10, x*x))
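
The "non-trivial" part is the `+ 1` before the parity filter: keeping the even values of `size + 1` is the same as dropping the even sizes. A small local sketch on made-up sizes:

```scala
// Made-up sizes: two odd ones and one even one
val oddAndEvenSizes = List(11, 12, 13)

// Same chain as above, on a local collection
val tupled = oddAndEvenSizes
  .map(_ + 1)               // 12, 13, 14
  .filter(_ % 2 == 0)       // keeps 12 and 14, i.e. the original odd sizes 11 and 13
  .map(x => (x % 10, x * x)) // key by last digit, value is the square
```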

-----

### We can combine distributed datasets into single ones, by _joining_ them for instance.

In [ ]:
val joined = rdd1.join(rdd2)
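
`join` on pair RDDs is an inner join on the key. The following plain Scala sketch mimics that behaviour on made-up local data (the names `grouped`, `squares` and `joinedLocally` are illustrative only):

```scala
// Made-up stand-ins for rdd1 (key -> grouped sizes) and rdd2 (key -> square)
val grouped: Map[Int, Iterable[Int]] = Map(2 -> List(12, 22), 5 -> List(35))
val squares: List[(Int, Int)] = List((2, 144), (4, 196), (2, 1024))

// Inner join: keep only keys present on both sides, pairing up their values
val joinedLocally: List[(Int, (Iterable[Int], Int))] =
  for {
    (key, square) <- squares
    sizes <- grouped.get(key).toList
  } yield (key, (sizes, square))
```

Keys `4` and `5` each appear on one side only, so they are absent from the result, while key `2` yields one pair per matching value.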

_Note (again)_: still nothing done on the cluster up to here → [see ui](http://localhost:4040/stages/)

-----

#### Now we ask the cluster to do the whole thing: Action

In [ ]:
joined.take(10).toList.mkString("\n")

_Note (yeah)_: NOW there were some computations in the cluster → [see stages](http://localhost:4040/stages/) and [see tasks](http://localhost:4040/stages/stage/?id=3&attempt=0)

-----

## But what just happened?

### First Spark created a DAG based on the job definition

In [ ]:
joined.toDebugString

### Then it scheduled it to the executors in the cluster <small>only one when running in local mode</small>

We can check the <strong>Total tasks</strong> activity in the [UI](http://localhost:4040/executors/)

-------

## Now we will prepare a dataset and then use it several times

We'll read a file containing stock prices per day, so let's first create a type holding the relevant data.

In [ ]:
case class Quote(stock:String, date:String, price:Double) extends java.io.Serializable

The file will contain lines like:
``` 
ASTE,2011-12-06,33.93
ASTE,2012-03-14,36.84
```

Let's download the data first

In [ ]:
import sys.process._
"mkdir -p /tmp/data"!!

if (!new java.io.File("/tmp/data/closes.csv").exists)
  "wget https://s3-eu-west-1.amazonaws.com/spark-notebook-data/closes.csv -O /tmp/data/closes.csv"!!

In [ ]:
:sh du -h /tmp/data/closes.csv

In [ ]:
val closes:RDD[Quote] = sparkContext.textFile("/tmp/data/closes.csv")
                                   .map(_.split(",").toList)
                                   .map{ case s::d::p::Nil => Quote(s, d, p.toDouble)}
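
Note that the pattern above throws a `MatchError` on any line that does not split into exactly three fields, and `toDouble` throws on a non-numeric price. A more tolerant variant could look like the sketch below; `parseQuote` is a hypothetical helper, not part of the original notebook, and the malformed sample lines are made up.

```scala
import scala.util.Try

// Redefined here only so the block is self-contained
case class Quote(stock: String, date: String, price: Double)

// Hypothetical helper: returns None instead of throwing on malformed lines
def parseQuote(line: String): Option[Quote] =
  line.split(",").toList match {
    case s :: d :: p :: Nil => Try(Quote(s, d, p.toDouble)).toOption
    case _                  => None
  }

// One valid line, one with the wrong shape, one with a bad price
val parsed = List("ASTE,2011-12-06,33.93", "not a quote", "ASTE,2012-03-14,n/a")
  .flatMap(line => parseQuote(line))
```

On an RDD the same helper could be plugged in with `flatMap(line => parseQuote(line))` in place of the two `map` calls, which silently skips the bad lines.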

Each quote has a date, so we can key the stock prices by day

In [ ]:
val byDate = closes.keyBy(_.date)

Now we can compute the stock with the minimum price per date

In [ ]:
// `def` forces Spark to recompute on every call; otherwise it's smart enough to reuse previous RDDs
def minByDate = byDate.combineByKey[(String, Double)](
  (x:Quote) => (x.stock, x.price), 
  (d:(String, Double), l:Quote) => if (d._2 < l.price) d else (l.stock, l.price),
  (d1:(String, Double), d2:(String, Double)) => if (d1._2 < d2._2) d1 else d2
)
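
The three functions passed to `combineByKey` play distinct roles: the first creates a combiner from the first value seen for a key, the second merges a further value into it within a partition, and the third merges combiners coming from different partitions. The sketch below simulates this with plain Scala collections. The helpers `combinePartition` and `mergePartitions`, the two "partitions" and the tickers `AAA`, `BBB`, `CCC` are all made up for illustration; this is not how Spark implements it internally.

```scala
// Redefined here only so the block is self-contained
case class Quote(stock: String, date: String, price: Double)
type Min = (String, Double)

// The same three functions as in the cell above
val createCombiner: Quote => Min = q => (q.stock, q.price)
val mergeValue: (Min, Quote) => Min = (m, q) => if (m._2 < q.price) m else (q.stock, q.price)
val mergeCombiners: (Min, Min) => Min = (a, b) => if (a._2 < b._2) a else b

// Hypothetical helper: combine one "partition" of quotes into per-date minima
def combinePartition(part: Seq[Quote]): Map[String, Min] =
  part.foldLeft(Map.empty[String, Min]) { (acc, q) =>
    val updated = acc.get(q.date) match {
      case Some(m) => mergeValue(m, q)   // key already seen in this partition
      case None    => createCombiner(q)  // first value for this key
    }
    acc.updated(q.date, updated)
  }

// Hypothetical helper: merge the per-partition results
def mergePartitions(a: Map[String, Min], b: Map[String, Min]): Map[String, Min] =
  b.foldLeft(a) { case (acc, (date, m)) =>
    acc.updated(date, acc.get(date).map(mergeCombiners(_, m)).getOrElse(m))
  }

// Made-up data split into two "partitions"
val p1 = Seq(Quote("AAA", "2012-03-14", 36.84), Quote("BBB", "2012-03-14", 12.50))
val p2 = Seq(Quote("CCC", "2012-03-14", 20.00), Quote("AAA", "2011-12-06", 33.93))

val minima = mergePartitions(combinePartition(p1), combinePartition(p2))
```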

In [ ]:
<pre>{minByDate.take(2).toList.mkString("\n")}</pre>

It took ~2 seconds (in `local[8]` mode with 24 GB of RAM)

In [ ]:
<pre>{minByDate.take(2).toList.mkString("\n")}</pre>

Once again... 2 seconds!

#### Solution: caching!

In [ ]:
val maxByDate2 = byDate.combineByKey[(String, Double)](
  (x:Quote) => (x.stock, x.price), 
  (d:(String, Double), l:Quote) => if (d._2 > l.price) d else (l.stock, l.price),
  (d1:(String, Double), d2:(String, Double)) => if (d1._2 > d2._2) d1 else d2
)

maxByDate2.cache()

Ask for some data

In [ ]:
<pre>{maxByDate2.take(2).toList.mkString("\n")}</pre>

**Go to [UI](http://localhost:4040/storage/)**

In [ ]:
<pre>{maxByDate2.take(2).toList.mkString("\n")}</pre>

**BLAZING FAST** => Reuses the cache!
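
For RDDs, `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`, and cached data can be released with `unpersist()`. A minimal sketch, assuming Spark is on the classpath; the local context `sc`, the application name and the dataset `cachedSquares` are made up to keep the block self-contained:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: a local context named `sc`, created only for this sketch
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("cache-sketch"))

// Made-up data standing in for the per-date results
val cachedSquares = sc.parallelize(1 to 1000).map(x => x * x)

// Equivalent to calling cachedSquares.cache()
cachedSquares.persist(StorageLevel.MEMORY_ONLY)

val firstCount = cachedSquares.count()  // first action computes and stores the partitions
val secondCount = cachedSquares.count() // later actions can read them back from the cache

// Release the memory once the dataset is no longer needed
cachedSquares.unpersist()
```

While the dataset is persisted it shows up in the storage tab of the UI, and it disappears again after `unpersist()`.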