# Spark first

## Initialization. import spark dependencies

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.9.1` // Not required since almond 0.7.0 (will be automatically added when importing spark)

[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here
[39m
[32mimport [39m[36m$ivy.$                               // Not required since almond 0.7.0 (will be automatically added when importing spark)[39m

## Disable logging

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

## Create spark session

In [3]:
import org.apache.spark.sql._

//jupyter notebook way to create spark session
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.spark.sql._

//jupyter notebook way to create spark session
[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@1e23547

## get sparkContext obj

In [4]:
def sc = spark.sparkContext

defined [32mfunction[39m [36msc[39m

## Create RDD

In [5]:
//create RDD from collection
val rdd1 = sc.parallelize(1 to 100)
rdd1.getClass

//create RDD from text file
val txtRdd = sc.textFile("/home/pi/spark-2.4.5-bin-hadoop2.6/README.md")
txtRdd.getClass

[36mrdd1[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mInt[39m] = ParallelCollectionRDD[0] at parallelize at cmd4.sc:1
[36mres4_1[39m: [32mClass[39m[[32mT[39m] = class org.apache.spark.rdd.ParallelCollectionRDD
[36mtxtRdd[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mString[39m] = /home/pi/spark-2.4.5-bin-hadoop2.6/README.md MapPartitionsRDD[2] at textFile at cmd4.sc:5
[36mres4_3[39m: [32mClass[39m[[32mT[39m] = class org.apache.spark.rdd.MapPartitionsRDD

## Create Pair RDD

In [6]:
//create it from Map:
val mapScore = Map("name1"->1, "name2"->3,"name3"->2)
val nameRdd = sc.parallelize(mapScore.toSeq)

val mapAge = Map("name1"->33, "name2"->29,"name3"->45)
val ageRdd = sc.parallelize(mapAge.toSeq)

//create it from list
val mapStatus = List(("name1","good"),("name2","better"),("name3","intermediate"))
val statusRdd = sc.parallelize(mapStatus)

[36mmapScore[39m: [32mMap[39m[[32mString[39m, [32mInt[39m] = [33mMap[39m([32m"name1"[39m -> [32m1[39m, [32m"name2"[39m -> [32m3[39m, [32m"name3"[39m -> [32m2[39m)
[36mnameRdd[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mInt[39m)] = ParallelCollectionRDD[3] at parallelize at cmd5.sc:2
[36mmapAge[39m: [32mMap[39m[[32mString[39m, [32mInt[39m] = [33mMap[39m([32m"name1"[39m -> [32m33[39m, [32m"name2"[39m -> [32m29[39m, [32m"name3"[39m -> [32m45[39m)
[36mageRdd[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mInt[39m)] = ParallelCollectionRDD[4] at parallelize at cmd5.sc:5
[36mmapStatus[39m: [32mList[39m[([32mString[39m, [32mString[39m)] = [33mList[39m(
  ([32m"name1"[39m, [32m"good"[39m),
  ([32m"name2"[39m, [32m"better"[39m),
  ([32m"name3"[39m, [32m"intermediate"[39m)
)
[36mstatusRdd[39m: [32morg[39m.

## transforming of RDD

### transforming - 1

In [7]:
//common transformations supported by RDD
//filter
val rddLessThan10 = rdd1.filter(item=>item<10)

//find line with max word count
//map
val txtRddLineSize = txtRdd.map(line=>line.split(" ").size)

//flatMap. will break each line into multiple items
val pairMap = txtRdd.flatMap(line => line.split(" ")).map(word => (word,1))
pairMap.take(4)

[36mrddLessThan10[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mInt[39m] = MapPartitionsRDD[6] at filter at cmd6.sc:1
[36mtxtRddLineSize[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mInt[39m] = MapPartitionsRDD[7] at map at cmd6.sc:5
[36mpairMap[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mInt[39m)] = MapPartitionsRDD[9] at map at cmd6.sc:8
[36mres6_3[39m: [32mArray[39m[([32mString[39m, [32mInt[39m)] = [33mArray[39m(
  ([32m"#"[39m, [32m1[39m),
  ([32m"Apache"[39m, [32m1[39m),
  ([32m"Spark"[39m, [32m1[39m),
  ([32m""[39m, [32m1[39m)
)

### transforming - 2

In [8]:
//groupByKey. when called on dataset of (K,V) pairs, returns a dataset of (K,Iterable<V>) pairs
val groupByKeyMap = pairMap.groupByKey()
groupByKeyMap.take(5)

//reduceByKey. when called on dataset of (K,V) pairs, returns a dataset of (K,V) pairs
//where V for each K are aggregated by giving reduce func.
val reduceByKeyMap = pairMap.reduceByKey((a,b)=>a+b)
reduceByKeyMap.take(5)

//aggregateByKey

//sortByKey. first parameter indicating sorting by asc or not
val maxWordCountMap = reduceByKeyMap.map(k=>(k._2,k._1)).sortByKey(false)
maxWordCountMap.take(5)

//join
val nameAgeRdd = nameRdd.join(ageRdd)
nameAgeRdd.collect()

val nameAgeCogroupRdd = nameRdd.cogroup(ageRdd)
nameAgeCogroupRdd.collect()

[36mgroupByKeyMap[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mIterable[39m[[32mInt[39m])] = ShuffledRDD[10] at groupByKey at cmd7.sc:1
[36mres7_1[39m: [32mArray[39m[([32mString[39m, [32mIterable[39m[[32mInt[39m])] = [33mArray[39m(
  ([32m"package"[39m, [33mCompactBuffer[39m([32m1[39m)),
  ([32m"this"[39m, [33mCompactBuffer[39m([32m1[39m)),
  ([32m"integration"[39m, [33mCompactBuffer[39m([32m1[39m)),
  ([32m"page](http://spark.apache.org/documentation.html)."[39m, [33mCompactBuffer[39m([32m1[39m)),
  ([32m"Python"[39m, [33mCompactBuffer[39m([32m1[39m, [32m1[39m))
)
[36mreduceByKeyMap[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mInt[39m)] = ShuffledRDD[11] at reduceByKey at cmd7.sc:6
[36mres7_3[39m: [32mArray[39m[([32mString[39m, [32mInt[39m)] = [33mArray[39m(
  ([32m"package"[39m, [32m1[39m),
  ([32m"thi

### transforming - 3

In [9]:
//create it from list
val idStatus = List(("id1","1"),("id2","1"),("id1","0"))
val idStatusRdd = sc.parallelize(idStatus)

[36midStatus[39m: [32mList[39m[([32mString[39m, [32mString[39m)] = [33mList[39m(
  ([32m"id1"[39m, [32m"1"[39m),
  ([32m"id2"[39m, [32m"1"[39m),
  ([32m"id1"[39m, [32m"0"[39m)
)
[36midStatusRdd[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mString[39m)] = ParallelCollectionRDD[21] at parallelize at cmd8.sc:2

In [10]:
//def checkValue(arg1:String, arg2:String):String={
//    return "twoValue"
//}

//print(checkValue("abc","ddd"))

def checkTuple(arg: Tuple2[String,String]):Int={
    arg._2 match{
        case "1"=>1
        case "0"=>1
        case _=>0
    }
}
println(checkTuple(("id1","p")))
println(checkTuple(("id1","1")))
idStatusRdd.map(a=>(a._1,checkTuple(a))).collect()

/*def checkValue(arg: Iterable[String]):Int={
    var count = 0
    val it = arg.iterator
    while(it.hasNext){
        count = count + 1
    }
    return count
}

//val idStatusRddReduced = idStatusRdd.reduceByKey((x,y)=>checkValue(x,y))
val idStatusRddReduced = idStatusRdd.groupByKey().reduceByKey(checkValue)
idStatusRddReduced.collect()
*/

0
1


defined [32mfunction[39m [36mcheckTuple[39m
[36mres9_3[39m: [32mArray[39m[([32mString[39m, [32mInt[39m)] = [33mArray[39m(([32m"id1"[39m, [32m1[39m), ([32m"id2"[39m, [32m1[39m), ([32m"id1"[39m, [32m1[39m))

## Actions of RDD

In [11]:
//collect will display RDD items
rddLessThan10.collect()

//count. show txtRdd size
txtRdd.count()

//take
txtRddLineSize.take(3)
val maxLineWordCount = txtRddLineSize.reduce((a,b) => if (a>b) a else b)
val lineWithMaxWordCount = txtRdd.filter(line => line.size == 16)
lineWithMaxWordCount.collect()

//contByKey
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1.getClass
rdd1.countByKey()

//countByKey can't be called on type class org.apache.spark.rdd.MapPartitionsRDD
pairMap.getClass
//val countByKeyMap = pairMap.countyByKey()
//countByKeyMap.take(5)

## creating Dataset

In [12]:
val dataset = spark.read.textFile("/home/pi/spark-2.4.5-bin-hadoop2.6/README.md")

[36mdataset[39m: [32mDataset[39m[[32mString[39m] = [value: string]

In [13]:
dataset.count()
dataset.first()

[36mres12_0[39m: [32mLong[39m = [32m104L[39m
[36mres12_1[39m: [32mString[39m = [32m"# Apache Spark"[39m

In [14]:
val linesWithSpark = dataset.filter(txt=>txt.contains("Spark"))
linesWithSpark.count()


[36mlinesWithSpark[39m: [32mDataset[39m[[32mString[39m] = [value: string]
[36mres13_1[39m: [32mLong[39m = [32m19L[39m

### find the line with most words

In [20]:
//https://spark.apache.org/docs/latest/quick-start.html example can't run here:
dataset.getClass
//dataset.map(line => line.split(" ").size)

val rddx = sc.textFile("hdfs://localhost:9000/user/pi/input/core-site.xml")

[36mres19_0[39m: [32mClass[39m[[32mT[39m] = class org.apache.spark.sql.Dataset
[36mrddx[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mString[39m] = hdfs://localhost:9000/user/pi/input/core-site.xml MapPartitionsRDD[48] at textFile at cmd19.sc:4

In [21]:
rddx.take(2)

[36mres20[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"<?xml version=\"1.0\" encoding=\"UTF-8\"?>"[39m,
  [32m"<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"[39m
)