# Introduction

## Spark Applications

![Spark Applications](../Images/spark_architecture.png)

## Spark APIs

### Spark Language APIs

* Scala
* Java
* Python
* SQL
* R

### The Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [2]:
spark

### DataFrames

In [3]:
myRange = spark.range(1000).toDF("number")

The DataFrame is the most common Structured API and simply represents a table of data with rows and columns.

### Partitions

To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions.

A partition is a collection of rows that sit on one physical machine in your cluster.
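
As a rough mental model only (plain Python, not Spark's actual implementation), partitioning can be pictured as splitting the rows into chunks that separate workers could process independently. The helper `split_into_partitions` and the partition count of 4 are hypothetical choices made for illustration.

```python
def split_into_partitions(rows, num_partitions):
    """Split rows into num_partitions contiguous chunks of near-equal size."""
    rows = list(rows)
    base, extra = divmod(len(rows), num_partitions)
    partitions = []
    start = 0
    for i in range(num_partitions):
        # The first `extra` partitions each take one additional row.
        size = base + (1 if i < extra else 0)
        partitions.append(rows[start:start + size])
        start += size
    return partitions


# Hypothetical example: 1000 rows spread over 4 partitions.
partitions = split_into_partitions(range(1000), 4)
sizes = [len(p) for p in partitions]
print(sizes)  # [250, 250, 250, 250]
```

In a real session, `myRange.rdd.getNumPartitions()` reports how many partitions back a DataFrame.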

### Transformations

In Spark, the core data structures are __immutable__, meaning they cannot be changed after they're created.

To "change" a DataFrame, you need to instruct Spark how you would like to modify it to do what you want. These instructions are called __transformations__.

In [4]:
divisBy2 = myRange.where("number % 2 = 0")

The transformation described above is an __abstract transformation__, and Spark will not act on transformations until we call an action.

There are __two__ types of transformations: those that specify __narrow dependencies__ and those that specify __wide dependencies__.

#### Narrow dependencies

![Narrow Dependency](../Images/narrow_dependency.png)

Narrow dependencies are those for which each input partition will contribute to only one output partition.
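
A minimal sketch of the idea in plain Python, assuming a DataFrame is modelled as a list of partitions (this is a toy model, not how Spark stores data). The function `narrow_filter` is a hypothetical name; it mirrors the `where` filter above, which can run on every partition without looking at any other partition.

```python
# Toy model: a DataFrame as a list of partitions, each a list of numbers.
partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]


def narrow_filter(partitions, predicate):
    """Filter every partition independently; no rows move between partitions."""
    return [[row for row in part if predicate(row)] for part in partitions]


even = narrow_filter(partitions, lambda n: n % 2 == 0)
print(even)  # [[0, 2], [4, 6], [8, 10]]
```

Output partition `i` depends only on input partition `i`, which is what makes the dependency narrow.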

#### Wide dependencies

![Wide dependencies](../Images/wide_dependency.png)

A wide dependency style transformation will have input partitions contributing to many output partitions. You will often hear this referred to as a __shuffle__ whereby Spark will exchange partitions across the cluster.
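
The shuffle can be sketched in the same toy model (plain Python, not Spark's implementation). The function `shuffle_by_key` and the key `n % 3` are hypothetical; the point is that rows from one input partition are scattered across several output partitions.

```python
def shuffle_by_key(partitions, key_fn, num_output_partitions):
    """Redistribute rows so that equal keys land in the same output partition."""
    output = [[] for _ in range(num_output_partitions)]
    for part in partitions:
        for row in part:
            # The key decides the target partition, regardless of where the row started.
            target = key_fn(row) % num_output_partitions
            output[target].append(row)
    return output


partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
shuffled = shuffle_by_key(partitions, key_fn=lambda n: n % 3, num_output_partitions=3)
print(shuffled)  # [[0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]]
```

Every input partition contributes rows to all three output partitions here, so the data has to be exchanged between machines.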

#### Lazy Evaluation

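Lazy evaluation means that Spark will wait until the last moment to execute the graph of computation instructions. In Spark, instead of modifying the data immediately when you express some operations, you build up a _plan_ of transformations that you would like to apply to your source data. Because Spark sees the whole plan before running it, it can optimize the entire data flow. A well-known example is _predicate pushdown_: if a filter at the end of the plan needs only one row of the source data, Spark can push that filter down to the data source so that only the required records are read.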
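
The plan-building behaviour can be imitated with a small plain-Python class. This is a sketch under stated assumptions, not Spark's API: `LazyPlan`, its methods, and the `source` function are hypothetical names. Transformations only record a step; the source is read when the action `collect` is called.

```python
class LazyPlan:
    """Records transformations and runs them only when an action is called."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def where(self, predicate):
        # Transformation: returns a new plan; the original plan is left untouched.
        return LazyPlan(self._source, self._steps + (("filter", predicate),))

    def select(self, fn):
        # Transformation: records a per-row mapping step.
        return LazyPlan(self._source, self._steps + (("map", fn),))

    def explain(self):
        # Lists the recorded steps without executing anything.
        return [name for name, _ in self._steps]

    def collect(self):
        # Action: only now is the source read and the steps applied.
        rows = iter(self._source())
        for name, fn in self._steps:
            rows = filter(fn, rows) if name == "filter" else map(fn, rows)
        return list(rows)


reads = []


def source():
    reads.append("read")  # track when the data is actually touched
    return range(10)


plan = LazyPlan(source).where(lambda n: n % 2 == 0).select(lambda n: n * 10)
reads_before_action = list(reads)
print(reads_before_action)  # []
print(plan.explain())       # ['filter', 'map']
result = plan.collect()
print(result)               # [0, 20, 40, 60, 80]
print(reads)                # ['read']
```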

### Actions

Transformations allow us to build up our logical plan. To trigger the computation, we run an __action__.

There are three kinds of __actions__:

* Actions to view data in the console,
* Actions to collect data to native objects in the respective language,
* Actions to write to output data sources.

### An End to End Example

We'll use Spark to analyze some [flight data from the United States Bureau of Transportation Statistics](https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/flight-data).

In [5]:
flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("../data/flight-data/csv/2015-summary.csv")

flightData2015.take(3)


[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [6]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#22 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#22 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#33]
      +- FileScan csv [DEST_COUNTRY_NAME#20,ORIGIN_COUNTRY_NAME#21,count#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/marco/Documents/Spark Notebook/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




Nothing happens to the data when we call _sort_ because it's just a transformation. However, we can see from the _explain_ plan that Spark is building up a plan for how it will execute the sort across the cluster: an `Exchange rangepartitioning` step (a wide dependency) followed by a `Sort`.
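
The two steps in the plan, a range-partitioning exchange followed by a sort, can be pictured with a small plain-Python sketch. This is a toy model, not Spark's implementation: `range_partition_sort` is a hypothetical helper, the boundaries are hard-coded rather than chosen by the engine, and the sample counts are made up apart from the first three values shown earlier.

```python
import bisect


def range_partition_sort(rows, boundaries):
    """Route each row to a range bucket, then sort every bucket locally."""
    buckets = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        # bisect_right gives the index of the range that contains `row`.
        buckets[bisect.bisect_right(boundaries, row)].append(row)
    return [sorted(bucket) for bucket in buckets]


# Illustrative values only.
counts = [15, 1, 344, 62, 1, 588, 40, 325, 39, 64]
buckets = range_partition_sort(counts, boundaries=[40, 300])
print(buckets)  # [[1, 1, 15, 39], [40, 62, 64], [325, 344, 588]]
```

Because every value in one bucket is no larger than every value in the next, reading the locally sorted buckets in order yields a globally sorted result.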

In [7]:
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

### DataFrames and SQL

With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. SQL queries and DataFrame code both "compile" to the same underlying plan.

You can make any DataFrame into a table or view with one simple method call:

In [8]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [9]:
sqlWay = spark.sql("""
    SELECT DEST_COUNTRY_NAME, count(1)
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    """)

dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5), ENSURE_REQUIREMENTS, [id=#55]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/marco/Documents/Spark Notebook/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5), ENSURE_REQUIREMENTS, [id=#68]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIn

In [10]:
maxSql = spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [11]:
from pyspark.sql.functions import desc

flightData2015.groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



![The entire DataFrame transformation flow](../Images/transformation_flow.png)

In [13]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#99L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#20,destination_total#99L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[sum(count#22)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5), ENSURE_REQUIREMENTS, [id=#204]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_sum(count#22)])
            +- FileScan csv [DEST_COUNTRY_NAME#20,count#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/marco/Documents/Spark Notebook/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
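
Reading the plan from the bottom up: each partition is first aggregated on its own (`partial_sum`), the partial results are exchanged by key, the final sums are computed, and only the top rows are kept. The sketch below imitates those stages in plain Python; it is a toy model rather than Spark's implementation, the function names are hypothetical, and the sample rows are made up.

```python
import heapq
from collections import defaultdict


def partial_sum(partition):
    """Sum counts per destination within one partition."""
    sums = defaultdict(int)
    for dest, count in partition:
        sums[dest] += count
    return sums


def top_destinations(partitions, limit):
    # Step 1: partial aggregation inside each partition.
    partials = [partial_sum(p) for p in partitions]
    # Step 2: merge partial sums per key (stands in for the exchange and final sum).
    totals = defaultdict(int)
    for partial in partials:
        for dest, subtotal in partial.items():
            totals[dest] += subtotal
    # Step 3: keep only the `limit` largest totals, in descending order.
    return heapq.nlargest(limit, totals.items(), key=lambda kv: kv[1])


# Illustrative (destination, count) rows spread over three partitions.
partitions = [
    [("United States", 15), ("Canada", 10), ("United States", 344)],
    [("Mexico", 7), ("Canada", 5), ("Japan", 2)],
    [("United States", 1), ("Mexico", 20)],
]
top2 = top_destinations(partitions, limit=2)
print(top2)  # [('United States', 360), ('Mexico', 27)]
```

Aggregating inside each partition first means only one row per key and partition has to cross the network during the exchange.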


