# Bite-Sized Spark Introduction

## Spark Session & DataFrame

The entry point to all functionality in Spark is the `SparkSession` class. In `spark-shell`, a `SparkSession` is already instantiated as the `spark` object, so we can use it directly.

In [None]:
// use the pre-initialized spark instance
val bikeData = spark.read.option("header", "true").csv("datalake/bike-data/201508_trip_data.csv")

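Alternatively, we can build a session ourselves with a specific configuration. Note that `getOrCreate()` returns the already active session if there is one (as in `spark-shell`), so in that case it reuses `spark` instead of creating a new instance.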

In [None]:
import org.apache.spark.sql.SparkSession

// build a SparkSession manually (reuses the active session if one already exists)
val sparkSession = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

val bikeData1 = sparkSession.read.option("header", "true").csv("datalake/bike-data/201508_trip_data.csv")
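A short sketch of how `getOrCreate()` behaves, written as notebook/`spark-shell` statements and assuming Spark 3.x. It runs in local mode (`master("local[*]")` is an assumption for running outside a cluster) and contrasts reusing the active session with `newSession()`, which creates an isolated session on the same `SparkContext`.

```scala
import org.apache.spark.sql.SparkSession

// Build (or reuse) a session; "local[*]" runs Spark in-process on all local cores.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("session-sketch")
  .getOrCreate()

// A second builder call returns the session that is already active.
val sameSession = SparkSession.builder().getOrCreate()

// newSession() creates an isolated session (separate SQL config and temp views)
// that shares the same SparkContext.
val isolated = spark.newSession()

println(sameSession eq spark)                        // true
println(isolated eq spark)                           // false
println(isolated.sparkContext eq spark.sparkContext) // true
```

So `bikeData` and `bikeData1` above are, in `spark-shell`, read through the very same session.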

`DataFrame` is the main class we use to transform data. We can think of it as a table in a relational database: loading a data source effectively gives us a table. A `DataFrame` can be joined with other `DataFrame`s and supports operations such as `select()`, `filter()`, `groupBy()`, and aggregations like `sum()` and `max()`. In the examples above, `bikeData` and `bikeData1` are instances of `DataFrame`.

In [None]:
// doing operation to DataFrame
bikeData.filter($"`Duration`" > 1000)
    .select(
        $"`Trip ID`".as("trip_id"),
        $"`Duration`".as("duration"),
        $"`Start Station`".as("start_station"),
        $"`End Station`".as("end_station")
    )
    .show()
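To make the table analogy concrete, here is a minimal sketch (notebook statements, local mode, Spark 3.x assumed) that builds a small hypothetical trips `DataFrame` in memory and runs the same query twice: once with the `DataFrame` API and once as SQL against a temporary view.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dataframe-as-table").getOrCreate()
import spark.implicits._ // enables toDF() and the $"col" syntax

// Hypothetical trips standing in for the bike-data CSV.
val trips = Seq(
  (1, 1200, "Market at 4th"),
  (2, 300, "Townsend at 7th"),
  (3, 4500, "Market at 4th")
).toDF("trip_id", "duration", "start_station")

// DataFrame API version
val longTripsApi = trips
  .filter($"duration" > 1000)
  .select($"trip_id", $"start_station")

// SQL version: register the DataFrame as a temporary view and query it like a table
trips.createOrReplaceTempView("trips")
val longTripsSql = spark.sql(
  "SELECT trip_id, start_station FROM trips WHERE duration > 1000"
)

longTripsApi.show()
longTripsSql.show()
```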

Spark has another table-like class called `Dataset`. The main difference: a `Dataset` is strongly typed (each row is an object of a known Scala type, so typed operations are checked at compile time), while a `DataFrame` is untyped (each row is a generic `Row`). In Scala, `DataFrame` is in fact a type alias for `Dataset[Row]`. Please read this [reference](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html) for a more detailed comparison of `DataFrame` and `Dataset`.
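A minimal sketch of the difference, assuming Spark 3.x and notebook/`spark-shell` statements (in compiled code the case class must be defined outside any method so Spark can derive its encoder). The `Trip` case class and its data are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical record type; gives the Dataset its compile-time schema.
case class Trip(tripId: Int, duration: Int, startStation: String)

val spark = SparkSession.builder().master("local[*]").appName("dataset-vs-dataframe").getOrCreate()
import spark.implicits._ // provides encoders for case classes

val tripsDs: Dataset[Trip] = Seq(
  Trip(1, 1200, "Market at 4th"),
  Trip(2, 300, "Townsend at 7th")
).toDS()

// Typed: the lambda receives a Trip, so a misspelled field fails to compile.
val longTripsDs: Dataset[Trip] = tripsDs.filter(t => t.duration > 1000)

// Untyped: DataFrame = Dataset[Row]; a misspelled column name only fails
// when Spark analyzes the query at runtime (AnalysisException).
val tripsDf: DataFrame = tripsDs.toDF()
val longTripsDf: DataFrame = tripsDf.filter($"duration" > 1000)

// as[T] converts an untyped DataFrame back into a typed Dataset.
val backToTyped: Dataset[Trip] = longTripsDf.as[Trip]
```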

## DataFrame Operations

`DataFrame` operations in `Spark` are very similar to operations in `SQL`. If we are already familiar with `SQL` syntax, we can pick up `Spark` easily; we just need to learn the equivalent `Spark` operation or function. Here are some common operations that we usually use in `SQL` to transform our data.

### read()

Loads data source files into a `Spark` `DataFrame`. The format can be `csv`, `json`, `parquet`, etc. Example:

In [None]:
val datalakeSrc = "datalake/business_platform"
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery") // <-- as explained above, this returns a DataFrame
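A sketch of the most common `read` variants, assuming Spark 3.x in local mode. It writes a tiny hypothetical CSV file to a temporary directory so it runs without the datalake. Without `inferSchema`, every CSV column is read as a string, which is the case for `bikeData` above.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder().master("local[*]").appName("read-sketch").getOrCreate()

// Write a tiny hypothetical CSV file into a temporary local directory.
val dir = Files.createTempDirectory("read-sketch")
val csvPath = dir.resolve("trips.csv")
Files.write(csvPath, "trip_id,duration\n1,1200\n2,300\n".getBytes(StandardCharsets.UTF_8))

// CSV: "header" takes column names from the first line;
// "inferSchema" scans the data to guess types (otherwise every column is a string).
val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvPath.toString)

// Parquet stores the schema with the data, so no options are needed to read it back.
val parquetPath = dir.resolve("trips_parquet").toString
csvDf.write.parquet(parquetPath)
val parquetDf = spark.read.parquet(parquetPath)

// Generic form: format(...) + load(...) works for any supported source.
val genericDf = spark.read.format("parquet").load(parquetPath)

csvDf.printSchema()
```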

### printSchema()

Prints the structure (column names, data types, and nullability) of a `DataFrame`.

In [None]:
// inspect schema
salesDeliverySrc.printSchema()

### select()

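Equivalent to `SELECT` in `SQL`: picks the columns we want and, with `.as()`, renames them. The backticks tell `Spark` that each dotted name is a single column name rather than a path into a nested struct.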

In [None]:
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount")
    )
    .show()
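The column names above are wrapped in backticks because they contain dots. A small sketch (hypothetical column names, local mode, Spark 3.x assumed) contrasting a flat column whose name contains a dot with a real nested struct column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[*]").appName("select-sketch").getOrCreate()
import spark.implicits._

// Hypothetical flat DataFrame whose first column *name* contains a dot.
val flat = Seq((1, "id_ID")).toDF("sales.bookingId", "locale")

// Backticks: treat "sales.bookingId" as one column name.
// (Without backticks this would fail, because flat has no struct column "sales".)
val fromFlat = flat.select($"`sales.bookingId`".as("booking_id"), $"locale")

// Build a real nested struct column "sales" with a field "bookingId".
val nested = flat.select(struct($"`sales.bookingId`".as("bookingId")).as("sales"))

// No backticks: the dot now means "field bookingId inside struct column sales".
val fromNested = nested.select($"sales.bookingId".as("booking_id"))

fromFlat.show()
fromNested.show()
```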

### filter()

Equivalent to the `WHERE` clause in `SQL`.

In [None]:
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .show()
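`filter()` conditions can be combined much like in a `WHERE` clause. A minimal sketch on hypothetical in-memory bookings (local mode, Spark 3.x assumed):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
import spark.implicits._

// Hypothetical bookings; None becomes a SQL NULL.
val bookings = Seq[(String, String, Option[Long])](
  ("b1", "CULINARY", Some(50000L)),
  ("b2", "CULINARY", None),
  ("b3", "FLIGHT", Some(900000L))
).toDF("booking_id", "trip_type", "total_amount")

// && (AND), || (OR), ! (NOT); === is equality and =!= is inequality.
val culinaryWithAmount = bookings.filter(
  $"trip_type" === "CULINARY" && $"total_amount".isNotNull
)

// where() is an alias of filter(), and it also accepts a SQL expression string.
val sameResult = bookings.where("trip_type = 'CULINARY' AND total_amount IS NOT NULL")

// isin(...) is the equivalent of SQL IN (...).
val selected = bookings.filter($"booking_id".isin("b1", "b3"))

culinaryWithAmount.show()
```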

### take()

Returns the first `n` rows of a `DataFrame` to the driver as an `Array[Row]`. Unlike `show()`, it does not print anything.

In [None]:
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .take(5)
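A sketch contrasting `take()` with `limit()` (local mode, Spark 3.x assumed): `take(n)` runs the query and returns rows to the driver, while `limit(n)` returns a new `DataFrame` and stays lazy. Avoid `collect()` on large data, since it pulls every row to the driver.

```scala
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("take-sketch").getOrCreate()

val df = spark.range(100).toDF("id")

// take(n) is an action: it runs the query and returns rows to the driver.
val firstFive: Array[Row] = df.take(5)
val ids: Array[Long] = firstFive.map(_.getLong(0))

// limit(n) returns a new DataFrame instead; nothing runs until an action is called.
val limited = df.limit(5)

println(ids.mkString(", "))
```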

### join()

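Joins two `DataFrame`s, equivalent to `JOIN` in `SQL`. The third argument is the join type, for example `"inner"`, `"left"`, `"right"`, or `"full"`. Each side gets an alias with `.as()` so that the two `booking_id` columns can be told apart in the join condition.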

In [None]:
// Load Data
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")
val salesInvoiceSrc = spark.read.parquet(s"$datalakeSrc/sales_invoice")

// Do some operation
val salesDelivery = salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .as("bp_sales_delivery")

val salesInvoice = salesInvoiceSrc
    .filter(
        $"`sales.sales_invoice.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_invoice.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_invoice.sales_invoice_id`".as("sales_invoice_id")    
    )
    .as("bp_sales_invoice")

// Join data
salesDelivery
    .join(salesInvoice, $"bp_sales_delivery.booking_id" === $"bp_sales_invoice.booking_id", "left")
    .select(
        $"*"
    )
    .show()
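In the join above, both sides keep their own `booking_id`, so the result contains that column twice. A minimal sketch (hypothetical data, local mode, Spark 3.x assumed) of joining on a list of column names instead, which keeps a single key column, plus how the join type changes the row count:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical deliveries and invoices; b2 has no invoice.
val deliveries = Seq(("b1", 50000L), ("b2", 75000L)).toDF("booking_id", "total_amount")
val invoices = Seq(("b1", "inv-1")).toDF("booking_id", "sales_invoice_id")

// Joining on a Seq of column names keeps a single booking_id column.
val leftJoined = deliveries.join(invoices, Seq("booking_id"), "left")

// Inner join keeps only bookings that have an invoice.
val innerJoined = deliveries.join(invoices, Seq("booking_id"), "inner")

leftJoined.show() // b2 has a NULL sales_invoice_id
```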

### when() and otherwise()

Equivalent to `CASE WHEN ... THEN ... ELSE ... END` in `SQL`.

In [None]:
import org.apache.spark.sql.functions.{when, lit} // don't forget to import the functions

val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        when($"`sales.sales_delivery.additional_data.locale`" === "id_ID", lit("ID"))
            .otherwise(lit("Non ID")).as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .show()
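A sketch of chaining several `when()` branches and of leaving out `otherwise()` (hypothetical locales and labels, local mode, Spark 3.x assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, when}

val spark = SparkSession.builder().master("local[*]").appName("when-sketch").getOrCreate()
import spark.implicits._

val bookings = Seq(("b1", "id_ID"), ("b2", "en_ID"), ("b3", "th_TH")).toDF("booking_id", "locale")

val labeled = bookings.select(
  $"booking_id",
  // Branches are checked in order, like successive WHEN clauses.
  when($"locale" === "id_ID", lit("ID"))
    .when($"locale" === "en_ID", lit("ID (English)"))
    .otherwise(lit("Non ID"))
    .as("locale_group"),
  // Without otherwise(), unmatched rows get NULL, like CASE without ELSE.
  when($"locale" === "id_ID", lit("ID")).as("id_only")
)

labeled.show()
```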

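### groupBy() and agg()

Equivalent to `GROUP BY` in `SQL`. On its own, `groupBy()` returns a `RelationalGroupedDataset`, not a `DataFrame`; an aggregation such as `agg()`, `count()`, `max()`, or `sum()` turns the groups back into a `DataFrame`.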

In [None]:
import org.apache.spark.sql.functions.count // don't forget to import the function

val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .groupBy($"locale")
    .agg(
        // groupBy() needs an aggregation before it becomes a DataFrame again
        count($"booking_id").as("total_booking")
    )
    .show()
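A minimal sketch computing several aggregates per group in one `agg()` call, on hypothetical in-memory sales (local mode, Spark 3.x assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max, sum}

val spark = SparkSession.builder().master("local[*]").appName("groupby-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sales rows.
val sales = Seq(("id_ID", 100L), ("id_ID", 300L), ("en_US", 50L)).toDF("locale", "total_amount")

// groupBy() returns a RelationalGroupedDataset; agg() turns it back into a DataFrame.
val summary = sales
  .groupBy($"locale")
  .agg(
    count($"total_amount").as("bookings"),        // COUNT(total_amount), ignores NULLs
    sum($"total_amount").as("sum_total_amount"),  // SUM(total_amount)
    avg($"total_amount").as("avg_total_amount"),  // AVG(total_amount)
    max($"total_amount").as("max_total_amount")   // MAX(total_amount)
  )

summary.show()
```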

### max()

Equivalent to `MAX` in `SQL`

In [None]:
import org.apache.spark.sql.functions.max // don't forget to import the function

val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .groupBy($"locale")
    .agg(
        max($"total_amount").as("max_total_amount")
    )
    .show()
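`max()` can also aggregate the whole `DataFrame` without `groupBy()`, like `SELECT MAX(total_amount) FROM ...`. A sketch on hypothetical data (local mode, Spark 3.x assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().master("local[*]").appName("max-sketch").getOrCreate()
import spark.implicits._

val sales = Seq(("id_ID", 100L), ("id_ID", 300L), ("en_US", 50L)).toDF("locale", "total_amount")

// No groupBy(): agg() reduces the whole DataFrame to a single row.
val overall = sales.agg(max($"total_amount").as("max_total_amount"))

// select() with an aggregate function gives the same single-row result.
val overallViaSelect = sales.select(max($"total_amount").as("max_total_amount"))

overall.show()
```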

### sum()

Equivalent to `SUM` in `SQL`

In [None]:
import org.apache.spark.sql.functions.sum 

val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .groupBy($"locale")
    .agg(
        sum($"total_amount").as("sum_total_amount")
    )
    .show()
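`sum()` follows SQL `NULL` semantics: `NULL` values are skipped, and a group containing only `NULL`s sums to `NULL` rather than 0. A sketch on hypothetical data (local mode, Spark 3.x assumed), using `coalesce()` when a zero is preferred:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, sum}

val spark = SparkSession.builder().master("local[*]").appName("sum-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sales; None becomes a SQL NULL.
val sales = Seq[(String, Option[Long])](
  ("id_ID", Some(100L)),
  ("id_ID", None),
  ("en_US", None)
).toDF("locale", "total_amount")

// SUM skips NULLs; the all-NULL en_US group sums to NULL, not 0.
val sums = sales.groupBy($"locale").agg(sum($"total_amount").as("sum_total_amount"))

// coalesce() replaces that NULL with 0 when a zero is wanted instead.
val sumsOrZero = sums.select(
  $"locale",
  coalesce($"sum_total_amount", lit(0L)).as("sum_total_amount")
)

sumsOrZero.show()
```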

### dropDuplicates()

Removes duplicate rows, equivalent to `SELECT DISTINCT` in `SQL`.

In [None]:
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")

// Do some operation
salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .dropDuplicates()
    .show()
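A sketch of the difference between dropping exact duplicates and deduplicating on selected columns (hypothetical data, local mode, Spark 3.x assumed). When columns are given, which of the duplicate rows is kept is not guaranteed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
import spark.implicits._

val rows = Seq(
  ("b1", "id_ID", 100L),
  ("b1", "id_ID", 100L),
  ("b1", "en_ID", 100L)
).toDF("booking_id", "locale", "total_amount")

// No arguments: drops rows identical in every column (same as distinct()).
val exactDupsRemoved = rows.dropDuplicates()

// With column names: keeps one row per booking_id; which one is not guaranteed.
val onePerBooking = rows.dropDuplicates("booking_id")

exactDupsRemoved.show()
```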

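### explain()

Prints the plan `Spark` will use to run the query, without running it. `explain()` shows the physical plan; `explain(true)` also shows the parsed, analyzed, and optimized logical plans.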

In [None]:
// Load Data
val salesDeliverySrc = spark.read.parquet(s"$datalakeSrc/sales_delivery")
val salesInvoiceSrc = spark.read.parquet(s"$datalakeSrc/sales_invoice")

// Do some operation
val salesDelivery = salesDeliverySrc
    .filter(
        $"`sales.sales_delivery.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_delivery.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_delivery.additional_data.locale`".as("locale"),
        $"`sales.sales_delivery.sales_delivery_id`".as("sales_delivery_id"),
        $"`sales.sales_delivery.total_amount`".as("total_amount"),
        $"`sales.sales_delivery.additional_data.tripType`".as("trip_type")
    )
    .as("bp_sales_delivery")

val salesInvoice = salesInvoiceSrc
    .filter(
        $"`sales.sales_invoice.additional_data.tripType`" === "CULINARY"
    )
    .select(
        $"`sales.sales_invoice.additional_data.bookingId`".as("booking_id"),
        $"`sales.sales_invoice.sales_invoice_id`".as("sales_invoice_id")    
    )
    .as("bp_sales_invoice")

// Join data
salesDelivery
    .join(salesInvoice, $"bp_sales_delivery.booking_id" === $"bp_sales_invoice.booking_id", "left")
    .select(
        $"*"
    )
    .explain()
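A small self-contained sketch (local mode, Spark 3.x assumed) that prints both plan variants for a toy query. Spark 3.0+ also accepts a mode string such as `explain("formatted")`.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("explain-sketch").getOrCreate()
import spark.implicits._

val query = spark.range(100)
  .filter($"id" > 10)
  .select(($"id" * 2).as("doubled"))

query.explain()      // physical plan only
query.explain(true)  // parsed, analyzed and optimized logical plans, plus the physical plan

// The same physical plan as a string, e.g. for logging.
val physicalPlan = query.queryExecution.executedPlan.toString
```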

### About Lazy Evaluation

`Lazy evaluation` means that `Spark` waits until the very last moment to execute the graph of computation instructions. Instead of modifying the data immediately when we express an operation (a *transformation* such as `filter()`, `select()`, or `join()`), `Spark` builds up a plan of the transformations we want to apply to the source data. Only when an *action* such as `show()`, `take()`, `count()`, or `collect()` is called does `Spark` compile this plan from our `DataFrame` transformations into a streamlined physical plan that runs as efficiently as possible across the cluster. So in the examples above, no transformation is executed until `show()` (or `take()`) is called, and `explain()` only prints the plan.
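A sketch that makes the laziness observable, assuming Spark 3.x in local mode: a UDF increments an accumulator every time it runs, and the counter stays at 0 until an action is called. The accumulator trick is only for demonstration; Spark may apply accumulator updates made inside transformations more than once if a task is retried.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").appName("lazy-sketch").getOrCreate()
import spark.implicits._

// Counts how many times the UDF below is executed (demonstration only).
val rowsEvaluated = spark.sparkContext.longAccumulator("rowsEvaluated")
val doubleAndCount = udf((x: Long) => { rowsEvaluated.add(1L); x * 2 })

// Transformation: only builds a plan, the UDF has not run yet.
val planned = spark.range(10).select(doubleAndCount($"id").as("doubled"))
val before: Long = rowsEvaluated.value

// Action: collect() executes the plan, so the UDF finally runs.
val result = planned.collect()
val after: Long = rowsEvaluated.value

println(s"before action: $before, after action: $after")
```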