## With Spark 2.x, you can work primarily through the SparkSession
### which avoids the need to manage separate SparkContext, SQLContext and HiveContext objects


import org.apache.spark.sql.SparkSession

// SparkSession is the single Spark 2.x entry point; getOrCreate() returns the active session
// if one already exists, otherwise builds a new one.
val sparkSession = SparkSession.builder().appName("Spark SQL basic example").getOrCreate()

// Read the raw session events directly through the session (no SQLContext needed).
val sessionsDS = sparkSession.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("raw_session_data_sample.csv")

In [1]:
// Notebook kernels usually pre-create a SparkContext and bind it to `sc`; this demo reuses it.
// Evaluating its runtime class shows which context implementation we are working with.
sc.getClass()

class org.apache.spark.SparkContext

### Read the raw data into a DataFrame (a Dataset[Row])

In [2]:
// In Spark 2.x the SparkSession is the entry point: builder().getOrCreate() returns the active
// session, built on the already-running SparkContext (`sc`). The `new SQLContext(sc)`
// constructor is deprecated since Spark 2.0, so take the SQLContext (used for SQL-like
// operations in the cells below) from the session instead.
val sqlContext = org.apache.spark.sql.SparkSession.builder().getOrCreate().sqlContext

// Read raw session events from .csv. The result is a DataFrame, i.e. an untyped Dataset[Row];
// inferSchema asks Spark to scan the file and pick a type for each column.
val sessionsDS = sqlContext.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("raw_session_data_sample.csv")

sessionsDS.show()

+----------+-------+-----------+----------+
|session_id|user_id|     action|product_id|
+----------+-------+-----------+----------+
|         1|      1|       view|    prod_1|
|         1|      1|   cart_add|    prod_1|
|         1|      1|cart_remove|    prod_1|
|         1|      1|       view|    prod_2|
|         1|      1|       view|    prod_3|
|         1|      1|       view|    prod_4|
|         1|      1|       view|    prod_2|
|         2|      1|       view|    prod_5|
|         2|      1|     search|      null|
|         2|      1|       view|    prod_6|
|         2|      1|     search|      null|
|         2|      1|       view|    prod_2|
|         3|      1|       view|    prod_5|
|         3|      1|     search|      null|
|         3|      1|   cart_add|    prod_2|
|         3|      1|       view|    prod_8|
|         3|      1|   purchase|    prod_2|
+----------+-------+-----------+----------+



### What the raw data should look like after featurization

In [3]:
// Reference result: the featurized rows the transformations below are aiming to reproduce,
// loaded from a pre-computed CSV with the same reader settings as the raw data.
val sessionsFeaturizedDS = sqlContext.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .load("raw_session_data_sample_featurized.csv")

sessionsFeaturizedDS.show()

+---------------+----------------------+------------+--------------+-----------------+-----------+
|prod_view_count|unique_prod_view_count|search_count|cart_add_count|cart_remove_count|is_purchase|
+---------------+----------------------+------------+--------------+-----------------+-----------+
|              5|                     4|           0|             1|                1|          0|
|              3|                     3|           2|             0|                0|          0|
|              2|                     2|           1|             1|                0|          1|
+---------------+----------------------+------------+--------------+-----------------+-----------+



### Inspect the column data types

In [4]:
// Print each column's name, inferred type and nullability as a tree.
sessionsDS.printSchema()

root
 |-- session_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- product_id: string (nullable = true)



## Featurization using RDD

In [20]:
// RDDs are NOT deprecated
// RDDs are immutable
// They lend themselves to unstructured/schemaless data

// Get count of product views per session - RDD version

val NA_STRING = "NA"

// Easy to switch from DS to RDD (and back)
val sessionsRDD = sessionsDS.rdd

// Rekey the session data in format: (session_id, (action, prod_id)).
// product_id is null for some actions (e.g. "search"), so substitute NA_STRING. Reading it with
// getAs[String] keeps the element type RDD[(Int, (String, String))] instead of widening the
// product to Any, as a raw r.get(...) would.
val sessionsRDDKeyed = sessionsRDD.map { r =>
  val sessionId = r.getInt(r.fieldIndex("session_id"))
  val action    = r.getAs[String]("action")
  val productId = Option(r.getAs[String]("product_id")).getOrElse(NA_STRING)
  (sessionId, (action, productId))
}

// collect() brings every element to the driver (acceptable for this small sample);
// foreach then runs a side effect (println) on each element.
sessionsRDDKeyed.collect().foreach(println)

(1,(view,prod_1))
(1,(cart_add,prod_1))
(1,(cart_remove,prod_1))
(1,(view,prod_2))
(1,(view,prod_3))
(1,(view,prod_4))
(1,(view,prod_2))
(2,(view,prod_5))
(2,(search,NA))
(2,(view,prod_6))
(2,(search,NA))
(2,(view,prod_2))
(3,(view,prod_5))
(3,(search,NA))
(3,(cart_add,prod_2))
(3,(view,prod_8))
(3,(purchase,prod_2))


In [6]:
// Wrap each (action, product) event in a single-element List so that all events of a
// session can be concatenated by key in the next step.
val sessionsRDDKeyedWithList = sessionsRDDKeyed.map { case (sessionId, event) =>
  (sessionId, List(event))
}

sessionsRDDKeyedWithList.collect().foreach(println)

(1,List((view,prod_1)))
(1,List((cart_add,prod_1)))
(1,List((cart_remove,prod_1)))
(1,List((view,prod_2)))
(1,List((view,prod_3)))
(1,List((view,prod_4)))
(1,List((view,prod_2)))
(2,List((view,prod_5)))
(2,List((search,NA)))
(2,List((view,prod_6)))
(2,List((search,NA)))
(2,List((view,prod_2)))
(3,List((view,prod_5)))
(3,List((search,NA)))
(3,List((cart_add,prod_2)))
(3,List((view,prod_8)))
(3,List((purchase,prod_2)))


In [7]:
// Concatenate the single-element List(tuple) values of each session into one List using
// reduceByKey. List concatenation is not commutative, so Spark does not guarantee the event
// order within a session when values come from different partitions.
val sessionsRDDReduced = sessionsRDDKeyedWithList.reduceByKey((left, right) => left ::: right)

sessionsRDDReduced.collect().foreach(println)

(1,List((view,prod_1), (cart_add,prod_1), (cart_remove,prod_1), (view,prod_2), (view,prod_3), (view,prod_4), (view,prod_2)))
(3,List((view,prod_5), (search,NA), (cart_add,prod_2), (view,prod_8), (purchase,prod_2)))
(2,List((view,prod_5), (search,NA), (view,prod_6), (search,NA), (view,prod_2)))


In [8]:
// Keep only the 'view' events in each session's event list.
val sessionsRDDProductViews = sessionsRDDReduced.map { case (sessionId, events) =>
  (sessionId, events.filter { case (action, _) => action == "view" })
}

sessionsRDDProductViews.collect().foreach(println)

(1,List((view,prod_1), (view,prod_2), (view,prod_3), (view,prod_4), (view,prod_2)))
(3,List((view,prod_5), (view,prod_8)))
(2,List((view,prod_5), (view,prod_6), (view,prod_2)))


In [9]:
// Number of product views per session (repeat views of the same product are counted).
val sessionIdWithProductViewCount = sessionsRDDProductViews.map { case (sessionId, views) =>
  (sessionId, views.length)
}

sessionIdWithProductViewCount.collect().foreach(println)

(1,5)
(3,2)
(2,3)


In [10]:
// Number of DISTINCT products viewed per session.
val sessionIdWithUniqueProductViewCount = sessionsRDDProductViews.map { case (sessionId, views) =>
  val viewedProducts = views.map { case (_, product) => product }
  (sessionId, viewedProducts.toSet.size)
}

sessionIdWithUniqueProductViewCount.collect().foreach(println)

(1,4)
(3,2)
(2,3)


## Featurization using Datasets

In [12]:
// DataFrames are new with 1.3
// Inspired by the data frame construct in R and Python
// Column references are only checked at runtime (lacking in type safety)

// Datasets are new with 1.6, stable with 2.0
// More type-safe, more errors caught at compile time than with DataFrame
// More memory efficient than RDD
// More sql-like syntax
// Allows to refer to columns/fields by name
// Since 2.0 a DataFrame is just Dataset[Row]; the cells below use its untyped, column-name API.

// Get product view count (as above) using DS

import sqlContext.implicits._

// Every transformation returns a new immutable DataFrame, so the whole pipeline binds
// a single val instead of reassigning a var.
val sessionWithProductViewCountDS = sessionsDS
  .filter($"action" === "view")
  .groupBy("session_id")
  .count()
  .withColumnRenamed("count", "prod_view_count")

sessionWithProductViewCountDS.show()

+----------+---------------+
|session_id|prod_view_count|
+----------+---------------+
|         1|              5|
|         3|              2|
|         2|              3|
+----------+---------------+



In [13]:
// The same view count expressed in plain SQL.

// Expose the DataFrame to SQL queries under the temporary view name "sessions_table"
sessionsDS.createOrReplaceTempView("sessions_table")

val productViewCountQuery = "SELECT session_id, count(*) as prod_view_count FROM sessions_table where action = 'view' group by session_id"
sqlContext.sql(productViewCountQuery).show()

+----------+---------------+
|session_id|prod_view_count|
+----------+---------------+
|         1|              5|
|         3|              2|
|         2|              3|
+----------+---------------+



In [14]:
// Number of searches per session. session_id is renamed to search_session_id so the key
// column stays unambiguous when this result is joined with the view counts.
val sessionWithSearchCountDS = sessionsDS
  .filter($"action" === "search")
  .groupBy("session_id")
  .count()
  .withColumnRenamed("session_id", "search_session_id")
  .withColumnRenamed("count", "search_count")

sessionWithSearchCountDS.show()

+-----------------+------------+
|search_session_id|search_count|
+-----------------+------------+
|                3|           1|
|                2|           2|
+-----------------+------------+



In [15]:
// Datasets can be joined. The left outer join keeps sessions that have views but no
// searches, leaving their search columns null.
val sessionsWithBothCountsDS = sessionWithProductViewCountDS.join(
  sessionWithSearchCountDS,
  sessionWithProductViewCountDS("session_id") === sessionWithSearchCountDS("search_session_id"),
  "left_outer")

// Default the missing search counts to 0, scoped to search_count so no other numeric column
// is silently rewritten. search_session_id is also null for such rows, but it is a duplicate
// join key and is dropped. Datasets are immutable, so each step yields a new Dataset that is
// bound to finalDS (the dropped column is no longer carried around).
val finalDS = sessionsWithBothCountsDS
  .na.fill(0, Seq("search_count"))
  .drop("search_session_id")

finalDS.show()

+----------+---------------+------------+
|session_id|prod_view_count|search_count|
+----------+---------------+------------+
|         1|              5|           0|
|         3|              2|           1|
|         2|              3|           2|
+----------+---------------+------------+



In [16]:
// User Defined Functions can be applied to columns to derive new columns.
// (UDFs are opaque to Spark's optimizer; for a simple check like this a built-in
// `when(...).otherwise(...)` expression is usually preferable. A UDF is used here for illustration.)

import org.apache.spark.sql.functions.udf

/** Returns 1 if `action` is "purchase", otherwise 0.
  * Uses Scala's null-safe `==`, so a null action (the column is nullable) yields 0
  * instead of throwing a NullPointerException.
  */
def isPurchase(action: String): Int = if (action == "purchase") 1 else 0

// Lift the plain Scala function into a column-level UDF
val isPurchaseUDF = udf(isPurchase _)

val sessionWithIsPurchaseDS = sessionsDS.withColumn("is_purchase", isPurchaseUDF($"action"))

sessionWithIsPurchaseDS.show()

+----------+-------+-----------+----------+-----------+
|session_id|user_id|     action|product_id|is_purchase|
+----------+-------+-----------+----------+-----------+
|         1|      1|       view|    prod_1|          0|
|         1|      1|   cart_add|    prod_1|          0|
|         1|      1|cart_remove|    prod_1|          0|
|         1|      1|       view|    prod_2|          0|
|         1|      1|       view|    prod_3|          0|
|         1|      1|       view|    prod_4|          0|
|         1|      1|       view|    prod_2|          0|
|         2|      1|       view|    prod_5|          0|
|         2|      1|     search|      null|          0|
|         2|      1|       view|    prod_6|          0|
|         2|      1|     search|      null|          0|
|         2|      1|       view|    prod_2|          0|
|         3|      1|       view|    prod_5|          0|
|         3|      1|     search|      null|          0|
|         3|      1|   cart_add|    prod_2|     

In [18]:
// Label each session: it counts as a purchase session if any of its events has
// is_purchase = 1, i.e. the per-session maximum of the 0/1 flag.

import org.apache.spark.sql.functions.max

sessionWithIsPurchaseDS
  .groupBy("session_id")
  .agg(max("is_purchase").as("is_purchase"))
  .show()

+----------+-----------+
|session_id|is_purchase|
+----------+-----------+
|         1|          0|
|         3|          1|
|         2|          0|
+----------+-----------+

