In [1]:
// Evaluating the kernel-provided `spark` value echoes the active SparkSession.
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://f9c8cdc0638d:4041
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1733411858115)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@60dfb3bf


In [8]:
import org.apache.spark.sql.SparkSession 

// Obtain the SparkSession used throughout this notebook. `getOrCreate()` returns
// an already-running session when one exists, so the app name set here only
// applies if no session has been started yet.
val sparkSession = SparkSession.builder().appName("Jupyter").getOrCreate()

// One event record, used as the element type when events are handled as a typed Dataset.
// TODO: Illustrate how this fails if `referrer` is changed from Option[String] to String.
case class Event (
    // Option-wrapped fields model values that may be NULL: a missing value becomes
    // None, so callers have to deal with absence explicitly.
    user_id: Option[Integer],
    device_id: Option[Integer],
    referrer: Option[String],
    // Plain (non-Option) fields -- presumably always populated in the data; TODO confirm.
    host: String,
    url: String,
    event_time: String
)

// Two hand-built sample events. Only user, device and referrer vary; host, url
// and event_time are the same for both.
val dummyData = List(
        (1, 2, "linkedin"),
        (3, 7, "twitter")
    ).map { case (userId, deviceId, source) =>
        Event(
            // Box explicitly so the fields stay Option[Integer], as Event declares them.
            user_id = Some(Int.box(userId)),
            device_id = Some(Int.box(deviceId)),
            referrer = Some(source),
            host = "eczachly.com",
            url = "/signup",
            event_time = "2023-01-01"
        )
    }

import org.apache.spark.sql.SparkSession
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@60dfb3bf
defined class Event
dummyData: List[Event] = List(Event(Some(1),Some(2),Some(linkedin),eczachly.com,/signup,2023-01-01), Event(Some(3),Some(7),Some(twitter),eczachly.com,/signup,2023-01-01))


In [9]:
// One device record; every field is declared with a plain (non-Option) type.
// TODO: Illustrate how this fails when a nullable column is declared without Option.
// NOTE(review): the original note spoke of changing Option[Long] to Long, but no
// field here uses either type -- confirm which field/type was meant.
case class Device (
    device_id: Integer,
    browser_type: String,
    os_type: String,
    device_type: String
)

// Flattened record combining an Event with its Device: the event's fields plus the
// device's descriptive fields, all declared as plain (non-Option) types.
case class EventWithDeviceInfo (
   user_id: Integer,
    device_id: Integer,
    browser_type: String,
    os_type: String,
    device_type: String,
    referrer: String,
    host: String,
    url: String,
    event_time: String
)

defined class Device
defined class EventWithDeviceInfo


In [12]:
// When should you use each type?
import sparkSession.implicits._
import org.apache.spark.sql.{SparkSession, Dataset}

// Convert each CSV to its case class right at read time so the rest of the
// notebook works with typed Datasets. (Original author's note: applying the case
// class up front is what enforces nullability / non-nullability at runtime.)

// Both files have a header row and let Spark infer the column types.
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")

val events: Dataset[Event] = sparkSession.read
    .options(csvOptions)
    .csv("/home/iceberg/data/events.csv")
    .as[Event]

val devices: Dataset[Device] = sparkSession.read
    .options(csvOptions)
    .csv("/home/iceberg/data/devices.csv")
    .as[Device]

// Register both Datasets under names that SQL queries can refer to.
devices.createOrReplaceTempView("devices")
events.createOrReplaceTempView("events")

import sparkSession.implicits._
import org.apache.spark.sql.{SparkSession, Dataset}
events: org.apache.spark.sql.Dataset[Event] = [user_id: int, device_id: int ... 4 more fields]
devices: org.apache.spark.sql.Dataset[Device] = [device_id: int, browser_type: string ... 2 more fields]


In [14]:
// The same filter -- keep events that have both a user_id and a device_id --
// written against each API. For a transformation this simple the three read
// almost alike; the Dataset form has a slight edge because of the quality
// enforcement that comes with the typed case class.

// Typed Dataset: the predicate is ordinary Scala over Event fields.
val filteredViaDataset = events.filter(event => !(event.user_id.isEmpty || event.device_id.isEmpty))

// DataFrame: the predicate is a Column expression built from column names.
val filteredViaDataFrame = events.toDF().filter($"user_id".isNotNull and $"device_id".isNotNull)

// Spark SQL: the predicate lives in the query text, against the "events" view.
val filteredViaSparkSql = sparkSession.sql("SELECT * FROM events WHERE user_id IS NOT NULL AND device_id IS NOT NULL")

filteredViaDataset: org.apache.spark.sql.Dataset[Event] = [user_id: int, device_id: int ... 4 more fields]
filteredViaDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 4 more fields]
filteredViaSparkSql: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 4 more fields]


In [18]:
// Typed join: pair each filtered event with the device that shares its device_id,
// then flatten every (Event, Device) pair into an EventWithDeviceInfo whose
// browser_type is upper-cased.
val combinedViaDatasets = filteredViaDataset
    // Resolve the join key against the Dataset actually being joined (the filtered
    // one) rather than the unfiltered `events` it was derived from.
    .joinWith(devices, filteredViaDataset("device_id") === devices("device_id"), "inner")
    .map { case (event: Event, device: Device) =>
        EventWithDeviceInfo(
            // filteredViaDataset only keeps rows whose user_id is defined, so -1 is a
            // fallback that is not expected to show up in the output.
            user_id = event.user_id.getOrElse(-1),
            device_id = device.device_id,
            // Upper-cased in this same pass instead of a second map over the Dataset.
            // Option(...) lets a null browser_type pass through as null rather than
            // throwing a NullPointerException inside the task.
            browser_type = Option(device.browser_type).map(_.toUpperCase).orNull,
            os_type = device.os_type,
            device_type = device.device_type,
            // A missing referrer is reported as the literal "unknown".
            referrer = event.referrer.getOrElse("unknown"),
            host = event.host,
            url = event.url,
            event_time = event.event_time
        )
    }


combinedViaDatasets: org.apache.spark.sql.Dataset[EventWithDeviceInfo] = [user_id: int, device_id: int ... 7 more fields]


In [19]:
// Untyped version of the same join. DataFrames give up static typing (and with it
// some editor assistance): columns are addressed by name through the "e"/"d" aliases.
val combinedViaDataFrames = {
    val eventsAliased = filteredViaDataFrame.as("e")
    val devicesAliased = devices.as("d")

    // Column comparisons need the triple-equals operator, which builds a Column
    // expression; plain `==` would just produce a Boolean.
    val sameDevice = $"e.device_id" === $"d.device_id"

    // Output columns, in the same order as the fields of EventWithDeviceInfo.
    val projection = Seq(
        $"e.user_id",
        $"d.device_id",
        $"d.browser_type",
        $"d.os_type",
        $"d.device_type",
        $"e.referrer",
        $"e.host",
        $"e.url",
        $"e.event_time"
    )

    eventsAliased
        .join(devicesAliased, sameDevice, "inner")
        .select(projection: _*)
}

combinedViaDataFrames: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 7 more fields]


In [20]:
// Creating temp views is a good strategy when leveraging Spark SQL: registering the
// filtered events lets the query below refer to them by name.
filteredViaSparkSql.createOrReplaceTempView("filtered_events")

// A plain triple-quoted string is used because nothing is interpolated; the `f`
// formatter would treat any `%` in the query text as a format directive.
// The query runs on `sparkSession`, the same handle the other cells use.
val combinedViaSparkSQL = sparkSession.sql("""
    SELECT 
        fe.user_id,
        d.device_id,
        d.browser_type,
        d.os_type,
        d.device_type,
        fe. referrer,
        fe.host,
        fe.url,
        fe.event_time
    FROM filtered_events fe 
    JOIN devices d ON fe.device_id = d.device_id
""")

// Preview a few rows of the typed (Dataset) result.
combinedViaDatasets.take(5)


combinedViaSparkSQL: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 7 more fields]
res4: Array[EventWithDeviceInfo] = Array(EventWithDeviceInfo(1037710827,532630305,OTHER,Other,Other,unknown,www.zachwilson.tech,/,2021-03-08 17:27:24.241), EventWithDeviceInfo(925588856,532630305,OTHER,Other,Other,unknown,www.eczachly.com,/,2021-05-10 11:26:21.247), EventWithDeviceInfo(-1180485268,532630305,OTHER,Other,Other,unknown,admin.zachwilson.tech,/,2021-02-17 16:19:30.738), EventWithDeviceInfo(-1044833855,532630305,OTHER,Other,Other,unknown,www.zachwilson.tech,/,2021-09-24 15:53:14.466), EventWithDeviceInfo(747494706,532630305,OTHER,Other,Other,unknown,www.zachwilson.tech,/,2021-09-26 16:03:17.535))
