In [None]:
//create the context :: execute once - use multiple times
val conf = new SparkConf().setMaster("local[2]").setAppName("Wiki app")
val sc = getOrCreate(conf)

In [None]:
//create RDD using parallelize 
val lines = sc.parallelize(List("pandas","second line pandas"))
lines.collect

In [None]:
//create RDD using external dataset
val lines = sc.textFile("/home/jovyan/work/learning-spark-master/files/fake_logs/log1.log")
lines.collect            

In [None]:
//transformations
val inputRDD = sc.textFile("/home/jovyan/work/learning-spark-master/files/fake_logs/log1.log").persist
val errorsRDD = inputRDD.filter(x => x.contains("error")).persist
errorsRDD.count

In [None]:
//transformations - using union :: check for all errors and warnings
val warningsRDD = inputRDD.filter(x => x.contains("warning"))
val errorOrwarningRDD = errorsRDD.union(warningsRDD)
errorOrwarningRDD.count

//better way to do the same
val errorOrwarningRDD1 = inputRDD.filter(x => x.contains("warning") || x.contains("error"))
errorOrwarningRDD1.count

In [None]:
//actions
errorOrwarningRDD1.count
//use take for few records
errorOrwarningRDD1.take(10)
errorOrwarningRDD1.take(10).foreach(println)
//use collect for all records - shouldn't be used on large datasets
errorOrwarningRDD1.collect

In [87]:
//transformations - basic :: using map and convert list to string
val input = sc.parallelize(List(1,2,3,4))
val sqRDD = input.map(x => x*x)
println(sqRDD.collect().mkString(","))


1,4,9,16


In [91]:
//transformations - basic :: using flatMap
val lines = sc.parallelize(List("hello world","hi","this is third element"))
val flatRDD = lines.flatMap(x => x.split(" "))
flatRDD.collect

Array(hello, world, hi, this, is, third, element)

In [94]:
//transformations - basic :: using distinct
val lines = sc.parallelize(List(1,2,2,3,3,4))
val uniqueRDD = lines.distinct
uniqueRDD.collect

Name: Unknown Error
Message: <console>:32: error: value sort is not a member of org.apache.spark.rdd.RDD[Int]
       uniqueRDD.sort.collect
                 ^
StackTrace: 

In [99]:
//transformations - basic :: unions - will contain all the elements including all duplicates
val lines1 = sc.parallelize(List(1,2,2,3,3,4))
val lines2 = sc.parallelize(List(1,2,2,3,5))
val unionRDD = lines1.union(lines2)
unionRDD.collect

Array(1, 2, 2, 3, 3, 4, 1, 2, 2, 3, 5)

In [98]:
//actions - basic :: intersection - will contain common elements and REMOVES all the duplicates
//note :: bad performance than union since it needs to shuffle over the network to identify the common elements
val lines1 = sc.parallelize(List(1,2,2,3,3,4))
val lines2 = sc.parallelize(List(1,2,2,3,5))
val intersectRDD = lines1.intersection(lines2)
intersectRDD.collect

Array(1, 2, 3)

In [101]:
//transformations - basic :: subtract - will contain only those elements unique to first RDD
val lines1 = sc.parallelize(List(1,2,2,3,3,4))
val lines2 = sc.parallelize(List(1,2,2,3,5))
val subtractRDD = lines1.subtract(lines2)
subtractRDD.collect

Array(4)

In [102]:
//transformations - basic :: cartesian product - will return all pairs - very expensive for large datasets
val lines1 = sc.parallelize(List(1,2,2,3,3,4))
val lines2 = sc.parallelize(List(1,2,2,3,5))
val cartesianRDD = lines1.cartesian(lines2)
cartesianRDD.collect

Array((1,1), (1,2), (1,2), (1,3), (1,5), (2,1), (2,1), (2,2), (2,2), (2,2), (2,2), (2,3), (2,5), (2,3), (2,5), (3,1), (3,2), (3,2), (3,3), (3,5), (3,1), (4,1), (3,2), (4,2), (3,2), (4,2), (3,3), (3,5), (4,3), (4,5))

In [103]:
//actions - basic :: reduce
val lines = sc.parallelize(List(1,2,3,4))
val sum = lines.reduce((x,y)=> x+y)
println(sum)

10


In [104]:
//actions - basic :: collect - returns entire RDDs contents - all data must fit on a signle machine's memory
val lines = sc.parallelize(List(1,2,3,4))
lines.collect


Array(1, 2, 3, 4)

In [106]:
//actions - basic :: take(n) returns n elements from RDD and attempts to minimize the number of partitions it accesses
//it's possible that the output is not in order expected

val lines = sc.parallelize(List(1,2,3,4,5,6),3)
lines.take(5)

Array(1, 2, 3, 4, 5)

In [109]:
//actions - basic :: top(n) returns max n elements from RDD 

val lines = sc.parallelize(List(1,2,3,4,5,6,2,4),3)
lines.top(2)

Array(6, 5)

In [120]:
//misc - persist
import org.apache.spark.storage.StorageLevel

val inputRDD = sc.parallelize(List(1,2,3,4))
val resultRDD = input.map(x => x * x)
resultRDD.persist(StorageLevel.DISK_ONLY)
println(resultRDD.count)
println(resultRDD.collect.mkString(","))
resultRDD.unpersist()

4
1,4,9,16


MapPartitionsRDD[156] at map at <console>:35