### Task: Data Engineer

Keep the Spark UI open in a browser while running the cells below. By default it is served on port 4040; the jobs page is http://localhost:4040/jobs.

#### Load/Read CSV Data

In [1]:
import org.apache.spark.sql.functions._

In [7]:
val columns = Seq("timestamp", "site", "requests")
val df = spark.read.option("inferSchema", true)   // derive column types from the data
    .option("header", true)                       // first line holds the column names
    .csv("/Users/josiahsams/SparkDemo/datasets/pageviews-by-second-tsv.csv")
    .toDF(columns: _*)                            // name the columns explicitly

columns: Seq[String] = List(timestamp, site, requests)
df: org.apache.spark.sql.DataFrame = [timestamp: timestamp, site: string ... 1 more field]


When reading CSV, set `inferSchema` so that Spark derives the column types from the data; without it, every column is read as a string. Inference costs an extra pass over the input.
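If the column types are already known, the inference pass can be skipped by passing an explicit schema to the reader. A minimal sketch, assuming the same three columns and types that `printSchema` reports below; the small temporary CSV file is made-up sample data standing in for the real file:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object ExplicitSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("explicit-schema").getOrCreate()

    // Made-up sample file standing in for pageviews-by-second-tsv.csv
    val path = Files.createTempFile("pageviews", ".csv")
    Files.write(path,
      "timestamp,site,requests\n2015-03-16 00:00:00,mobile,1628\n2015-03-16 00:00:00,desktop,2288\n"
        .getBytes("UTF-8"))

    // Declare the column types up front instead of setting inferSchema
    val schema = StructType(Seq(
      StructField("timestamp", TimestampType, nullable = true),
      StructField("site", StringType, nullable = true),
      StructField("requests", IntegerType, nullable = true)))

    val df = spark.read
      .option("header", true)
      .schema(schema)          // no extra pass over the file to infer types
      .csv(path.toString)

    df.printSchema()
    assert(df.schema("requests").dataType == IntegerType)
    assert(df.count() == 2)
    spark.stop()
  }
}
```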

In [8]:
df.printSchema

root
 |-- timestamp: timestamp (nullable = true)
 |-- site: string (nullable = true)
 |-- requests: integer (nullable = true)



In [4]:
df.show

+--------------------+-------+--------+
|           timestamp|   site|requests|
+--------------------+-------+--------+
|2015-03-16 01:30:...|desktop|    2288|
|2015-03-16 01:57:...| mobile|    1825|
|2015-03-16 02:20:...|desktop|    2350|
|2015-03-16 03:06:...|desktop|    2476|
|2015-03-16 03:52:...|desktop|    2283|
|2015-03-16 04:28:...|desktop|    2374|
|2015-03-16 04:55:...|desktop|    2211|
|2015-03-16 05:20:...|desktop|    2238|
|2015-03-16 06:10:...|desktop|    2295|
|2015-03-16 07:01:...|desktop|    2211|
|2015-03-16 07:47:...| mobile|     878|
|2015-03-16 08:20:...|desktop|    2227|
|2015-03-16 09:03:...| mobile|     839|
|2015-03-16 09:42:...| mobile|     863|
|2015-03-16 10:39:...|desktop|    2391|
|2015-03-16 11:33:...| mobile|     981|
|2015-03-16 12:07:...|desktop|    2588|
|2015-03-16 12:36:...| mobile|    1037|
|2015-03-16 13:39:...|desktop|    2943|
|2015-03-16 14:17:...| mobile|    1282|
+--------------------+-------+--------+
only showing top 20 rows



In [4]:
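// Run a block of code, print its wall-clock time, and return its result
def timeit[T](code: => T): T = {
    val start = System.nanoTime
    val ret: T = code                                  // evaluate the by-name block once
    val elapsedMs = (System.nanoTime - start) / 1000000
    println("Elapsed time: %d ms".format(elapsedMs))
    ret
}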

timeit: [T](code: => T)T


In [6]:
timeit({println(df.count)})

7199995
Elapsed time: 3554 ms


### Register a temporary view and cache it

In [9]:
df.createTempView("pageviews_by_second")

In [8]:
spark.sqlContext.cacheTable("pageviews_by_second")

In [10]:
val pageDF = spark.read.table("pageviews_by_second")

pageDF: org.apache.spark.sql.DataFrame = [timestamp: timestamp, site: string ... 1 more field]


In [11]:
timeit({ println(pageDF.count) })

7199995
Elapsed time: 4129 ms
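Caching a table this way is lazy: nothing is stored until an action runs, so the first `count` above most likely pays both for reading the CSV and for building the in-memory cache, which would explain why it is no faster than the uncached run. Later actions should then read from memory. A minimal sketch using `spark.catalog`, which exposes the same operation as `sqlContext.cacheTable`; the view name `numbers` and the synthetic `spark.range` data are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

object LazyCacheExample {
  // Same idea as the notebook's timeit helper
  def timeit[T](code: => T): T = {
    val start = System.nanoTime
    val result = code
    println(s"Elapsed time: ${(System.nanoTime - start) / 1000000} ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("lazy-cache").getOrCreate()

    // Synthetic data standing in for the pageviews DataFrame
    spark.range(0, 5000000).toDF("id").createOrReplaceTempView("numbers")

    spark.catalog.cacheTable("numbers")   // registers the view for caching; nothing is stored yet
    assert(spark.catalog.isCached("numbers"))

    val first = timeit(spark.table("numbers").count())   // scans the source and fills the cache
    val second = timeit(spark.table("numbers").count())  // should be served from memory
    assert(first == 5000000L && second == first)

    spark.catalog.uncacheTable("numbers")
    spark.stop()
  }
}
```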


### Repartition the data to improve parallelism

In the Spark UI, open the Event Timeline of the jobs below to see how many tasks run in parallel.

In [11]:
df.rdd.getNumPartitions

res6: Int = 5
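`getNumPartitions` reports how many partitions, and therefore how many parallel tasks, a scan of the DataFrame uses. The sketch below contrasts `repartition`, which performs a full shuffle and can raise or lower the count, with `coalesce`, which merges existing partitions without a shuffle. The synthetic `spark.range` data is an assumption, and adaptive query execution is switched off so the requested counts are kept as-is:

```scala
import org.apache.spark.sql.SparkSession

object PartitionCountExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("partitions")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Start from an explicit 5 partitions, like the CSV DataFrame above
    val df = spark.range(0, 1000000, 1, 5).toDF("id")

    val up = df.repartition(8)   // full shuffle into 8 partitions
    val down = df.coalesce(2)    // merges existing partitions, no shuffle

    println(s"original: ${df.rdd.getNumPartitions}, repartition: ${up.rdd.getNumPartitions}, coalesce: ${down.rdd.getNumPartitions}")
    assert(df.rdd.getNumPartitions == 5)
    assert(up.rdd.getNumPartitions == 8)
    assert(down.rdd.getNumPartitions == 2)
    spark.stop()
  }
}
```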


In [12]:
import org.apache.spark.sql.SaveMode

// Shuffle the data into 4 partitions, then write them out as Parquet
df.repartition(4)
    .write.format("parquet")
    .mode(SaveMode.Overwrite)   // replace any existing output directory
    .save("/Users/josiahsams/SparkDemo/datasets/repart-pageviews-by-second-tsv.parquet")



In [2]:
val pageviewsDF = spark.read.parquet("/Users/josiahsams/SparkDemo/datasets/repart-pageviews-by-second-tsv.parquet")



pageviewsDF: org.apache.spark.sql.DataFrame = [timestamp: timestamp, site: string ... 1 more field]


In [6]:
timeit({println(pageviewsDF.count)})

7199995
Elapsed time: 2174 ms


[image1]: ./images/4_partitions_dashed.png
![4 partitions][image1]

In [15]:
val cachedDF = pageviewsDF.orderBy(column("timestamp"), column("site").desc).cache

cachedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp: timestamp, site: string ... 1 more field]


In [16]:
cachedDF.count

res9: Long = 7199995


In [17]:
cachedDF.rdd.getNumPartitions

res10: Int = 200


### Caching with Caution

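After the first action on a cached DataFrame, check its partitions in the Storage tab of the Spark UI. Because `orderBy` triggers a shuffle, the cached DataFrame above has 200 partitions, the default value of `spark.sql.shuffle.partitions`, even though the data was written as 4 partitions.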

In [18]:
cachedDF.unpersist

res11: cachedDF.type = [timestamp: timestamp, site: string ... 1 more field]


In [19]:
spark.sqlContext.getConf("spark.sql.shuffle.partitions")

res12: String = 200


In [12]:
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "4")

In [21]:
spark.sqlContext.getConf("spark.sql.shuffle.partitions")

res14: String = 4
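The same setting can also be read and changed through `spark.conf`, the runtime configuration on `SparkSession`. In the sketch below, a `groupBy` on synthetic data picks up the new value for its shuffle. Adaptive query execution is switched off as an assumption, because on Spark 3.x it may coalesce shuffle partitions and hide the effect:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ShufflePartitionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("shuffle-partitions")
      .config("spark.sql.adaptive.enabled", "false")   // keep the shuffle partition count fixed
      .getOrCreate()

    println(spark.conf.get("spark.sql.shuffle.partitions"))   // 200 unless overridden

    spark.conf.set("spark.sql.shuffle.partitions", 4L)
    val grouped = spark.range(0, 100000).groupBy((col("id") % 10).as("bucket")).count()

    // The groupBy shuffle now produces 4 partitions instead of 200
    assert(spark.conf.get("spark.sql.shuffle.partitions") == "4")
    assert(grouped.rdd.getNumPartitions == 4)
    assert(grouped.count() == 10)
    spark.stop()
  }
}
```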


In [13]:
val newcachedDF = pageviewsDF.orderBy(column("timestamp"), column("site").desc).cache

newcachedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp: timestamp, site: string ... 1 more field]


In [14]:
newcachedDF.count

res7: Long = 7199995


### Dataset

In [24]:
case class pageview(timestamp: Option[java.sql.Timestamp], site: Option[String], requests: Option[Int])

defined class pageview


In [25]:
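import org.apache.spark.sql.SparkSession
// getOrCreate() returns the notebook's existing session, i.e. the same object as `spark`
val sc1 = SparkSession.builder.getOrCreate()
import sc1.implicits._                  // brings the Encoder for the case class into scope
val dataset = newcachedDF.as[pageview]  // typed view of the same cached data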

sc1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@21f1ed30
dataset: org.apache.spark.sql.Dataset[pageview] = [timestamp: timestamp, site: string ... 1 more field]


In [26]:
val filterMapDS = dataset.filter(_.site.getOrElse("") == "mobile")

filterMapDS: org.apache.spark.sql.Dataset[pageview] = [timestamp: timestamp, site: string ... 1 more field]


In [27]:
filterMapDS.take(10).foreach(println)

pageview(Some(2015-03-16 00:00:00.0),Some(mobile),Some(1628))
pageview(Some(2015-03-16 00:00:01.0),Some(mobile),Some(1636))
pageview(Some(2015-03-16 00:00:02.0),Some(mobile),Some(1619))
pageview(Some(2015-03-16 00:00:03.0),Some(mobile),Some(1776))
pageview(Some(2015-03-16 00:00:04.0),Some(mobile),Some(1716))
pageview(Some(2015-03-16 00:00:05.0),Some(mobile),Some(1721))
pageview(Some(2015-03-16 00:00:06.0),Some(mobile),Some(1695))
pageview(Some(2015-03-16 00:00:07.0),Some(mobile),Some(1630))
pageview(Some(2015-03-16 00:00:08.0),Some(mobile),Some(1731))
pageview(Some(2015-03-16 00:00:09.0),Some(mobile),Some(1664))
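The lambda passed to `filter` above is opaque to Spark: the optimizer cannot look inside it, so it cannot push the predicate into the Parquet scan, and each row has to be deserialized into a `pageview` object before the test runs. An equivalent `Column` expression stays visible to Catalyst. A minimal sketch comparing the two; the `PageView` case class and the three sample rows written to a temporary Parquet directory are hypothetical:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the notebook's `pageview`
case class PageView(timestamp: Option[java.sql.Timestamp], site: Option[String], requests: Option[Int])

object TypedVsColumnFilter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("typed-filter").getOrCreate()
    import spark.implicits._

    // Write a few sample rows to Parquet so the plans include a file scan
    val dir = Files.createTempDirectory("pageviews").resolve("data").toString
    val ts = java.sql.Timestamp.valueOf("2015-03-16 00:00:00")
    Seq(
      PageView(Some(ts), Some("mobile"), Some(1628)),
      PageView(Some(ts), Some("desktop"), Some(2288)),
      PageView(Some(ts), None, None)
    ).toDS().write.parquet(dir)
    val ds = spark.read.parquet(dir).as[PageView]

    // Typed filter: an arbitrary Scala function the optimizer cannot inspect
    val typed = ds.filter(_.site.getOrElse("") == "mobile")
    // Column filter: an expression Catalyst can analyze and push into the scan
    val untyped = ds.filter($"site" === "mobile")

    // Compare the PushedFilters entry of the Parquet scan in the two plans
    typed.explain()
    untyped.explain()

    assert(typed.count() == 1 && untyped.count() == 1)
    spark.stop()
  }
}
```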


### RDDs: Jobs, Stages, Tasks

[image2]: ./images/picture1.png
![stages][image2]

[image3]: ./images/picture2.png
![stages][image3]
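In Spark, each action submits a job; a wide transformation such as `reduceByKey` inserts a shuffle that splits the job into stages; and each stage runs one task per partition. A small RDD sketch that produces a two-stage job, using a made-up word list; `toDebugString` prints the lineage, where the change in indentation marks the shuffle boundary:

```scala
import org.apache.spark.sql.SparkSession

object JobsStagesTasks {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("stages").getOrCreate()
    val sc = spark.sparkContext

    // Made-up sample data in 4 partitions: the first stage runs 4 tasks
    val words = sc.parallelize(Seq("mobile", "desktop", "mobile", "desktop", "mobile"), 4)

    // map is narrow (stays in the first stage); reduceByKey is wide and starts a second stage with 2 tasks
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _, 2)

    // The lineage shows a ShuffledRDD on top of the map stage
    println(counts.toDebugString)

    // collect is the action that actually submits the job
    val result = counts.collect().toMap
    assert(result == Map("mobile" -> 3, "desktop" -> 2))
    assert(words.getNumPartitions == 4 && counts.getNumPartitions == 2)
    assert(counts.toDebugString.contains("ShuffledRDD"))
    spark.stop()
  }
}
```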

### Catalyst Optimizer: Logical Plan to Physical Plan

In [15]:
val filterDF = newcachedDF.filter(column("site") === "mobile")

filterDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp: timestamp, site: string ... 1 more field]


In [29]:
filterDF.queryExecution.logical

res17: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Filter ('site = mobile)
+- Sort [timestamp#124 ASC, site#125 DESC], true
   +- Relation[timestamp#124,site#125,requests#126] parquet


[image4]: ./images/filter_count_run.png
![logical plan][image4]

In [16]:
filterDF.explain(true)

== Parsed Logical Plan ==
'Filter ('site = mobile)
+- Sort [timestamp#0 ASC, site#1 DESC], true
   +- Relation[timestamp#0,site#1,requests#2] parquet

== Analyzed Logical Plan ==
timestamp: timestamp, site: string, requests: int
Filter (site#1 = mobile)
+- Sort [timestamp#0 ASC, site#1 DESC], true
   +- Relation[timestamp#0,site#1,requests#2] parquet

== Optimized Logical Plan ==
Filter (isnotnull(site#1) && (site#1 = mobile))
+- InMemoryRelation [timestamp#0, site#1, requests#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :  +- *Sort [timestamp#0 ASC, site#1 DESC], true, 0
   :     +- Exchange rangepartitioning(timestamp#0 ASC, site#1 DESC, 4)
   :        +- *BatchedScan parquet [timestamp#0,site#1,requests#2] Format: ParquetFormat, InputPaths: file:/Users/josiahsams/SparkDemo/datasets/repart-pageviews-by-second-tsv.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:timestamp,site:string,requests:int>

== Physical Plan ==
*Filter (

[image5]: ./images/catalyst.png
![catalyst][image5]
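`explain(true)` prints the plans Catalyst produces along the way. The same plans are available programmatically on `queryExecution` as `logical`, `analyzed`, `optimizedPlan`, and `executedPlan`. A minimal sketch on synthetic `spark.range` data, not the pageviews file, where even ids are labelled `mobile` as an arbitrary assumption:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

object CatalystPlans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("catalyst").getOrCreate()

    // Synthetic stand-in: even ids are "mobile", odd ids "desktop"
    val df = spark.range(0, 100)
      .withColumn("site", when(col("id") % 2 === 0, "mobile").otherwise("desktop"))
    val filtered = df.filter(col("site") === "mobile")

    val qe = filtered.queryExecution
    println(qe.logical)        // parsed plan built from the DataFrame calls
    println(qe.analyzed)       // column names resolved to attributes
    println(qe.optimizedPlan)  // after rule-based rewrites
    println(qe.executedPlan)   // physical plan that actually runs

    assert(filtered.count() == 50)
    assert(qe.executedPlan.toString.contains("Filter"))
    spark.stop()
  }
}
```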

In [31]:
filterDF.count

res19: Long = 3599997


[image6]: ./images/filter_physical_model.png
![filter_physical_model][image6]

In [32]:
filterDF.queryExecution.debug.codegen

Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Filter (isnotnull(site#125) && (site#125 = mobile))
+- InMemoryTableScan [timestamp#124, site#125, requests#126], [isnotnull(site#125), (site#125 = mobile)]
   :  +- InMemoryRelation [timestamp#124, site#125, requests#126], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  +- *Sort [timestamp#124 ASC, site#125 DESC], true, 0
   :     :     +- Exchange rangepartitioning(timestamp#124 ASC, site#125 DESC, 4)
   :     :        +- *BatchedScan parquet [timestamp#124,site#125,requests#126] Format: ParquetFormat, InputPaths: file:/Users/josiahsams/SparkDemo/datasets/repart-pageviews-by-second-tsv.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:timestamp,site:string,requests:int>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.ap

[image8]: ./images/pipelining.png
![pipeline][image8]
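Whole-stage code generation fuses the operators of a stage (the nodes prefixed with `*` in the plans above) into a single generated function. One way to see its effect is to switch it off with `spark.sql.codegen.wholeStage` and compare the physical plans. A sketch on synthetic data; adaptive query execution is disabled as an assumption so that, on Spark 3.x, the plan is not wrapped in an adaptive node:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.functions.col

object WholeStageCodegenToggle {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("codegen")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Build a small query under the given setting and count its fused codegen stages
    def codegenStages(enabled: Boolean): Int = {
      spark.conf.set("spark.sql.codegen.wholeStage", enabled)
      val df = spark.range(0, 1000).filter(col("id") % 2 === 0).selectExpr("id * 10 AS x")
      df.explain()   // with codegen on, fused operators carry the * prefix
      df.queryExecution.executedPlan.collect { case w: WholeStageCodegenExec => w }.size
    }

    val on = codegenStages(enabled = true)
    val off = codegenStages(enabled = false)
    assert(on >= 1 && off == 0)
    spark.stop()
  }
}
```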

### References

* Deep Dive into Project Tungsten: Bringing Spark Closer to Bare Metal
    * http://tinyurl.com/project-tungsten
    * https://www.youtube.com/watch?v=5ajs8EIPWGI
* Spark Performance: What’s Next
    * https://www.youtube.com/watch?v=JX0CdOTWYX4
* Unified Memory Management
    * https://issues.apache.org/jira/browse/SPARK-10000