 
### Transformations

def: *Operations on an RDD that return a new RDD, such as map and filter. They are evaluated lazily: nothing is computed until an action is called.*

In [1]:
%spark
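val in = sc.textFile("/home/bane/projects/data/log.txt")
// keep only the lines that contain the given marker
val err = in.filter(l => l.contains("err"))
val war = in.filter(l => l.contains("warning"))
// union keeps every line from both RDDs, duplicates included
val bads = err.union(war)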

 #### **Map**

In [3]:
%spark
val in = sc.parallelize(List(1,2,3,4))
val res = in.map(x => x * x)
println(s"Result is: ${res.collect().mkString(",")}")

 #### **FlatMap**

In [5]:
%spark
val in = sc.parallelize(List("hello world", "hi"))
val words = in.flatMap(l => l.split(" "))
words.first
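
To make the difference between the two transformations concrete, the sketch below runs `map` and `flatMap` over the same input. It assumes `sc` is the SparkContext provided by the notebook; the names `lines`, `perLine` and `tokens` are made up for this illustration.

```scala
// Assumes `sc` is the SparkContext provided by the notebook.
val lines = sc.parallelize(List("hello world", "hi"))

// map: exactly one output element per input element (here, one array per line)
val perLine = lines.map(l => l.split(" "))      // RDD[Array[String]]

// flatMap: each input may yield zero or more outputs, flattened into a single RDD
val tokens = lines.flatMap(l => l.split(" "))   // RDD[String]

println(perLine.count())                  // 2
println(tokens.count())                   // 3
println(tokens.collect().mkString(","))   // hello,world,hi
```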

 #### **Pseudo set ops**

#### *Distinct (requires a shuffle)*

rdd.distinct

#### *Union (no shuffle, keeps duplicates)*

rdd.union(rdd2)

#### *Intersection (requires a shuffle, removes duplicates)*

rdd.intersection(rdd2)

#### *Subtract (requires a shuffle)*

rdd.subtract(rdd2)
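
A small worked example of the four operations, as a sketch on two hand-made RDDs. It assumes `sc` is the notebook's SparkContext; `a`, `b` and the helper `show` are hypothetical names, and results are sorted on the driver only so that the printed output is stable.

```scala
import org.apache.spark.rdd.RDD

// Assumes `sc` is the SparkContext provided by the notebook.
val a = sc.parallelize(List(1, 2, 2, 3))
val b = sc.parallelize(List(3, 4))

// Hypothetical helper: collect, sort on the driver, and format for printing.
def show(r: RDD[Int]): String = r.collect().sorted.mkString(",")

println(show(a.distinct()))        // 1,2,3
println(show(a.union(b)))          // 1,2,2,3,3,4  (duplicates are kept)
println(show(a.intersection(b)))   // 3
println(show(a.subtract(b)))       // 1,2,2        (elements of a that are not in b)
```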

### Actions
def: *Operations that trigger the computation and return a result to the driver, e.g. count and first*

In [16]:
%spark
println(s"Input had ${bads.count} concerning lines")
println("Here are 2 examples:")
bads.take(2).foreach(println)

#### *Reduce*

 
val sum = rdd.reduce((x, y) => x + y)

#### *Aggregate*

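// aggregate is curried: the zero value goes in the first parameter list,
// the per-partition function and the merge function in the second
val res = in.aggregate((0, 0))(
                       (acc, value) => (acc._1 + value, acc._2 + 1),
                       (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))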
val avg = res._1 / res._2.toDouble
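
The same averaging pattern as a self-contained sketch on a small integer RDD. It assumes `sc` is the notebook's SparkContext; `nums`, `total` and `count` are names chosen for this illustration.

```scala
// Assumes `sc` is the SparkContext provided by the notebook.
val nums = sc.parallelize(List(1, 2, 3, 4))

// The accumulator is (running sum, running count); (0, 0) is the zero value.
val (total, count) = nums.aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),   // fold one element into a partition's accumulator
  (x, y) => (x._1 + y._1, x._2 + y._2))           // merge the accumulators of two partitions

println(total.toDouble / count)   // 2.5
```

Unlike `reduce`, the result type here (a pair) differs from the element type (an `Int`), which is what makes a single-pass average possible.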

#### *Collect*
* returns the entire contents of the RDD to the driver
* the result has to fit in the memory of one machine

#### *Take(n)*
* returns n elements of the RDD
* attempts to minimize the number of partitions it accesses, so the elements may not come back in the order you expect

#### *top*
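* returns the top n elements of the RDD
* elements are ranked by the data's default ordering (largest first), or by a custom ordering if one is supplied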
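
A quick sketch contrasting `take` and `top`, with `takeOrdered` added for comparison (it is not covered in the notes above). It assumes `sc` is the notebook's SparkContext; the variable names are illustrative only.

```scala
// Assumes `sc` is the SparkContext provided by the notebook.
val nums = sc.parallelize(List(5, 1, 4, 2, 3))

val firstTwo    = nums.take(2)          // two elements from whichever partitions are scanned first
val largestTwo  = nums.top(2)           // largest first, using the implicit Ordering[Int]
val smallestTwo = nums.takeOrdered(2)   // smallest first

println(firstTwo.length)                // 2
println(largestTwo.mkString(","))       // 5,4
println(smallestTwo.mkString(","))      // 1,2
```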

#### *takeSample(withReplacement, num, seed)*
* random data sampling

#### *foreach*
* does not return anything to the driver
* runs a computation on every element, e.g. inserting records into a database or posting JSON to a web server

#### *countByValue*
* returns a map of each unique val to its count
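
For instance, counting how often each value occurs in a small hand-made RDD might look like the sketch below. It assumes `sc` is the notebook's SparkContext; `levels` and `counts` are hypothetical names. The map is returned to the driver, so it should be used only when the number of distinct values is small.

```scala
// Assumes `sc` is the SparkContext provided by the notebook.
val levels = sc.parallelize(List("err", "warning", "err", "info"))

val counts = levels.countByValue()   // scala.collection.Map[String, Long], held on the driver

// Sort by key only to make the printed output stable.
counts.toSeq.sortBy(_._1).foreach { case (level, n) => println(s"$level: $n") }
// err: 2
// info: 1
// warning: 1
```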

### Passing functions to Spark

In [28]:
%spark
import org.apache.spark.rdd.RDD

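class SearchFunction(val query: String) {
    def isMatch(s: String): Boolean = {
        s.contains(query)
    }
    // Problem: "isMatch" means "this.isMatch", so the whole object is shipped with the closure
    def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
        rdd.map(isMatch)
    }
    // Problem: "query" means "this.query", so the whole object is shipped here as well
    def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
        rdd.map(x => x.split(query))
    }
    // Safe: copy the field into a local variable so that only the String is captured
    def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
        val query_ = this.query
        rdd.map(x => x.split(query_))
    }
}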

### Converting between RDD types

*Mean, Variance*

In [31]:
%spark
import org.apache.spark.SparkContext._
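
As a sketch of what the conversion buys: once an RDD holds `Double` values, the numeric actions `mean()` and `variance()` become available on it through an implicit conversion. The example assumes `sc` is the notebook's SparkContext; `ints` and `doubles` are illustrative names. The explicit import is, to my knowledge, only needed on older Spark releases (before 1.3) and is harmless on newer ones.

```scala
import org.apache.spark.SparkContext._   // brings the implicit conversions into scope on older Spark versions

// Assumes `sc` is the SparkContext provided by the notebook.
val ints = sc.parallelize(List(1, 2, 3, 4))
val doubles = ints.map(_.toDouble)       // RDD[Double]

val mean = doubles.mean()                // about 2.5 for this input
val variance = doubles.variance()        // population variance (divides by n), about 1.25 here
println(s"mean=$mean variance=$variance")
```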

### Persistence (Caching)

#### *Avoid recomputing*
val res = in.map(x => x * x)
println(res.count)                   // computes res
println(res.collect.mkString(","))   // computes res again, since nothing was persisted

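* the nodes that compute an RDD store their partitions of it
* if a node holding persisted data fails, the lost partitions are recomputed when needed
* partitions can be replicated on several nodes so that a node failure does not slow things down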

import org.apache.spark.storage.StorageLevel
val res = in.map(x => x * x)
res.persist(StorageLevel.DISK_ONLY)
println(res.count)
println(res.collect.mkString(","))

#### *Persistence levels*

| Level | Space used | CPU time | In memory | On disk | Comments |
|---|---|---|---|---|---|
| MEMORY_ONLY | High | Low | Y | N | |
| MEMORY_ONLY_SER | Low | High | Y | N | |
| MEMORY_AND_DISK | High | Medium | Some | Some | Spills to disk on memory overflow |
| MEMORY_AND_DISK_SER | Low | High | Some | Some | Stores serialized representation in memory |
| DISK_ONLY | Low | High | N | Y | |
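
A minimal sketch of picking a level and releasing it again, assuming `sc` is the notebook's SparkContext and using the illustrative name `squares`. For RDDs, `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`.

```scala
import org.apache.spark.storage.StorageLevel

// Assumes `sc` is the SparkContext provided by the notebook.
val squares = sc.parallelize(List(1, 2, 3, 4)).map(x => x * x)

squares.persist(StorageLevel.MEMORY_AND_DISK)   // only marks the RDD; nothing is stored yet
println(squares.count())                        // 4 (first action: computes and stores the partitions)
println(squares.collect().mkString(","))        // 1,4,9,16 (can reuse the stored partitions)
println(squares.getStorageLevel == StorageLevel.MEMORY_AND_DISK)   // true

squares.unpersist()                             // release the stored partitions when done
```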