[Spark programming guide](http://spark.apache.org/docs/latest/programming-guide.html#accumulators)

## Setup

In [None]:
docker-machine start
# Bash: point the Docker client at the Docker Machine VM
eval $(docker-machine env)

In [None]:
REM Windows cmd: apply the settings printed by docker-machine env
@FOR /f "tokens=*" %i IN ('docker-machine env') DO @%i

In [None]:
docker run -it bigdatauniversity/spark bash

In [None]:
/etc/bootstrap.sh

In [None]:
spark-shell

## Restart

In [None]:
# If you exit the Docker container, you can restart it and reattach to it later
# (replace bdu_spark2 with your container's name or ID):
docker start bdu_spark2
docker attach bdu_spark2

# Run a new command in a running container
docker exec -it bdu_spark2 <command>

## Use Zeppelin

In [None]:
https://hub.docker.com/r/bigdatauniversity/spark2/

## Initializing Spark - Scala

In [None]:
`SparkConf` holds the application's configuration (such as the application name and the master URL) and is used to create the `SparkContext`.
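In `spark-shell` the context already exists as `sc`; a standalone application creates it from a `SparkConf`. A minimal sketch, assuming Spark 2.x or later on the classpath; the object name `InitExample` and the `local[*]` master (run in-process on all cores) are illustrative choices, not requirements.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InitExample {
  // Builds a SparkContext that runs locally, using all available cores.
  def createContext(appName: String): SparkContext = {
    val conf = new SparkConf()
      .setAppName(appName)   // name shown in the Spark UI
      .setMaster("local[*]") // run in-process; use a cluster URL in production
    new SparkContext(conf)
  }

  def main(args: Array[String]): Unit = {
    val sc = createContext("InitExample")
    try {
      println(s"Running Spark ${sc.version} as '${sc.appName}'")
    } finally {
      sc.stop() // release resources; only one SparkContext may be active per JVM
    }
  }
}
```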

## Passing functions to Spark

- Anonymous function syntax:
    (x: Int) => x + 1
- Static methods
    object MyFunctions {
        def func1 (s: String): String = (...)
    }
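- Passing by reference: inside a class, `rdd.map(x => field + x)` captures `this` and ships the whole object, so copy the field into a local variable first
    val field_ = this.field    // where the class defines: val field = "Hello"
    rdd.map(x => field_ + x)   // the closure captures only field_, not `this`
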
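The three styles can be exercised together in a small local job. A sketch assuming Spark 2.x or later; the body of `func1` is a hypothetical placeholder (the notes elide it), and `PassingFunctions` and `doStuff` are illustrative names that follow the pattern in the Spark programming guide.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Static method: functions in a singleton object are called statically,
// so no enclosing instance has to be shipped to the executors.
object MyFunctions {
  def func1(s: String): String = s.toUpperCase // hypothetical placeholder logic
}

class MyClass {
  val field = "Hello"

  // Copy the field into a local val so the closure captures only a String,
  // not the whole (non-serializable) MyClass instance.
  def doStuff(rdd: RDD[String]): RDD[String] = {
    val field_ = this.field
    rdd.map(x => field_ + x)
  }
}

object PassingFunctions {
  def run(sc: SparkContext): (Array[Int], Array[String], Array[String]) = {
    val nums = sc.parallelize(Seq(1, 2, 3))
    val words = sc.parallelize(Seq("a", "b"))
    val viaAnonymous = nums.map((x: Int) => x + 1).collect()  // anonymous function
    val viaStatic = words.map(MyFunctions.func1).collect()    // static method
    val viaLocalCopy = new MyClass().doStuff(words).collect() // field copied to a local val
    (viaAnonymous, viaStatic, viaLocalCopy)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PassingFunctions").setMaster("local[*]"))
    try {
      val (anon, static, local) = run(sc)
      println(anon.mkString(", "))   // 2, 3, 4
      println(static.mkString(", ")) // A, B
      println(local.mkString(", "))  // Helloa, Hellob
    } finally sc.stop()
  }
}
```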

## Create an RDD

In [None]:
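RDD = Resilient Distributed Dataset: a fault-tolerant collection of elements, partitioned across the cluster, that can be operated on in parallel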

In [None]:
val data = 1 to 1000

In [None]:
val distData = sc.parallelize(data)
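`parallelize` also accepts an optional number of partitions (slices), and a `reduce` action combines the distributed data into a single value. A sketch assuming Spark 2.x or later; `CreateRdd` and `sumRange` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CreateRdd {
  // Distributes 1..n over `numSlices` partitions and sums it in parallel.
  def sumRange(sc: SparkContext, n: Int, numSlices: Int): (Int, Int) = {
    val distData = sc.parallelize(1 to n, numSlices) // second argument: number of partitions
    val total = distData.reduce((a, b) => a + b)     // action: combines the partial sums of all partitions
    (distData.getNumPartitions, total)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CreateRdd").setMaster("local[*]"))
    try {
      val (partitions, total) = sumRange(sc, 1000, 4)
      println(s"$partitions partitions, sum = $total") // 4 partitions, sum = 500500
    } finally sc.stop()
  }
}
```

To build an RDD from a file instead, `sc.textFile(path)` creates one element per line.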

## Cache

Cache an RDD when you run several actions on the same transformed data. Without caching, Spark recomputes the transformations for every action.
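A minimal sketch of that pattern, assuming Spark 2.x or later: two actions run on the same filtered RDD, and `cache()` lets the second one reuse the partitions computed by the first. `CacheExample` and `evenStats` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheExample {
  // Counts and sums the even numbers in 1..1000 with two separate actions.
  def evenStats(sc: SparkContext): (Long, Int) = {
    val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).cache()
    val count = evens.count()       // first action: computes the filter and caches the result
    val total = evens.reduce(_ + _) // second action: reads the cached partitions
    evens.unpersist()               // free the memory once the data is no longer needed
    (count, total)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CacheExample").setMaster("local[*]"))
    try println(evenStats(sc)) // (500,250500)
    finally sc.stop()
  }
}
```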

## Transformations

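- map(func)
   - Return a new dataset formed by passing each element of the source through func
- filter(func)
   - Return a new dataset formed by selecting those elements of the source on which func returns true
- flatMap(func)
   - Similar to map, but each input element can be mapped to 0 or more output elements, so func should return a Seq rather than a single item
- join(otherDataset)
   - On datasets of type (K, V) and (K, W), return a dataset of (K, (V, W)) pairs with all pairs of elements for each key
- reduceByKey(func)
   - On a dataset of (K, V) pairs, return a dataset of (K, V) pairs where the values for each key are aggregated with func, which must be of type (V, V) => V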
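A word-count style sketch that chains the transformations listed above, assuming Spark 2.x or later. The input lines and the `labels` pair RDD are made-up sample data, and `TransformationsExample` is a hypothetical name. Transformations are lazy, so nothing runs until the `collectAsMap()` actions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransformationsExample {
  // Returns word counts and the counts joined with a label per word.
  def run(sc: SparkContext): (Map[String, Int], Map[String, (Int, String)]) = {
    val lines = sc.parallelize(Seq("a b a", "c b a"))
    val counts = lines
      .flatMap(_.split(" "))  // each line becomes 0 or more words
      .filter(_.nonEmpty)     // drop empty tokens
      .map(word => (word, 1)) // pair each word with a count of 1
      .reduceByKey(_ + _)     // add up the counts for each word
    val labels = sc.parallelize(Seq(("a", "first"), ("b", "second")))
    val joined = counts.join(labels) // (word, (count, label)) for words present in both RDDs
    // The actions below trigger the actual computation.
    (counts.collectAsMap().toMap, joined.collectAsMap().toMap)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Transformations").setMaster("local[*]"))
    try {
      val (counts, joined) = run(sc)
      println(counts)
      println(joined)
    } finally sc.stop()
  }
}
```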


## Actions

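- collect()
   - Return all elements of the dataset to the driver as an array (only safe for small results)
- count()
   - Return the number of elements in the dataset
- first()
   - Return the first element of the dataset (similar to take(1))
- take(n)
   - Return an array with the first n elements of the dataset
- foreach(func)
   - Run func on each element of the dataset, typically for side effects such as updating an accumulator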
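Each action returns a result to the driver (or, for `foreach`, runs for its side effects). A sketch assuming Spark 2.x or later, on made-up sample data; `ActionsExample` is a hypothetical name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ActionsExample {
  def run(sc: SparkContext): (Array[Int], Long, Int, Array[Int]) = {
    val rdd = sc.parallelize(Seq(5, 3, 8, 1))
    val all = rdd.collect()    // every element, copied to the driver
    val n = rdd.count()        // number of elements
    val head = rdd.first()     // first element
    val firstTwo = rdd.take(2) // first n elements
    // foreach runs on the executors: in local mode the output appears in this console,
    // on a cluster it goes to the executor logs
    rdd.foreach(x => println(x))
    (all, n, head, firstTwo)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ActionsExample").setMaster("local[*]"))
    try {
      val (all, n, head, firstTwo) = run(sc)
      println(s"${all.mkString(",")} | $n | $head | ${firstTwo.mkString(",")}") // 5,3,8,1 | 4 | 5 | 5,3
    } finally sc.stop()
  }
}
```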

## Persistence

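- persist()
   - Mark the dataset to be kept (in memory, on disk, or both) once an action first computes it; takes an optional StorageLevel
- cache()
   - Shorthand for persist() with the default storage level, MEMORY_ONLY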
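The difference between the two calls is only the storage level. A sketch assuming Spark 2.x or later; `PersistenceExample` and `storageLevels` are hypothetical names, and `MEMORY_AND_DISK` is just one of the available `StorageLevel` constants.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistenceExample {
  // Returns the storage levels assigned by cache() and by persist(level).
  def storageLevels(sc: SparkContext): (StorageLevel, StorageLevel) = {
    val doubled = sc.parallelize(1 to 100).map(_ * 2).cache() // same as persist(StorageLevel.MEMORY_ONLY)
    val tripled = sc.parallelize(1 to 100).map(_ * 3)
      .persist(StorageLevel.MEMORY_AND_DISK) // spill partitions that don't fit in memory to disk
    doubled.count()                          // actions materialize the persisted data
    tripled.count()
    val levels = (doubled.getStorageLevel, tripled.getStorageLevel)
    doubled.unpersist()
    tripled.unpersist()
    levels
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Persistence").setMaster("local[*]"))
    try println(storageLevels(sc))
    finally sc.stop()
  }
}
```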

## Shared variables

### Broadcast variables

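Use a broadcast variable when a large read-only value (for example, a lookup table) is needed on every worker node. Spark sends it to each node once and caches it there, instead of shipping a copy with every task. Tasks read it through `.value`.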

In [None]:
val broadcastVar = sc.broadcast(Array(1,2,3))
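A sketch of reading a broadcast lookup table inside a transformation, assuming Spark 2.x or later; the table contents and the name `BroadcastExample` are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def lookupNames(sc: SparkContext): Array[String] = {
    // Sent to each executor once, instead of with every task
    val names = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))
    val result = sc.parallelize(Seq(3, 1, 2))
      .map(k => names.value.getOrElse(k, "unknown")) // read-only access on the executors
      .collect()
    names.unpersist() // drop the cached copies on the executors
    result
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Broadcast").setMaster("local[*]"))
    try println(lookupNames(sc).mkString(", ")) // three, one, two
    finally sc.stop()
  }
}
```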

### Accumulators

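Accumulators are variables that tasks can only add to, which makes them efficient for **counters** and **sums** computed in parallel. Only the driver program can read an accumulator's value.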
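A sketch using the `longAccumulator` API, which assumes Spark 2.0 or later (Spark 1.x used `sc.accumulator(0)` instead). The input strings and the name `AccumulatorExample` are made up. Updates happen inside `foreach`, an action: Spark applies each task's accumulator update only once for actions, whereas updates inside transformations may be applied again if a task is re-executed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  // Sums the numeric records and counts the malformed ones in a single pass.
  def sumAndCountBad(sc: SparkContext, records: Seq[String]): (Long, Long) = {
    val total = sc.longAccumulator("total")
    val bad = sc.longAccumulator("badRecords")
    sc.parallelize(records).foreach { s =>
      if (s.nonEmpty && s.forall(_.isDigit)) total.add(s.toLong) // tasks can only add
      else bad.add(1)
    }
    val totalValue: Long = total.value // read on the driver after the action finishes
    val badValue: Long = bad.value
    (totalValue, badValue)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Accumulators").setMaster("local[*]"))
    try println(sumAndCountBad(sc, Seq("1", "2", "x", "4", "y"))) // (7,2)
    finally sc.stop()
  }
}
```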

# Scheduler

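Within an application, jobs run in FIFO order by default. The FAIR scheduler assigns tasks to jobs in a round-robin fashion, so short jobs are not stuck behind long ones; this suits multi-user settings.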

In [None]:
conf.set("spark.scheduler.mode", "FAIR") // on the SparkConf, before creating the SparkContext; default is FIFO (first in, first out)
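A sketch of enabling fair scheduling in an application, assuming Spark 2.x or later. Jobs submitted from a thread go to the pool named by the `spark.scheduler.pool` local property; `pool1` and `FairSchedulerExample` are hypothetical names, and pools not defined in an allocation file get default settings.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FairSchedulerExample {
  def createContext(): SparkContext = {
    val conf = new SparkConf()
      .setAppName("FairSchedulerExample")
      .setMaster("local[*]")
      .set("spark.scheduler.mode", "FAIR") // default is FIFO
    new SparkContext(conf)
  }

  def main(args: Array[String]): Unit = {
    val sc = createContext()
    try {
      // Jobs submitted from this thread now go to the "pool1" pool
      sc.setLocalProperty("spark.scheduler.pool", "pool1")
      println(sc.parallelize(1 to 10).count()) // 10
      // Passing null returns this thread to the default pool
      sc.setLocalProperty("spark.scheduler.pool", null)
    } finally sc.stop()
  }
}
```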

### SQL Example

#### Infer schema using reflection

In [None]:
case class Person(name: String, age: Int)

In [None]:
import sqlContext.implicits._ // provides .toDF() for RDDs of case classes (Spark 1.3+)
val people = sc.textFile("examples/people.txt")
.map(_.split(","))
.map(p => Person(p(0), p(1).trim.toInt))
.toDF()

In [None]:
people.registerTempTable("people")

In [None]:
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

In [None]:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
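In Spark 2.x and later, `SparkSession` (available as `spark` in `spark-shell`) replaces `sqlContext` as the entry point, and `createOrReplaceTempView` replaces the deprecated `registerTempTable`. A sketch of the same query, assuming Spark 2.x or later; the in-memory rows stand in for `people.txt` and are made-up sample data, and `SqlReflectionExample` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

object SqlReflectionExample {
  def teenagerNames(spark: SparkSession): Array[String] = {
    import spark.implicits._ // encoders and .toDF() for case classes
    val people = Seq(Person("Alice", 15), Person("Bob", 30), Person("Carol", 19)).toDF()
    people.createOrReplaceTempView("people")
    spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
      .map(row => "Name: " + row.getString(0)) // uses the String encoder from spark.implicits
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SqlReflectionExample").master("local[*]").getOrCreate()
    try teenagerNames(spark).foreach(println)
    finally spark.stop()
  }
}
```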

#### Programmatic interface

In [None]:
val people = sc.textFile(..)

In [None]:
val schemaString = "name age"

In [None]:
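import org.apache.spark.sql.types.{StructType, StructField, StringType}
// One nullable string column for each name in schemaString
val schema = StructType( schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, true)))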

In [None]:
import org.apache.spark.sql.Row
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// createDataFrame replaces the older applySchema (Spark 1.3+)
val peopleSchemaRDD = sqlContext.createDataFrame(rowRDD, schema)

In [None]:
peopleSchemaRDD.registerTempTable("people")

In [None]:
val results = sqlContext.sql("SELECT name FROM people")
results.collect().foreach(t => println("Name: " + t(0))) // collect first: works on Spark 1.x and 2.x without an encoder

## Troubleshoot "Task not serializable"

In [None]:
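// Spark serializes `helper` and ships it to the executors with each task,
// so MyHelper (and everything it references) must be Serializable.
class MyHelper extends Serializable {
    def doSomething(input: String): String = inner.doStuff(input) // `inner` (defined elsewhere) must be serializable too
}
val helper = new MyHelper()
val output = input1.map(helper.doSomething(_))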
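Spark ships task closures to the executors using Java serialization, so the root cause can be reproduced without a cluster by serializing the helper with `ObjectOutputStream`. A sketch in plain Scala (no Spark needed); `PlainHelper`, `SerializableHelper` and `isJavaSerializable` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Missing `extends Serializable`: Java serialization rejects instances of this class.
class PlainHelper {
  def doSomething(input: String): String = input.trim
}

// The same helper, marked Serializable.
class SerializableHelper extends Serializable {
  def doSomething(input: String): String = input.trim
}

object SerializationCheck {
  // True if `obj` can be written with Java serialization, the mechanism Spark
  // uses for closures (and everything they capture, such as `helper`).
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new PlainHelper))        // false
    println(isJavaSerializable(new SerializableHelper)) // true
  }
}
```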

# Questions

In [None]:
What does `_` mean in the following expression?
input.map(_.split(",")).count()
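`_` is Scala's placeholder syntax: `_.split(",")` is shorthand for the anonymous function `line => line.split(",")`, so `input.map(_.split(","))` splits every line into fields and `count()` then counts the lines. A plain-Scala sketch on a local collection (no Spark needed); the sample strings and the name `PlaceholderSyntax` are made up.

```scala
object PlaceholderSyntax {
  val lines = Seq("a,b", "c,d,e")

  // `_.split(",")` and `line => line.split(",")` are the same function.
  val withPlaceholder: Seq[Array[String]] = lines.map(_.split(","))
  val explicitLambda: Seq[Array[String]] = lines.map(line => line.split(","))

  // Each `_` stands for the next parameter: `_ + _` means `(a, b) => a + b`.
  val total: Int = Seq(1, 2, 3).reduce(_ + _)

  def main(args: Array[String]): Unit = {
    println(withPlaceholder.map(_.length)) // List(2, 3)
    println(total)                         // 6
  }
}
```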