#### Cheat Sheet Scala-Spark environment on Jupyter-lab


In [25]:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext


In [None]:
// Creare un Dataset, fornita una sequenza in ingresso

val data = Seq((1,2,3), (4,5,6), (6,7,8), (9,19,10))
val ds   = spark.createDataset(data)

ds.show()

In [11]:
// Generazione di un Resilient Distributed Dataset
val rdd      = sc.parallelize(List("ABC","BCD","DEF"))

// filter: new RDD by selecting those data elements on which func returns true
val filtered = rdd.filter(_.contains("C"))
// Finché non costretto da un'operazione Spark mantiene tutto in cache
// collect(): get all the data elements in an RDD as an array
filtered.collect()


rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:29
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:32
res5: Array[String] = Array(ABC, BCD)


In [23]:
// Generazione di un Resilient Distributed Dataset
val rdd      = sc.parallelize(List(1,2,3,4,5))
// map: return new RDD by applying func on each data element
val times2   = rdd.map(_*2)
times2.collect()


rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
times2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at map at <console>:31
res17: Array[Int] = Array(2, 4, 6, 8, 10)


In [17]:
// Generazione di un Resilient Distributed Dataset
val rdd      = sc.parallelize(List("Spark is awesome”,”It is fun"))
// flatMap: Similar to map but func returns a Seq instead of a value. 
val fm       = rdd.flatMap(str=>str.split(" "))
// distinct: Eliminate duplicates from RDD
fm.distinct().collect()

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:29
fm: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:31
res11: Array[String] = Array(Spark, is, awesome”,”It, fun)


In [18]:
val word1   = fm.map(word=>(word,1))
// reduceByKey: to aggregate values of a key using a function. “numTasks” is an optional
// parameter to specify number of reduce tasks
val wrdCnt  = word1.reduceByKey(_+_)
wrdCnt.collect()

val cntWrd = wrdCnt.map{case (word, count) => (count, word)}
cntWrd.groupByKey().collect()

word1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[20] at map at <console>:30
wrdCnt: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:33
cntWrd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at map at <console>:36
res12: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(Spark, awesome”,”It, fun)), (2,CompactBuffer(is)))


In [28]:
// Generazione di un Resilient Distributed Dataset
val rdd1=sc.parallelize(List("A","B"))
val rdd2=sc.parallelize(List("B","C"))
// union(): new RDD containing all elements from source RDD and argument.
rdd1.union(rdd2).collect()
// intersection(): new RDD containing all elements from source RDD and argument.
rdd1.intersection(rdd2).collect()
// cartesian(): new RDD cross product of all elements from source RDD and argument.
rdd1.cartesian(rdd2).collect()
// subtract() :new RDD created by removing data elements in source RDD in common with argument 
rdd1.subtract(rdd2).collect()

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[66] at parallelize at <console>:29
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[67] at parallelize at <console>:30
res22: Array[(String, String)] = Array((A,B), (A,C), (B,B), (B,C))


In [29]:
val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot")))
val personSE    = sc.parallelize(Seq(("Andy","Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista")))
// join(RDD,[numTasks]): When invoked on(K,V) and (K,W), this operationcreates a new RDD of (K,(V,W))
personFruit.join(personSE).collect()

personFruit: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[76] at parallelize at <console>:25
personSE: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[77] at parallelize at <console>:26
res23: Array[(String, (String, String))] = Array((Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)), (Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)))


In [31]:
val rdd = sc.parallelize(List("A","B","C"))
// count(): Get the number of data elements in the RDD
rdd.count()

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[81] at parallelize at <console>:27
res24: Long = 3


In [32]:
val rdd = sc.parallelize(List(1,2,3,4))
// reduce(func): Aggregate the data elements in an RDD using this function which takes two arguments and returns one
rdd.reduce(_+_)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[82] at parallelize at <console>:27
res25: Int = 10


In [33]:
val rdd = sc.parallelize(List(1,2,3,4))
//take (n): fetch first n data elements in an RDD. Computed by driver program. 
rdd.take(2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[83] at parallelize at <console>:27
res26: Array[Int] = Array(1, 2)


In [36]:
val rdd = sc.parallelize(List(1,2,3,4))
// foreach(func): execute function foreach data element in RDD. 
rdd.foreach(x=>println("%s*10=%s".format(x,x*10)))

1*10=10
2*10=20
3*10=30
4*10=40


rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at parallelize at <console>:29


In [2]:
// Accumulators
val nErrors=sc.accumulator(0.0)
val logs = sc.textFile("output.log")
logs.filter(_.contains("error")).foreach(x=>nErrors+=1)
nErrors.value

nErrors: org.apache.spark.Accumulator[Double] = 2.0
logs: org.apache.spark.rdd.RDD[String] = output.log MapPartitionsRDD[1] at textFile at <console>:27
res1: Double = 2.0


In [9]:
val map = sc.parallelize(Seq(("ground",1),("med",2),("priority",5),("express",10))).collect().toMap
val bcMailRates = sc.broadcast(map)
// In the above command, we create a broadcast variable, a map containing cost by class of service.
val pts = sc.textFile("packagesToShip.txt")
//pts.map(shipType=>(shipType,1)).reduceByKey(_+_).map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates.value(shipType))}.collect()

val shippingCost=sc.accumulator(0.0)
//pts.map(x=>(x,1)).reduceByKey(_+_).map{case(x,y)=>(x,y*bcMailRates.value(x))}.foreach(v=>shippingCost+=v._2)
shippingCost.value

map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, med -> 2, priority -> 5, express -> 10)
bcMailRates: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Int]] = Broadcast(23)
pts: org.apache.spark.rdd.RDD[String] = packagesToShip.txt MapPartitionsRDD[27] at textFile at <console>:30
shippingCost: org.apache.spark.Accumulator[Double] = 0.0
res5: Double = 0.0


In [None]:
case class Customer(name:String,age:Int,gender:String,address:String)

spark.driver.allowMultipleContexts = true

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sparkConf  = new SparkConf().setAppName("Customers")
val sc         = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val r          = sc.textFile("customers.txt")
val records    = r.map(_.split("|"))
val c          = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3)))


c.registerAsTable("customers")
sqlContext.sql("select * from customers where gender='M' and age < 30").collect().foreach(println) 