## DATABRICKS

Databricks is a zero-management cloud platform that provides:

* Fully managed Spark clusters
* An interactive workspace for exploration and visualization
* A production pipeline scheduler
* A platform for powering your favorite Spark-based applications

## Databricks File System
**Databricks File System (DBFS)** is an abstraction layer over Azure Blob Storage.
* Files in DBFS persist to the blob store, so data is not lost even after clusters are terminated

**Databricks Utilities (dbutils)**
* Access DBFS through the Databricks Utilities class (and other file I/O routines).
* An instance of DBUtils is already declared as `dbutils`.

* **Note:** 
    * Please go through the "**The-Databricks-Environment.dbc**" and "**Databricks-Setup.pdf**" before proceeding further.
    * This .dbc file contains only **Python** code.

## APACHE SPARK ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png)
Spark is a unified processing engine that manages and coordinates the execution of tasks on data across a cluster of computers.

![Spark Engines](https://files.training.databricks.com/images/wiki-book/book_intro/spark_4engines.png)
<br/>
<br/>
* At its core is the Spark Engine.
* The DataFrames API provides an abstraction above RDDs while simultaneously improving performance 5-20x over traditional RDDs with its Catalyst Optimizer.
* Spark ML provides high quality and finely tuned machine learning algorithms for processing big data.
* The Graph processing API gives us an easily approachable API for modeling pairwise relationships between people, objects, or nodes in a network.
* The Streaming APIs give us end-to-end fault tolerance with exactly-once semantics and, with the experimental continuous processing mode, latencies as low as about 1 millisecond (with at-least-once guarantees).

**RDDs**
* The primary data abstraction of Spark engine is the RDD: Resilient Distributed Dataset
  * Resilient, i.e., fault-tolerant with the help of RDD lineage graph and so able to recompute missing or damaged partitions due to node failures.
  * Distributed with data residing on multiple nodes in a cluster.
  * Dataset is a collection of partitioned data with primitive values or values of values, e.g., tuples or other objects.
* The original paper that gave birth to the concept of RDD is <a href="https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf" target="_blank">Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing</a> by Matei Zaharia et al.
* Today, with Spark 2.x, we treat RDDs as the assembly language of the Spark ecosystem.
* DataFrames, Datasets & SQL provide the higher level abstraction over RDDs.


**Spark** uses a distributed programming model in which the user specifies:
* **Transformations**, which build up a directed acyclic graph (DAG) of instructions
* **Actions**, which begin executing that graph of instructions as a single job, by breaking it down into stages and tasks that run across the cluster
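To make the transformation/action split concrete, here is a minimal pure-Python toy model. It is not Spark's implementation; the class `LazyPipeline` and its methods are hypothetical. Transformations only append steps to a plan and return a new (immutable) pipeline, and nothing is read or computed until an action such as `count()` runs.

```python
from typing import Callable, Iterable, List, Optional, Tuple

# Toy model (hypothetical names), not Spark's implementation.
class LazyPipeline:
    """Transformations build a plan; actions execute it."""

    def __init__(self, source: Callable[[], Iterable[int]],
                 plan: Optional[List[Tuple[str, Callable]]] = None):
        self.source = source            # function that produces the input data
        self.plan = plan or []          # recorded steps, nothing executed yet

    # transformations: return a NEW pipeline, execute nothing
    def map(self, fn):
        return LazyPipeline(self.source, self.plan + [("map", fn)])

    def filter(self, pred):
        return LazyPipeline(self.source, self.plan + [("filter", pred)])

    # action: only now is the source read and the plan executed
    def count(self) -> int:
        n = 0
        for x in self.source():
            keep = True
            for kind, fn in self.plan:
                if kind == "map":
                    x = fn(x)
                elif not fn(x):
                    keep = False
                    break
            if keep:
                n += 1
        return n

reads = []

def source():
    reads.append(1)                     # record every time the data is read
    return range(1000)

evens = LazyPipeline(source).filter(lambda x: x % 2 == 0)  # nothing read yet
print(len(reads))      # 0
print(evens.count())   # 500
print(len(reads))      # 1
```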


## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) The Cluster: Drivers, Executors, Slots & Tasks
![Spark Physical Cluster, slots](https://files.training.databricks.com/images/105/spark_cluster_slots.png)

* The **Driver** is the JVM in which our application runs.
* The secret to Spark's awesome performance is parallelism.
  * Scaling vertically is limited to a finite amount of RAM, Threads and CPU speeds.
  * Scaling horizontally means we can simply add new "nodes" to the cluster almost endlessly.
* We parallelize at two levels:
  * The first level of parallelization is the **Executor** - a Java virtual machine running on a node, typically, one instance per node.
  * The second level of parallelization is the **Slot** - the number of which is determined by the number of cores and CPUs of each node.
* Each **Executor** has a number of **Slots** to which the **Driver** can assign parallelized **Tasks**.
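As a back-of-the-envelope sketch, assuming one slot per core and tasks that each occupy one slot for roughly the same time (real workloads rarely behave this uniformly), the total slot count and the number of "waves" of tasks follow from simple arithmetic. The function names are hypothetical.

```python
import math

# Simplified arithmetic (hypothetical helpers), assuming one slot per core.
def total_slots(num_executors: int, cores_per_executor: int) -> int:
    return num_executors * cores_per_executor

def task_waves(num_tasks: int, slots: int) -> int:
    # rounds needed if each slot runs one task at a time
    return math.ceil(num_tasks / slots)

slots = total_slots(num_executors=4, cores_per_executor=4)
print(slots)                    # 16
print(task_waves(200, slots))   # 13, e.g. 200 shuffle partitions on 16 slots
print(task_waves(5, slots))     # 1
```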

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Tasks, Jobs & Stages
* **Task**: A task is a series of operations that work on the same partition; the operations in a task are pipelined together.
* **Stage**: A group of tasks that perform the same sequence of operations (each on a different partition) is a stage.
* **Job**: A job is made up of all the stages in a query, that is, a series of transformations completed by an action.

* Each parallelized action is referred to as a **Job**.
* The results of each **Job** (parallelized/distributed action) are returned to the **Driver**.
* Depending on the work required, multiple **Jobs** may be required.
* Each **Job** is broken down into **Stages**.

![Spark Physical Cluster, tasks](https://files.training.databricks.com/images/105/spark_cluster_tasks.png)
<br/>
<br/>
* The JVM is naturally multithreaded, but a single JVM, such as our **Driver**, has a finite upper limit.
* By creating **Tasks**, the **Driver** can assign units of work to **Slots** for parallel execution.
* Additionally, the **Driver** must also decide how to partition the data so that it can be distributed for parallel processing (not shown here).
* Consequently, the **Driver** is assigning a **Partition** of data to each task - in this way each **Task** knows which piece of data it is to process.
* Once started, each **Task** will fetch from the original data source the **Partition** of data assigned to it.
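The following pure-Python sketch mimics, in a single process, the flow described above: the "driver" splits a range into partitions, each "task" processes only its own partition, and the partial results are combined back on the "driver". It is only an analogy; `split_range` and `run_task` are hypothetical helpers, and Spark's own partitioning may differ in detail.

```python
from typing import List

# Single-process analogy (hypothetical helpers), not Spark's scheduler.
def split_range(n: int, num_partitions: int) -> List[range]:
    """Split [0, n) into contiguous, nearly equal partitions."""
    bounds = [n * i // num_partitions for i in range(num_partitions + 1)]
    return [range(bounds[i], bounds[i + 1]) for i in range(num_partitions)]

def run_task(partition: range) -> int:
    # each "task" works only on the partition it was assigned
    return sum(1 for x in partition if x % 2 == 0)

partitions = split_range(1000, 8)                    # "driver" decides the layout
partial_counts = [run_task(p) for p in partitions]   # would run in parallel slots
print(partial_counts)
print(sum(partial_counts))                           # 500, combined on the "driver"
```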

In [0]:
dbutils.help()

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("spark").getOrCreate()



In [11]:
myrange = spark.range(1000).toDF("number")

In [12]:
myrange.printSchema()

root
 |-- number: long (nullable = false)



In [0]:
divisBy2 = myrange.where("number % 2 = 0")

In [0]:
divisBy2.count()

* This .dbc file is based on my understanding of the ADB training and *Spark: The Definitive Guide* from Databricks.
* All the data used here can be downloaded from "https://github.com/databricks/Spark-The-Definitive-Guide"

In [0]:
%fs ls /FileStore/tables/FileStore/tables

path,name,size
dbfs:/FileStore/tables/FileStore/tables/2015_summary-ebaee.csv,2015_summary-ebaee.csv,7080


In [0]:
%fs head /FileStore/tables/FileStore/tables/2015_summary-ebaee.csv

In [0]:
import pandas as pd
my_df = pd.read_csv('https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/csv/2015-summary.csv')
flightData2015 = spark.createDataFrame(my_df)

In [0]:
flightData2015.take(3)

Out[3]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [0]:
flightData2015.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [0]:
flightData2015.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
flightData2015.sort('count').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#30L ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#30L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#65]
      +- Scan ExistingRDD arrow[DEST_COUNTRY_NAME#28,ORIGIN_COUNTRY_NAME#29,count#30L]




In [0]:
spark.conf.get('spark.sql.shuffle.partitions')

Out[7]: '200'

In [0]:
spark.conf.set('spark.sql.shuffle.partitions',"5")

In [0]:
flightData2015.sort('count').show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|              Cyprus|      United States|    1|
|       United States|         Montenegro|    1|
|       United States|          Lithuania|    1|
|       United States|             Cyprus|    1|
|       United States|            Namibia|    1|
|       United States|          Singapore|    1|
|       New Caledonia|      United States|    1|
|       United States|           Bulgaria|    1|
|            Djibouti|      United States|    1|
|            Suriname|      United States|    1|
|Saint Vincent and...|      United States|    1|
|                Iraq|      United States|    1|
|               Malta|      United States|    1|
|        Burkina Faso|      United States|    1|
|             Moldova|      United States|    1|
|           Indonesia|      United States|    1|
|       United States|            Croatia|    1|
|       United State

In [0]:
flightData2015.sort('count').take(2)

Out[10]: [Row(DEST_COUNTRY_NAME='Iraq', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Indonesia', ORIGIN_COUNTRY_NAME='United States', count=1)]

Any DataFrame can be made into a table or view with a simple method called **createOrReplaceTempView**.

In [0]:
flightData2015.createOrReplaceTempView("flight_data_2015")

To execute a SQL query, we’ll use the **spark.sql** function (remember, `spark` is our SparkSession variable), which conveniently returns a new DataFrame.

In [0]:
sqlWay = spark.sql("SELECT DEST_COUNTRY_NAME, count(1) FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME")

In [0]:
dataFrameWay = flightData2015\
              .groupBy("DEST_COUNTRY_NAME")\
              .count()

We can see that these plans compile to the exact same underlying plan!

In [0]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[finalmerge_count(merge count#75L) AS count(1)#63L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#28, 5), ENSURE_REQUIREMENTS, [id=#140]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[partial_count(1) AS count#75L])
         +- Project [DEST_COUNTRY_NAME#28]
            +- Scan ExistingRDD arrow[DEST_COUNTRY_NAME#28,ORIGIN_COUNTRY_NAME#29,count#30L]


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[finalmerge_count(merge count#81L) AS count(1)#70L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#28, 5), ENSURE_REQUIREMENTS, [id=#191]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[partial_count(1) AS count#81L])
         +- Project [DEST_COUNTRY_NAME#28]
            +- Scan ExistingRDD arrow[DEST_COUNTRY_NAME#28,ORIGIN_COUNTRY_NAME#29,count#30L]




Use the **max** function to find the maximum number of flights to and from any given location.

In [0]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

Out[15]: [Row(max(count)=370002)]

In [0]:
# Note: this import shadows Python's built-in max() for the rest of the notebook
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

Out[16]: [Row(max(count)=370002)]

In [0]:
display(flightData2015)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
import pyspark.sql.functions as F

In [0]:
flightData2015.groupBy('DEST_COUNTRY_NAME').agg(F.sum('count').alias('sum')).orderBy(F.col('sum').desc()).limit(5).show()

+-----------------+------+
|DEST_COUNTRY_NAME|   sum|
+-----------------+------+
|    United States|411352|
|           Canada|  8399|
|           Mexico|  7140|
|   United Kingdom|  2025|
|            Japan|  1548|
+-----------------+------+



In [0]:
flightData2015.groupBy('DEST_COUNTRY_NAME').agg(F.sum('count').alias('sum')).orderBy(F.col('sum').desc()).limit(5).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[sum#139L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#28,sum#139L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[finalmerge_sum(merge sum#143L) AS sum(count#30L)#138L])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#28, 5), ENSURE_REQUIREMENTS, [id=#466]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[partial_sum(count#30L) AS sum#143L])
            +- Project [DEST_COUNTRY_NAME#28, count#30L]
               +- Scan ExistingRDD arrow[DEST_COUNTRY_NAME#28,ORIGIN_COUNTRY_NAME#29,count#30L]
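The plan above aggregates in two phases: `partial_sum` runs inside each input partition before the `Exchange` (shuffle), and `finalmerge_sum` merges those partial results afterwards. Below is a minimal pure-Python model of that idea, not Spark's code; the rows are a few taken from the flight data shown earlier, split arbitrarily into two partitions, and the helper names are hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Model of two-phase aggregation (hypothetical helpers), not Spark's code.
Row = Tuple[str, int]   # (DEST_COUNTRY_NAME, count)

def partial_sum(partition: Iterable[Row]) -> Counter:
    # phase 1: pre-aggregate inside one partition, so less data is shuffled
    acc = Counter()
    for dest, count in partition:
        acc[dest] += count
    return acc

def final_merge(partials: List[Counter]) -> Counter:
    # phase 2: after the shuffle, merge the partial sums for each key
    total = Counter()
    for p in partials:
        total.update(p)          # Counter.update adds counts
    return total

partitions = [
    [("United States", 15), ("Egypt", 15), ("United States", 344)],
    [("Costa Rica", 588), ("United States", 62), ("Moldova", 1)],
]
result = final_merge([partial_sum(p) for p in partitions])
print(result.most_common(2))   # [('Costa Rica', 588), ('United States', 421)]
```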




To get the top five destination countries using Spark SQL:

In [0]:
maxSql = spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) AS destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
""")
maxSql.collect()

Out[21]: [Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

Writing the same query using DataFrames:

In [0]:
from pyspark.sql.functions import desc

dataFrameMax = flightData2015\
              .groupBy('DEST_COUNTRY_NAME')\
              .sum('count')\
              .withColumnRenamed("sum(count)","destination_total")\
              .sort(desc("destination_total"))\
              .limit(5)

dataFrameMax.collect()

Out[23]: [Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

In [0]:
maxSql.explain()
dataFrameMax.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   TakeOrderedAndProject(limit=5, orderBy=[destination_total#151L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#28,destination_total#151L])
   +- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[finalmerge_sum(merge sum#157L) AS sum(count#30L)#152L])
      +- AQEShuffleRead coalesced
         +- ShuffleQueryStage 0, Statistics(sizeInBytes=4.9 KiB, rowCount=139, isRuntime=true)
            +- Exchange hashpartitioning(DEST_COUNTRY_NAME#28, 5), ENSURE_REQUIREMENTS, [id=#565]
               +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#28], functions=[partial_sum(count#30L) AS sum#157L])
                  +- *(1) Project [DEST_COUNTRY_NAME#28, count#30L]
                     +- *(1) Scan ExistingRDD arrow[DEST_COUNTRY_NAME#28,ORIGIN_COUNTRY_NAME#29,count#30L]
+- == Initial Plan ==
   TakeOrderedAndProject(limit=5, orderBy=[destination_total#151L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#28,destination_tot

## DATASET

**DataFrames** are a distributed collection of objects of type `Row`, but Spark also allows JVM users to create their own objects (via case classes or Java beans) and manipulate them using functional programming concepts. These typed collections are **Datasets**.

The great thing about Datasets is that we can use them only when we need or want to. The ability to manipulate arbitrary case classes with arbitrary functions makes expressing business logic simple.


I’ll define my own object and manipulate it via arbitrary map and filter functions. Once we’ve performed our manipulations, Spark can automatically turn it back into a DataFrame, and we can manipulate it further using the hundreds of functions that Spark includes. This makes it easy to drop down to lower-level, type-safe coding when necessary, and move higher up to SQL for more rapid analysis.

In [0]:
%scala

case class ValueAndDouble(value:Long, valueDoubled:Long)

spark.range(2000)
      .map(value => ValueAndDouble(value, value*2))
      .filter(vAndD => vAndD.valueDoubled % 2 == 0)
      .where("value%3 =0")
      .count()
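PySpark has no typed Dataset API, but the logic of the Scala cell can be reproduced locally with a plain dataclass and ordinary `map`/`filter`. This runs in plain Python, not on Spark, and only shows what the cell computes: every doubled value is even, so only the `value % 3 = 0` condition removes rows.

```python
from dataclasses import dataclass

# Plain-Python analogue of the Scala cell (runs locally, not on Spark).
@dataclass(frozen=True)
class ValueAndDouble:
    value: int
    valueDoubled: int

pairs = map(lambda v: ValueAndDouble(v, v * 2), range(2000))
evens = filter(lambda vd: vd.valueDoubled % 2 == 0, pairs)   # always true
result = sum(1 for vd in evens if vd.value % 3 == 0)         # like .where("value % 3 = 0")
print(result)   # 667
```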

## Caching Data for Faster Access

Sometimes we access a DataFrame multiple times in the same data flow and want to avoid performing expensive joins (or re-reading the source) over and over again. To avoid that, we can use the **cache()** method to cache the data; for DataFrames, the default storage level keeps the data in memory and spills to disk if needed.

In [0]:
DF1 = spark.read.format("csv")\
      .option("inferschema","true")\
      .option("header","true")\
      .load("/FileStore/tables/FileStore/tables/2015_summary-ebaee.csv")


In [0]:
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()

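Without caching, these commands took 3.07 seconds to process.

Now let's cache DF1 and compare the time taken. `cache()` is lazy, so we call an action (`count()`) to materialize the cache before running the aggregations again.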

In [0]:
DF1.cache()
DF1.count()

In [0]:
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()

This may not seem that wild but picture a large data set or one that requires a lot of computation in order to create. The savings can be immense.
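The benefit can be illustrated with a small pure-Python analogy: a hypothetical `load_data()` stands in for an expensive read, and we count how often it runs with and without keeping the result around. Spark's `cache()` differs in important ways (it is lazy, stores data per partition, and may spill to disk), so treat this only as intuition.

```python
from collections import Counter
from functools import lru_cache

# Analogy only (hypothetical load_data), not how Spark's cache is implemented.
calls = {"load": 0}

def load_data():
    calls["load"] += 1                       # pretend this is an expensive scan
    return [("United States", "Romania", 15),
            ("Egypt", "United States", 15),
            ("United States", "India", 62)]

def three_aggregations(loader):
    dest = Counter(r[0] for r in loader())
    origin = Counter(r[1] for r in loader())
    counts = Counter(r[2] for r in loader())
    return dest, origin, counts

three_aggregations(load_data)
print(calls["load"])            # 3: the source is re-read for every aggregation

calls["load"] = 0
cached_load = lru_cache(maxsize=None)(load_data)
three_aggregations(cached_load)
print(calls["load"])            # 1: later aggregations reuse the cached result
```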

## Structured Streaming

The best thing about Structured Streaming is that it allows you to rapidly get value out of streaming systems with simple code changes. It also makes streaming easy to reason about, because you can write your logic as a batch job to prototype it and then convert it into a streaming job.

All of this works by processing the data incrementally.


* Note: Upload the **retail-data** folder found in "https://github.com/databricks/Spark-The-Definitive-Guide"

In [0]:
%fs ls /FileStore/tables/retail-data/by-day/

path,name,size
dbfs:/FileStore/tables/retail-data/by-day/2010_12_01-ec65d.csv,2010_12_01-ec65d.csv,275001
dbfs:/FileStore/tables/retail-data/by-day/2010_12_02-cddbc.csv,2010_12_02-cddbc.csv,191826
dbfs:/FileStore/tables/retail-data/by-day/2010_12_03-b5f23.csv,2010_12_03-b5f23.csv,190700
dbfs:/FileStore/tables/retail-data/by-day/2010_12_05-e147f.csv,2010_12_05-e147f.csv,246056
dbfs:/FileStore/tables/retail-data/by-day/2010_12_06-f177a.csv,2010_12_06-f177a.csv,339039
dbfs:/FileStore/tables/retail-data/by-day/2010_12_07-139ea.csv,2010_12_07-139ea.csv,255832
dbfs:/FileStore/tables/retail-data/by-day/2010_12_08-073ef.csv,2010_12_08-073ef.csv,235974
dbfs:/FileStore/tables/retail-data/by-day/2010_12_09-e1f01.csv,2010_12_09-e1f01.csv,252904
dbfs:/FileStore/tables/retail-data/by-day/2010_12_10-ff80e.csv,2010_12_10-ff80e.csv,241468
dbfs:/FileStore/tables/retail-data/by-day/2010_12_12-fe693.csv,2010_12_12-fe693.csv,132120


In [0]:
staticDataFrame = spark.read.format("csv")\
                  .option("header","true")\
                  .option("inferSchema","true")\
                  .load("/FileStore/tables/retail-data/by-day/*.csv")

In [0]:
staticDataFrame.createOrReplaceTempView("retail_data")

In [0]:
staticSchema = staticDataFrame.schema
staticSchema

In [0]:
staticDataFrame.show(5)

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [0]:
(staticDataFrame.select(F.col('CustomerID'),(F.col('Quantity')*F.col('UnitPrice')).alias("total_cost"),F.col('InvoiceDate'))
                .groupBy(F.col('CustomerID'),F.window(F.col('InvoiceDate'),"1 day")).agg(F.sum('total_cost').alias("sum"))
                .orderBy(F.col('sum').desc()).show(5))

To find the days on which a given customer makes their largest purchases:

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [0]:
from pyspark.sql.functions import window, column, desc, col

staticDataFrame\
  .selectExpr(
  "CustomerID",
  "(UnitPrice * Quantity) as total_cost",
  "InvoiceDate")\
  .groupBy(
  col("CustomerID"),window(col("InvoiceDate"),"1 day"))\
  .sum("total_cost")\
  .orderBy(desc("sum(total_cost)"))\
  .show(5)
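A 1-day tumbling `window` puts each timestamp into exactly one bucket. The PySpark documentation states that window intervals are offset relative to 1970-01-01 00:00:00 UTC, so with the default start time the bucket begins at the timestamp rounded down to a multiple of the window length since the epoch. A small pure-Python sketch of that arithmetic, assuming UTC timestamps (the helper name is hypothetical):

```python
from datetime import datetime, timedelta, timezone

# Sketch of tumbling-window arithmetic (hypothetical helper), assuming UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts: datetime, length: timedelta):
    """Return the [start, end) bucket containing ts, aligned to the Unix epoch."""
    offset = (ts - EPOCH) % length       # time elapsed since the bucket started
    start = ts - offset
    return start, start + length

ts = datetime(2011, 9, 20, 14, 30, tzinfo=timezone.utc)
start, end = tumbling_window(ts, timedelta(days=1))
print(start, end)   # 2011-09-20 00:00:00+00:00 2011-09-21 00:00:00+00:00
```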


In [0]:
%sql

SELECT
sum(total_cost),
CustomerId,
to_date(InvoiceDate) AS Invoice_date
FROM
(SELECT
CustomerId,
(UnitPrice * Quantity) as total_cost,
InvoiceDate
FROM
retail_data)
GROUP BY
CustomerId,to_date(InvoiceDate)
ORDER By
sum(total_cost) DESC

sum(total_cost),CustomerId,Invoice_date
71601.44,17450.0,2011-09-20
55316.08,,2011-11-14
42939.17,,2011-11-07
33521.39999999998,,2011-03-29
31975.590000000007,,2011-12-08
31661.540000000005,18102.0,2011-09-15
31347.479999999938,,2010-12-21
29693.82,18102.0,2011-10-21
25920.37,18102.0,2010-12-07
25833.559999999998,14646.0,2011-10-20


That’s the static DataFrame version. To make this a **streaming** query, use **readStream** instead of **read**.

We are also using **maxFilesPerTrigger**, which specifies the maximum number of files to read in per trigger. This makes our demonstration more “streaming”; in a production scenario this would be omitted.

`spark.sql.shuffle.partitions` sets the number of partitions created after a shuffle. By default the value is **200**, but since there aren’t many executors on my local machine, it’s worth reducing this to **five**.
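To see what this setting controls, here is a toy pure-Python model of hash partitioning during a shuffle: every row goes to partition `hash(key) % num_partitions`, so all rows with the same key land in the same partition. Spark uses its own Murmur3-based hash; `zlib.crc32` is swapped in only because Python's built-in `hash()` for strings is randomized between runs. `partition_for` and `shuffle` are hypothetical helpers.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

# Toy shuffle (hypothetical helpers); crc32 stands in for Spark's hash function.
def partition_for(key: str, num_partitions: int) -> int:
    # deterministic and non-negative because crc32 returns an unsigned int
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(rows: List[Tuple[str, int]], num_partitions: int) -> Dict[int, List[Tuple[str, int]]]:
    buckets: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for key, value in rows:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return dict(buckets)

rows = [("United States", 15), ("Egypt", 15), ("United States", 344),
        ("Costa Rica", 588), ("Moldova", 1)]
parts = shuffle(rows, num_partitions=5)     # like spark.sql.shuffle.partitions = 5
for pid in sorted(parts):
    print(pid, parts[pid])
```

With few rows, 5 partitions avoid launching many near-empty tasks; with 200 partitions most would hold no data at all.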

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

streamingDataFrame = spark.readStream\
                    .schema(staticSchema)\
                    .option("maxFilesPerTrigger",1)\
                    .format("csv")\
                    .option("header","true")\
                    .load("/FileStore/tables/retail-data/by-day/*.csv")

Now we can check that the DataFrame is streaming: `streamingDataFrame.isStreaming` returns `True`.

This is still a lazy operation, so we need to call a streaming action to start the execution flow. We will run the same aggregation as before; Spark is expected to read in only **one file at a time**.

In [0]:
streamingDataFrame.isStreaming

In [0]:
purchaseByCustomerPerHour = streamingDataFrame\
                            .selectExpr(
                            "CustomerId",
                            "(UnitPrice * Quantity) as total_cost",
                            "InvoiceDate")\
                            .groupBy(col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
                            .sum("total_cost")

Now let’s kick off the stream! We’ll write it out to an in-memory table that is updated after each trigger. In this case, each trigger is based on an individual file (the read option that we set). Because the output mode is `complete`, Spark rewrites the in-memory table after each trigger so that it always holds the aggregates over all the data read so far.

In [0]:
purchaseByCustomerPerHour.writeStream\
                        .format("memory")\
                        .queryName("customer_purchases")\
                        .outputMode("complete")\
                        .start()
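Conceptually, each trigger reads one new file, folds it into the running aggregation state, and (in `complete` mode) emits the whole updated result table. The sketch below models that in plain Python; the "files" and customer IDs are hypothetical sample data, and Spark's real state store, checkpointing, and fault tolerance are not modeled.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Model of incremental aggregation with complete output (hypothetical data).
Key = Tuple[str, str]                 # (CustomerId, day)
Record = Tuple[str, str, float]       # (CustomerId, day, total_cost)

files: List[List[Record]] = [
    [("17850", "2010-12-01", 15.5), ("13047", "2010-12-01", 20.0)],
    [("17850", "2010-12-01", 22.0), ("12583", "2010-12-02", 40.0)],
    [("13047", "2010-12-02", 5.5)],
]

state: Dict[Key, float] = defaultdict(float)

def trigger(new_file: List[Record]) -> List[Tuple[Key, float]]:
    # incremental step: only the new file is processed...
    for customer, day, cost in new_file:
        state[(customer, day)] += cost
    # ...but "complete" output mode emits the entire updated result table
    return sorted(state.items(), key=lambda kv: kv[1], reverse=True)

for i, f in enumerate(files):         # like maxFilesPerTrigger = 1
    table = trigger(f)
    print(f"after trigger {i}: top = {table[0]}")
```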

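Now we can run queries against this table. Wait a minute before doing so; this allows the values to change over time.

In [0]:
# Backticks reference the column; single quotes would make 'sum(total_cost)' a string
# literal, and ordering by a constant does not sort the rows.
spark.sql("""
    SELECT * FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""").take(5)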

You’ll notice that as we read in more data, the composition of our table changes! With each file, the results may or may not change, depending on the data. Since we’re grouping by customer, we hope to see the top customer purchase amounts increase over time.

## STRUCTURED API OVERVIEW

Structured APIs are a way of manipulating all sorts of data, from unstructured log files, to semi-structured CSV files, and highly structured Parquet files. 
These APIs refer to three core types of distributed collection APIs.

* Datasets
* DataFrames
* SQL Views and Tables

DataFrames and Datasets are how we store the data on which we perform transformations and actions.


* To create a new DataFrame or Dataset, we call a **transformation**.
* To start computation or convert to native language types, we call an **action**.
* DataFrames and Datasets are (distributed) table-like collections with well-defined rows and columns. Both are **immutable** and **lazily evaluated**.
* Each column must have the same number of rows as all the other columns (although you can use null to specify the lack of a value) and columns have type information that must be consistent for every row in the collection

###### Structured SPARK Types
* Spark is effectively a programming language of its own. It uses the **Catalyst Optimizer** to execute code efficiently.
* Spark types map directly to the different language APIs that Spark maintains, and there exists a lookup table for each of these in Scala, Java, Python, SQL, and R.
    * Even if we use Spark’s Structured APIs from Python or R, the majority of our manipulations will operate strictly on Spark types, not Python types. For example, the code below does not perform addition in Scala or Python; it actually performs the addition purely in Spark.

In [0]:
df = spark.range(1000).toDF("number")
df.select(df["number"]+10)

In [0]:
from pyspark.sql.types import *  # Spark's data type classes, e.g. LongType, StringType

This addition happens purely in Spark because Spark converts an expression written in an input language into Spark’s internal Catalyst representation of that same type information.

###### DataFrame Vs DataSet
* DataFrames are considered **untyped**: they do have types, but Spark maintains them completely and only checks at runtime whether those types line up with those specified in the schema.
* Datasets check whether or not **types** conform to the specification at compile time. Datasets are only available to JVM-based languages (Scala and Java), and we specify types with case classes or Java beans.
* To Spark in Scala, DataFrames are simply Datasets of Type Row. 

**The “Row” type is Spark’s internal representation of its optimized in memory format for computation. This format makes for highly specialized and efficient computation because rather than leveraging JVM types which can cause high garbage collection and object instantiation costs, Spark can operate on its own internal format without incurring any of those costs**.

#### Structured API Execution

A single structured API query goes from user code to executed code in the following steps:
1. Write DataFrame/Dataset/SQL Code
2. If valid code, Spark converts this to a **Logical Plan**
3. Spark transforms this Logical Plan to a **Physical Plan**
4. Spark then executes this Physical Plan on the cluster

![Catalyst-Optimizer-diagram](https://databricks.com/wp-content/uploads/2018/05/Catalyst-Optimizer-diagram.png)
<br/>
<br/>


Written code is submitted to Spark either through the console or via a submitted job. This code then passes through the **Catalyst Optimizer**, which decides how the code should be executed and lays out a plan for doing so, before finally the code is run and the result is returned to the user.

###### Logical Planning
* The logical plan only represents a set of abstract transformations that do not refer to executors or drivers; its purpose is purely to convert the user’s set of expressions into the most optimized version.
* Spark does this by first converting user code into an unresolved logical plan. The plan is unresolved because, while your code may be valid, the tables or columns that it refers to may or may not exist.
* Spark uses the catalog, a repository of all table and DataFrame information, to resolve columns and tables in the analyzer. The analyzer may reject the unresolved logical plan if the required table or column name does not exist in the catalog.
* If the analyzer can resolve it, the result is passed through the optimizer, a collection of rules, which attempts to optimize the logical plan, for example by **pushing down predicates or selections**.
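To give a feel for what a single optimizer rule does, here is a toy rule in pure Python that pushes a filter below a projection when the filter only uses columns the projection keeps. The plan classes and the rule are hypothetical simplifications, not Catalyst's actual data structures or rules.

```python
from dataclasses import dataclass, replace
from typing import FrozenSet, Union

# Toy logical plan and rule (hypothetical), not Catalyst's real classes.
@dataclass(frozen=True)
class Scan:
    table: str

@dataclass(frozen=True)
class Project:
    columns: FrozenSet[str]
    child: "Plan"

@dataclass(frozen=True)
class Filter:
    condition: str                  # kept only for display
    columns_used: FrozenSet[str]
    child: "Plan"

Plan = Union[Scan, Project, Filter]

def push_down_filter(plan: Plan) -> Plan:
    """Rewrite Filter(Project(x)) as Project(Filter(x)) when it is safe."""
    if isinstance(plan, Scan):
        return plan
    if isinstance(plan, Filter) and isinstance(plan.child, Project):
        proj = plan.child
        if plan.columns_used <= proj.columns:      # filter needs only kept columns
            pushed = replace(plan, child=proj.child)
            return replace(proj, child=push_down_filter(pushed))
    # otherwise keep this node and keep looking further down the tree
    return replace(plan, child=push_down_filter(plan.child))

query = Filter("count > 10", frozenset({"count"}),
               Project(frozenset({"DEST_COUNTRY_NAME", "count"}),
                       Scan("flight_data_2015")))
optimized = push_down_filter(query)
print(optimized)
```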

###### Physical Planning
* The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model.  
  * An example of the cost comparison might be choosing how to perform a given join by looking at the physical attributes of a given table (how big the table is or how big its partitions are).
* Physical planning results in a series of RDDs and transformations.
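A heavily simplified sketch of the join example: if one side is estimated to be no larger than `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), Spark can broadcast it to every executor instead of shuffling both sides. The function below is a hypothetical illustration of that single decision, not Spark's planner, which also weighs join hints, join type, and whether statistics are available.

```python
# Hypothetical illustration of one physical-planning decision.
BROADCAST_THRESHOLD = 10 * 1024 * 1024   # default spark.sql.autoBroadcastJoinThreshold

def choose_join_strategy(left_bytes: int, right_bytes: int,
                         threshold: int = BROADCAST_THRESHOLD) -> str:
    smaller = min(left_bytes, right_bytes)
    if smaller <= threshold:
        return "broadcast hash join"     # ship the small table to every executor
    return "sort-merge join"             # shuffle and sort both sides by the key

print(choose_join_strategy(7_080, 5_000_000_000))          # broadcast hash join
print(choose_join_strategy(2_000_000_000, 5_000_000_000))  # sort-merge join
```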

###### Execution
* Upon selecting a physical plan, Spark runs all of this code over RDDs, the lower-level programming interface of Spark.
* Spark performs further optimizations at runtime by generating native Java bytecode that can remove whole tasks or stages during execution. Finally, the result is returned to the user.

# Basic Structured Operations
###### Schemas
* A schema defines the column names and types of a DataFrame.
* Users can define schemas manually or users can read a schema from a data source (often called schema on read)

###### Partitioning
* The partitioning of the DataFrame defines the layout of the DataFrame or Dataset’s physical distribution across the cluster.
* The partitioning scheme defines how the data is broken up; this can be based on the values in a certain column or be non-deterministic.

In [2]:
df = spark.read.format("json")\
      .load("data/2015-summary.json")
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)




When using Spark for production ETL, it is often a good idea to define your schemas manually, especially when working with untyped data sources like CSV and JSON, because schema inference can vary depending on the type of data that you read in.
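To illustrate why inferred schemas can drift, here is a toy inference function in pure Python: it picks the narrowest of long, double, or string that fits every value. These are not Spark's actual inference rules, and the three "files" are hypothetical; the point is that the same column can come out with a different type depending on which data was read.

```python
from typing import Callable, List

# Toy type inference (hypothetical rules and data), not Spark's inference code.
def infer_type(values: List[str]) -> str:
    """Return the narrowest of long -> double -> string that fits every value."""
    def fits(cast: Callable[[str], object]) -> bool:
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False
    if fits(int):
        return "long"
    if fits(float):
        return "double"
    return "string"

day1 = ["15", "1", "344"]
day2 = ["62", "1.5", "40"]
day3 = ["588", "N/A", "1"]
print(infer_type(day1), infer_type(day2), infer_type(day3))   # long double string
```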

A schema is a **StructType** made up of a number of fields, **StructFields**. Each StructField has a name, a type, and a Boolean flag that specifies whether or not that column can contain missing or null values. Schemas can also contain other StructTypes (Spark’s complex types).

In [22]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark import SparkFiles

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME",StringType(),True),
  StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
  StructField("count",LongType(),False)
])


df = spark.read.format("json")\
      .schema(myManualSchema)\
      .load('data/2015-summary.json')

df.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [33]:
from pyspark.sql.functions import col, column
col('someColumnName')
column('someColumnName')

df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [42]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType,StringType, LongType

myManualSchema = StructType([
  StructField("some",StringType(),True),
  StructField("col",StringType(),True),
  StructField("names",LongType(),False)
])

myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()


+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



                                                                                

In [43]:
myDf.columns

['some', 'col', 'names']

In [45]:
# create dataframe
df = spark.read.format('json').load('data/2015-summary.json')
df.createOrReplaceTempView('dfTable')
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

Use:
* the **select** method when you’re working with columns or expressions
* the **selectExpr** method when you’re working with expressions in strings

In [46]:
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [54]:
from pyspark.sql.functions import expr, col, column
df.select(expr('DEST_COUNTRY_NAME as destination'), col('DEST_COUNTRY_NAME'), column('DEST_COUNTRY_NAME')).show(2)

+-------------+-----------------+-----------------+
|  destination|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-------------+-----------------+-----------------+
|United States|    United States|    United States|
|United States|    United States|    United States|
+-------------+-----------------+-----------------+
only showing top 2 rows



In [55]:
df.selectExpr('DEST_COUNTRY_NAME as newColumnName', 'DEST_COUNTRY_NAME').show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [56]:
df.selectExpr(
    "*",  # all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
  .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [62]:
df.selectExpr('avg(count)', 'count(distinct(DEST_COUNTRY_NAME))').show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [64]:
# literals in spark
from pyspark.sql.functions import lit

df.select(expr('*'), lit(1).alias('One')).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



###### DataFrame Transformations
Besides reading data from files, we can also create DataFrames on the fly by taking a set of rows and converting them to a DataFrame, as we did with `myDf` above.

###### Adding Columns
* A new column can be added to a DataFrame using the **withColumn** method.
* **withColumn** takes two arguments:
  * the column name, and
  * the expression that will create the value for that column in each row of the DataFrame

In [4]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import expr, col, column

df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [66]:
df.withColumn(
  "withinCountry",
  expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
  .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



###### Renaming Columns
Although we can rename a column with **withColumn** (by copying it under the new name and dropping the original), it’s often much easier and more readable to use the **withColumnRenamed** method.

In [67]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

###### Reserved Characters and Keywords in Column Names

One thing you may come across is reserved characters, like spaces or dashes, in column names. Handling these means escaping column names appropriately. In Spark this is done with backtick (`` ` ``) characters.

Let’s use the **withColumn** method we just learned about to create a column with reserved characters in its name.

In [70]:
dfWithLongColName = df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColName.columns
dfWithLongColName.selectExpr("`This Long Column-Name`","`This Long Column-Name` as `new col`").show(2)
dfWithLongColName.createOrReplaceTempView("dfTableLong")

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



###### Removing Columns

We can remove columns with **select** by selecting only the columns we want to keep. However, there is also a dedicated method called **drop**.

In [74]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

['count', 'This Long Column-Name']

###### Changing a Column’s Type (cast)
We can convert columns from one type to another by **casting** them. For instance, let’s convert our count column from a long to an integer type.

In [5]:
df.printSchema()
df.withColumn("count", col("count").cast("int")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



###### Filtering Rows
To filter rows, we create an expression that evaluates to true or false and keep the rows for which it is true. There are two methods for this, **where** and **filter**; they perform the same operation and accept the same argument types when used with DataFrames.

In [19]:
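colCondition = df.filter(col("count") < 2).take(2)  # condition as a Column expression
conditional = df.where("count < 2").take(2)         # same condition as a SQL string

# chaining where calls combines the conditions with AND
df.where(col('count') == 15).where(col('DEST_COUNTRY_NAME') != 'Egypt').show()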

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|          Morocco|      United States|   15|
|    United States|             Latvia|   15|
|           Angola|      United States|   15|
+-----------------+-------------------+-----+



In [21]:
df.select('ORIGIN_COUNTRY_NAME').distinct().count()

125

In [24]:
# random samples
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()



In [45]:
new_df = spark.range(10)
# with replacement, the same row can be drawn more than once (note the repeated 1 below)
new_df.sample(True, 0.5, 6).show()

+---+
| id|
+---+
|  0|
|  1|
|  1|
|  2|
|  6|
|  9|
+---+



In [54]:
# random splits
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() # 71
dataFrames[1].count() # 185

185

In [56]:
# concatenating and appending rows (union)
# since DataFrames are immutable, we cannot append to a DataFrame;
# instead, we union the original DataFrame with the new DataFrame

from pyspark.sql import Row

# schema object
schema = df.schema

# new rows
newRows = [Row("New Country", "Other Country", 5), Row("New Country 2", "Other Country 3", 1)]

# parallelized rows
parallelizedRows = spark.sparkContext.parallelize(newRows)

# new dataframe (schema connected to parallelized rows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [57]:
df.union(newDF)\
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+




In [60]:
# sorting rows
# sort and orderBy both work the same way
df.sort('count').show(5)
df.orderBy('count', 'DEST_COUNTRY_NAME').show(5)
df.orderBy(col('count').desc(), col('DEST_COUNTRY_NAME')).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+

###### Limit
Often you only want the first few rows of a DataFrame, for example the top ten, or the top 50 of some dataset. We do this with the **limit** method.

In [65]:
df.limit(5).show()
df.orderBy(expr("count").desc()).limit(6).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
|   United Kingdom|      United States|  2025|
+-----------------+-------------------+------+



###### Repartition and Coalesce

Repartition will incur a full shuffle of the data, regardless of whether or not one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions or when you are looking to partition by a set of columns.

In [66]:
df.rdd.getNumPartitions()

1

In [67]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

If we know we are going to be filtering by a certain column often, it can be worth repartitioning based on that column.

In [69]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

We can optionally specify the number of partitions as well.

In [70]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

**Coalesce**, on the other hand, will not incur a full shuffle; it tries to combine existing partitions. The following operation shuffles our data into 5 partitions based on the destination country name, then coalesces them into 2 partitions (without a full shuffle).

In [71]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

###### Collecting Rows to the Driver
Spark has a driver that maintains cluster information and runs user code. This means that when we call a method that collects data, the rows are brought back to the driver.

* **collect** returns all rows of the entire DataFrame (use it with care on large DataFrames, since everything must fit in driver memory)
* **take** returns the first N rows
* **show** prints a number of rows as a nicely formatted table

In [74]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [75]:
collectDF.show(5, False)  # False: do not truncate long values
collectDF.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

Please refer to **Spark: The Definitive Guide** to learn more.