# Spark - The Definitive Guide (DataBricks)

<img src="images/spark_history.png">

In [5]:
val myRange = spark.range(1000).toDF("number")
myRange.show(10)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+
only showing top 10 rows



myRange: org.apache.spark.sql.DataFrame = [number: bigint]


## DataFrame Transformations

Transformations are lazy: Spark only executes them when we call an action on the resulting DataFrame.

### Lazy Evaluation
Lazy evaluation lets Spark look at the whole chain of transformations before running anything. One example is “predicate pushdown”. If we build a large Spark job consisting of narrow dependencies, but specify a filter at the end that only requires us to fetch one row from our source data, the most efficient way to execute this is to access the single record that we need. Spark optimizes this for us by pushing the filter down automatically.
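A minimal sketch of laziness, assuming a local SparkSession (inside the notebook `spark` already exists, so the builder line can be skipped). The names `numbers`, `evens`, and `evenCount` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in the notebook, `spark` is already defined.
val spark = SparkSession.builder().master("local[*]").appName("lazy-evaluation-sketch").getOrCreate()

val numbers = spark.range(1000).toDF("number")

// Transformation only: Spark records the filter in the plan, no job runs yet.
val evens = numbers.where("number % 2 = 0")

// Prints the physical plan without processing any rows.
evens.explain()

// Action: this is the point where Spark actually executes the plan.
val evenCount = evens.count()
println(evenCount)
```

Only the final `count()` launches a job; everything before it just describes the computation.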

In [6]:
val divBy2 = myRange.where("number % 2 == 0")
divBy2.show(10)

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
+------+
only showing top 10 rows



divBy2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: bigint]


### Reading the flight data CSV file

In [8]:
val flightData2015 = spark.read
                .option("inferSchema",true)
                .option("header", true)
                .csv("./data/2015-summary.csv")
                

flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [9]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#50 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#50 ASC NULLS FIRST, 200), true, [id=#82]
   +- FileScan csv [DEST_COUNTRY_NAME#48,ORIGIN_COUNTRY_NAME#49,count#50] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/vishalmishra/projects/github_repos/LearningSpark/data/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [11]:
flightData2015.sort("count").take(2)

res6: Array[org.apache.spark.sql.Row] = Array([United States,Singapore,1], [Moldova,United States,1])


In [13]:
flightData2015.createOrReplaceTempView("flight_data_2015")

### Exercise - 1:
We will use the max function to find the maximum number of flights to and from any given location.

In [17]:
//SQL way
println("SQL way")
spark.sql("select max(count) from flight_data_2015").show()

//DataFrame way
println("DataFrame way")
import org.apache.spark.sql.functions.max
flightData2015.select(max("count")).show()


SQL way
+----------+
|max(count)|
+----------+
|    370002|
+----------+

DataFrame way
+----------+
|max(count)|
+----------+
|    370002|
+----------+



import org.apache.spark.sql.functions.max


### Exercise - 2:
Find the top five destination countries in the data set.

In [23]:
println("SQL way")
val top5Dest = spark.sql("""
    select DEST_COUNTRY_NAME, sum(count) from flight_data_2015
    group by DEST_COUNTRY_NAME
    order by sum(count) DESC
    Limit 5
    """)

top5Dest.collect.foreach(println)

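println("\nDataFrame way")
// desc must be in scope for the descending sort below
import org.apache.spark.sql.functions.desc
// Note: foreach returns Unit, so top5DestDF holds Unit rather than a DataFrame
val top5DestDF = flightData2015
            .groupBy("DEST_COUNTRY_NAME")
            .sum("count")
            .withColumnRenamed("sum(count)", "destination_total")
            .sort(desc("destination_total"))
            .limit(5)
            .collect.foreach(println)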

SQL way
[United States,411352]
[Canada,8399]
[Mexico,7140]
[United Kingdom,2025]
[Japan,1548]

DataFrame way
[United States,411352]
[Canada,8399]
[Mexico,7140]
[United Kingdom,2025]
[Japan,1548]


top5Dest: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, sum(count): bigint]
top5DestDF: Unit = ()


In [24]:
// explain() prints the plan and returns Unit, so top5DestDF is not a DataFrame here
val top5DestDF = flightData2015
            .groupBy("DEST_COUNTRY_NAME")
            .sum("count")
            .withColumnRenamed("sum(count)", "destination_total")
            .sort(desc("destination_total"))
            .limit(5)
            .explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#209L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#48,destination_total#209L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#48], functions=[sum(cast(count#50 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#48, 200), true, [id=#379]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#48], functions=[partial_sum(cast(count#50 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#48,count#50] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/vishalmishra/projects/github_repos/LearningSpark/data/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




top5DestDF: Unit = ()


## Datasets

DataFrames are a distributed collection of objects of type Row, but Spark also allows JVM users to create their own objects (via case classes or Java beans) and manipulate them using functional programming concepts. For instance, rather than creating a range and manipulating it via SQL or DataFrames, we can manipulate it just as we might manipulate a local Scala collection. We can map over the values with a user-defined function and convert each one into a new arbitrary case class object.

In essence, within the Structured APIs there are two more APIs: the “untyped” DataFrames and the “typed” Datasets. To say that DataFrames are untyped is a bit of a misnomer: they have types, but Spark maintains them completely and only checks at runtime whether those types line up with those specified in the schema. Datasets, on the other hand, check whether types conform to the specification at compile time. Datasets are only available to JVM-based languages (Scala and Java), and we specify types with case classes or Java beans.
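As a rough illustration of the typed API, the sketch below builds a small Dataset from a case class. The `Flight` case class and the two sample rows are hypothetical and are not read from the flight data file; the local SparkSession is also an assumption (in the notebook, `spark` already exists).

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class mirroring the flight data columns.
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Long)

// Assumption: a local session; in the notebook, `spark` is already defined.
val spark = SparkSession.builder().master("local[*]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

// Made-up sample rows, used only to keep the example self-contained.
val flights: Dataset[Flight] = Seq(
  Flight("United States", "Romania", 15L),
  Flight("Canada", "United States", 20L)
).toDS()

// Field access is checked by the Scala compiler: a misspelled field name
// fails at compile time rather than at runtime.
val busy = flights.filter(f => f.count > 16L)
val destinations = busy.map(_.DEST_COUNTRY_NAME).collect()

destinations.foreach(println)
```

The same filter on a DataFrame would take a column expression or SQL string, which Spark can only validate once the plan is analyzed.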

#### Columns
Columns can represent a simple type like an integer or string, a complex type like an array or map, or a null value. Spark tracks all of this type information for you and offers a variety of ways to transform columns.

#### Rows

There are two ways of getting data into Spark: through Rows and through Encoders.

Row objects are the most general way of getting data into, and out of, Spark and are available in all languages. 

Each record in a DataFrame must be of Row type.



## Overview of Structured API Execution

1. Write DataFrame/Dataset/SQL Code
2. If valid code, Spark converts this to a Logical Plan
3. Spark transforms this Logical Plan to a Physical Plan
4. Spark then executes this Physical Plan on the cluster



<img src="images/df_execution_plan.png">

### Logical Planning
The first phase of execution takes user code and converts it into a logical plan.
This logical plan only represents a set of abstract transformations that do not refer to executors or drivers; its purpose is purely to convert the user’s set of expressions into the most optimized version. Spark does this by first converting user code into an unresolved logical plan. The plan is unresolved because, while your code may be valid, the tables or columns that it refers to may or may not exist.

Spark uses the catalog, a repository of all table and DataFrame information, in order to resolve
columns and tables in the analyzer. 

The analyzer may reject the unresolved logical plan if the required table or
column name does not exist in the catalog. If it can resolve the plan, the result is passed through the optimizer, a collection of rules that attempts to optimize the logical plan by pushing down predicates or selections.
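The analysis and optimization steps can be observed from user code. The sketch below assumes a local SparkSession and uses an illustrative DataFrame named `numbers`; `no_such_column` is a deliberately missing column.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Try}

// Assumption: a local session; in the notebook, `spark` is already defined.
val spark = SparkSession.builder().master("local[*]").appName("logical-plan-sketch").getOrCreate()

val numbers = spark.range(10).toDF("number")

// A column that cannot be resolved is rejected during analysis,
// before any job is submitted to the cluster.
val rejected = Try(numbers.select("no_such_column")) match {
  case Failure(_: AnalysisException) => true
  case _ => false
}
println(s"Rejected by the analyzer: $rejected")

// A valid query exposes its resolved and optimized logical plans.
val query = numbers.where("number > 5").select("number")
println(query.queryExecution.analyzed)
println(query.queryExecution.optimizedPlan)
```

Calling `query.explain(true)` prints the same stages together with the physical plan.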

<img src="images/logical_planning.png">

### Physical Planning

After successfully creating an optimized logical plan, Spark then begins the physical planning process. 

The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model. 

An example of the cost comparison might be choosing how to perform a given join by looking at the physical attributes of a given table (how big the table is or how big its partitions are).

Physical planning results in a series of RDDs and transformations. This is why you may have heard Spark referred to as a compiler: it takes queries in DataFrames, Datasets, and SQL and compiles them into RDD transformations for you.
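To make the "compiles to RDDs" point concrete, here is a small sketch, assuming a local SparkSession. The `sorted` DataFrame is illustrative; the exact plan text printed depends on the Spark version.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in the notebook, `spark` is already defined.
val spark = SparkSession.builder().master("local[*]").appName("physical-plan-sketch").getOrCreate()

val sorted = spark.range(100).toDF("number").sort("number")

// The physical plan Spark selected for this query.
println(sorted.queryExecution.executedPlan)

// The RDD of Rows that the physical plan is compiled into.
val underlying = sorted.rdd
println(underlying.toDebugString)

// Running an action on the RDD executes the compiled plan.
val total = underlying.count()
println(total)
```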

<img src="images/physical_planning.png">


### Schemas

A schema defines the column names and types of a DataFrame. 

Whether you need to define a schema before reading in your data depends on your use case. For ad hoc analysis, schema on read often works just fine (although it can be a bit slow with plain-text file formats like CSV or JSON). However, it can also lead to precision issues, such as a long type incorrectly set as an integer when reading in a file.

When using Spark for production ETL, it is often a good idea to define your schemas
manually, especially when working with untyped data sources like CSV and JSON, because schema inference can vary depending on the type of data that you read in.
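The sketch below illustrates how inference can differ by source: the same value is inferred as a long from JSON but as an integer from CSV. It assumes a local SparkSession, and the one-record inputs are made up in memory rather than read from the flight data files.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType}

// Assumption: a local session; in the notebook, `spark` is already defined.
val spark = SparkSession.builder().master("local[*]").appName("schema-inference-sketch").getOrCreate()
import spark.implicits._

// Made-up single-record inputs holding the same value.
val jsonLines = Seq("""{"count": 1}""").toDS()
val csvLines = Seq("count", "1").toDS()

val fromJson = spark.read.json(jsonLines)
val fromCsv = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv(csvLines)

// JSON integers are inferred as long, CSV integers as int.
val jsonType = fromJson.schema("count").dataType
val csvType = fromCsv.schema("count").dataType
println(s"JSON: $jsonType, CSV: $csvType")
```

This matches the notebook's own output, where the CSV read reports `count:int` and the JSON read reports `LongType`.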




In [33]:
//Auto Loading of schema

val df=spark.read.format("json")
    .load("data/2015-summary.json")
    .schema

df: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))


A schema is a StructType made up of a number of fields, StructFields, that have a name, type, and a boolean flag which specifies whether or not that column can contain missing or null values. 
Schemas can also contain other StructTypes (Spark’s complex types).
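A short sketch of a schema that nests one StructType inside another. The field names (`route`, `origin`, `destination`) are hypothetical and do not come from the flight data.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical inner struct describing a route.
val routeSchema = StructType(Seq(
  StructField("origin", StringType, nullable = true),
  StructField("destination", StringType, nullable = true)
))

// The outer schema uses the inner StructType as the data type of one field.
val nestedSchema = StructType(Seq(
  StructField("route", routeSchema, nullable = true),
  StructField("count", LongType, nullable = false)
))

// Prints the schema as an indented tree, similar to printSchema on a DataFrame.
println(nestedSchema.treeString)
```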

In [45]:
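// Manually defining the schema
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

// Note: the third field is named "COUNT", while the inferred schema above reports the
// JSON key as "count"; this case mismatch is the likely reason the column shows null
// in the outputs that follow.
val mannualSchema = new StructType(Array(
        new StructField("DEST_COUNTRY_NAME", StringType, true),
        new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
        new StructField("COUNT", LongType, false)))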

val df=spark.read.format("json")
    .schema(mannualSchema)
    .load("data/2015-summary.json")

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
mannualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(COUNT,LongType,false))
df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


### Columns
There are many ways to construct and refer to columns, but the two simplest are the col and column functions. To use either of these functions, we pass in a column name.

%scala

import org.apache.spark.sql.functions.{col, column}

col("someColumnName")

column("someColumnName")

$"myColumn"

#### Explicit Column References

df.col("count")





In [61]:
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
    df.col("DEST_COUNTRY_NAME"),
    $"ORIGIN_COUNTRY_NAME",
    'count).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania| null|
|    United States|            Croatia| null|
+-----------------+-------------------+-----+
only showing top 2 rows



import org.apache.spark.sql.functions.{expr, col, column}


In [63]:
df.select(
expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")
)

res30: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string]


In [64]:
df.selectExpr(
"*", // all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania| null|        false|
|    United States|            Croatia| null|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



### Expressions

An expression is a set of transformations on one or more values in a record in a DataFrame. Think of it like a function that takes as input one or more column names, resolves them and then potentially applies more expressions to create a single value for each record in the dataset. 

Importantly, this “single value” can actually be a complex type like a Map type or Array type.

%scala

import org.apache.spark.sql.functions.{expr, col}

In the simplest case, expr("someCol") is equivalent to col("someCol").

expr("someCol - 5") is the same transformation as col("someCol") - 5 or even
expr("someCol") - 5. That’s because Spark compiles these to a logical tree specifying the order of operations.

%scala

import org.apache.spark.sql.functions.expr

expr("(((someCol + 5) * 200) - 6) < otherCol")
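The equivalence between the three forms can be checked on a tiny DataFrame. This is a sketch assuming a local SparkSession; `sample` and its two values are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Assumption: a local session; in the notebook, `spark` is already defined.
val spark = SparkSession.builder().master("local[*]").appName("expr-sketch").getOrCreate()
import spark.implicits._

// Made-up input with a single long column.
val sample = Seq(10L, 20L).toDF("someCol")

// Three ways of writing the same transformation.
val viaExpr = sample.select(expr("someCol - 5").as("result"))
val viaCol = sample.select((col("someCol") - 5).as("result"))
val viaMixed = sample.select((expr("someCol") - 5).as("result"))

// Collect each result and sort it so the comparison ignores row order.
val results = Seq(viaExpr, viaCol, viaMixed)
  .map(_.collect().map(_.getLong(0)).sorted.toSeq)

results.foreach(println)
```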






### Creating Rows

You can create rows by manually instantiating a Row object with the values that belong in each column. It’s important to note that only DataFrames have schemas; Rows themselves do not. This means that if you create a Row manually, you must specify the values in the same order as the schema of the DataFrame they may be appended to.

%scala

import org.apache.spark.sql.Row

val myRow = Row("Hello", null, 1, false)
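Because a Row carries no schema, values are read back by position. A brief sketch using the same Row as above; the variable names are illustrative.

```scala
import org.apache.spark.sql.Row

val myRow = Row("Hello", null, 1, false)

// Positional access returns Any, since the Row itself has no type information.
val first: Any = myRow(0)

// Typed getters convert the value at a position to the requested type.
val greeting: String = myRow.getString(0)
val number: Int = myRow.getInt(2)
val flag: Boolean = myRow.getBoolean(3)

// Null values should be checked for before calling a typed getter.
val secondIsNull: Boolean = myRow.isNullAt(1)

println(s"$greeting, $number, $flag, $secondIsNull")
```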




### Creating DataFrames

We can also create DataFrames on the fly by taking a set of rows and converting them to a DataFrame.


In [49]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType,StringType, LongType}

val myManualSchema = new StructType(Array(
        new StructField("some", StringType, true),
        new StructField("col", StringType, true),
        new StructField("names", LongType, false)
    ))

val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)

myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
myManualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(some,StringType,true), StructField(col,StringType,true), StructField(names,LongType,false))
myRows: Seq[org.apache.spark.sql.Row] = List([Hello,null,1])
myRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[141] at parallelize at <console>:46
myDf: org.apache.spark.sql.DataFrame = [some: string, col: string ... 1 more field]


#### Converting to Spark Types (Literals)

Sometimes we need to pass explicit values into Spark that aren’t a new column but are just a value. This might be a constant value or something we’ll need to compare to later on. The way we do this is through literals. This is basically a translation from a given programming language’s literal value to one that Spark understands. Literals are expressions and can be used in the same way. 



In [66]:
import org.apache.spark.sql.functions.lit

df.select(
        expr("*"),
        lit(1).as("something")
    ).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|something|
+-----------------+-------------------+-----+---------+
|    United States|            Romania| null|        1|
|    United States|            Croatia| null|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



import org.apache.spark.sql.functions.lit


#### Adding/Renaming Columns

In [89]:
var dfWithLongColName = df
.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))

dfWithLongColName.printSchema
dfWithLongColName.show(2)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- COUNT: long (nullable = true)
 |-- This Long Column-Name: string (nullable = true)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania| null|              Romania|
|    United States|            Croatia| null|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



dfWithLongColName: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]


In [91]:
dfWithLongColName = dfWithLongColName.withColumnRenamed("This Long Column-Name", "somecolumn")

dfWithLongColName.show(2)

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|somecolumn|
+-----------------+-------------------+-----+----------+
|    United States|            Romania| null|   Romania|
|    United States|            Croatia| null|   Croatia|
+-----------------+-------------------+-----+----------+
only showing top 2 rows



dfWithLongColName: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]


#### Changing a Column’s Type (cast)

%sql

SELECT cast(count as int) FROM dfTable

In [93]:
df.printSchema()
df.withColumn("count", col("count").cast("int")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- COUNT: long (nullable = true)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



#### Filtering Rows

There are two methods to perform this operation: where and filter. Both perform the same operation and accept the same argument types when used with DataFrames.

The Dataset API has slightly different options.



In [95]:
val colCondition = df.filter(col("count") < 2).take(2)
val conditional = df.where("count < 2").take(2)

colCondition: Array[org.apache.spark.sql.Row] = Array()
conditional: Array[org.apache.spark.sql.Row] = Array()


#### Getting Unique Rows

%sql

SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) FROM dfTable

In [98]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count

res47: Long = 125


#### Sorting Rows

When we sort the values in a DataFrame, we usually want the largest or smallest values at the top. Two equivalent operations do this, **sort** and **orderBy**, and they work exactly the same way. They accept column expressions and strings, as well as multiple columns. The default is to sort in ascending order.

In [101]:
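// asc and desc must be in scope for the explicit sort directions below
import org.apache.spark.sql.functions.{asc, desc}
df.sort("count").show(5)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)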

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|
+-----------------+-------------------+-----+
|    United States|            Romania| null|
|    United States|            Croatia| null|
|    United States|            Ireland| null|
|            Egypt|      United States| null|
|    United States|              India| null|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|
+-----------------+-------------------+-----+
|          Algeria|      United States| null|
|           Angola|      United States| null|
+-----------------+-------------------+-----+
only showing top 2 rows

+-------------------+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|
+-------------------+-------------------+-----+
|            Algeria|      United States| null|
|             Angola|      United States| null|
|           Anguilla