# Cloud Workshop Azure Databricks
## 02. Introduction to Apache Spark with Databricks

**Welcome to Databricks!**

This notebook is intended to be the first step in learning how to best use Apache Spark and Databricks together. We'll walk through the core concepts, the fundamental abstractions, and the tools at your disposal. It teaches the fundamental concepts and best practices directly from those who have written Apache Spark and know it best.

#### Databricks Terminology

Databricks has key concepts that are worth understanding. You'll notice that many of these line up with the links and icons that you'll see on the left side. These together define the fundamental tools that Databricks provides to you as an end user. They are available both in the web application UI as well as the REST API.

-   **Workspaces**
    -   Workspaces allow you to organize all the work that you are doing on Databricks. Like a folder structure on your computer, a workspace allows you to save **notebooks** and **libraries** and share them with other users. Workspaces are not connected to data and should not be used to store data. They're simply for storing the **notebooks** and **libraries** that you use to operate on and manipulate your data.
-   **Notebooks**
    -   Notebooks are a set of any number of cells that allow you to execute commands. Cells hold code in any of the following languages: `Scala`, `Python`, `R`, `SQL`, or `Markdown`. Notebooks have a default language, but each cell can override it with another language. This is done by including `%[language name]` at the top of the cell, for instance `%python`. We'll see this feature shortly.
    -   Notebooks need to be connected to a **cluster** in order to execute commands; however, they are not permanently tied to a cluster. This allows notebooks to be shared via the web or downloaded onto your local machine.
    -   Here is a demonstration video of [Notebooks](http://www.youtube.com/embed/MXI0F8zfKGI).
    -   **Dashboards**
        -   **Dashboards** can be created from **notebooks** as a way of displaying the output of cells without the code that generates them.
    -   **Notebooks** can also be scheduled as **jobs** in one click to run a data pipeline, update a machine learning model, or update a dashboard.
-   **Libraries**
    -   Libraries are packages or modules that provide additional functionality that you need to solve your business problems. These may be custom-written Scala or Java JARs, Python eggs, or custom-written packages. You can write and upload these manually, or you may install them directly via package management utilities like PyPI or Maven.
-   **Tables**
    -   Tables are structured data that you and your team will use for analysis. Tables can exist in several places. Tables can be stored on Amazon S3, they can be stored on the cluster that you're currently using, or they can be cached in memory. [For more about tables see the documentation](https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#02%20Product%20Overview/07%20Tables.html).
-   **Clusters**
    -   Clusters are groups of computers that you treat as a single computer. In Databricks, this means that you can effectively treat 20 computers as you might treat one computer. Clusters allow you to execute code from **notebooks** or **libraries** on a set of data. That data may be raw data located on S3 or structured data that you uploaded as a **table** to the cluster you are working on.
    -   Note that clusters have access controls that determine who can use each cluster.
    -   Here is a demonstration video of [Clusters](http://www.youtube.com/embed/2-imke2vDs8).
-   **Jobs**
    -   Jobs are the tool by which you can schedule execution to occur either on an already existing **cluster** or on a cluster of its own. These can be **notebooks** as well as JARs or Python scripts. They can be created either manually or via the REST API.
    -   Here is a demonstration video of [Jobs](http://www.youtube.com/embed/srI9yNOAbU0).
-   **Apps**
    -   Apps are third-party integrations with the Databricks platform. These include applications like Tableau.


## Databricks and Apache Spark Abstractions


### The Contexts/Environments

Let's now tour the core abstractions in Apache Spark so that you're comfortable with all the pieces you need to use Databricks and Spark effectively.

Historically, Apache Spark has had two core contexts that are available to the user: the `SparkContext`, made available as `sc`, and the `SQLContext`, made available as `sqlContext`. These contexts make a variety of functions and information available to the user. The `sqlContext` makes a lot of DataFrame functionality available, while the `SparkContext` focuses more on the Apache Spark engine itself.

However, in Apache Spark 2.X, there is just one context - the `SparkSession`.

### The Data Interfaces

There are several key interfaces that you should understand when you go to use Spark.

-   **The Dataset**
    -   The Dataset is Apache Spark's newest distributed collection and can be considered a combination of DataFrames and RDDs. It provides the typed interface that is available in RDDs while providing a lot of the conveniences of DataFrames. It will be the core abstraction going forward.
-   **The DataFrame**
    -   The DataFrame is a collection of distributed `Row` types. DataFrames provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) as well as in the R language.
-   **The RDD (Resilient Distributed Dataset)**
    -   Apache Spark's first abstraction was the RDD or Resilient Distributed Dataset. Essentially it is an interface to a sequence of data objects that consist of one or more types that are located across a variety of machines in a cluster. RDDs can be created in a variety of ways and are the "lowest level" API available to the user. While this is the original data structure made available, new users should focus on Datasets as those will be supersets of the current RDD functionality.


# Getting Started with Some Code!

Whew, that's a lot to cover thus far! But we've made it to the demonstration so we can see the power of Apache Spark and Databricks together. To do this you can do one of several things. First, and probably simplest, is that you can copy this notebook into your own environment via the `Import Notebook` button that is available at the top right or top left of this page. If you'd rather type all of the commands yourself, you can create a new notebook and type the commands as we proceed.

## Creating a Cluster

*If you're in the Community Edition of Databricks, this will all happen automatically once you start running cells in a notebook! However you're free to follow the below directions if you wish.*

Click the Clusters button that you'll notice on the left side of the page. On the Clusters page, click on ![img]

Enter the cluster details:

-   Select a unique name for the cluster.
-   Select the Spark Version.
    -   Optionally, you can test out experimental versions of Spark.
-   Enter the number of workers to bring up - at least 1 is required to run Spark commands.
-   Select whether to use a Serverless Pool (Beta) or a dedicated cluster.

First, let's explore the previously mentioned `SparkSession`. We can access it via the `spark` variable. As explained, the Spark Session is the core location where Apache Spark related information is stored. For Spark 1.X the variables are `sqlContext` and `sc`.

Cells can be executed by hitting `shift+enter` while the cell is selected.

In [4]:
spark

In [5]:
print("Hello")

In [6]:
x=10
y=20
z=x+y

In [7]:
print(z)

In [8]:
import sys
# sys.version reports the Python interpreter version, not the PySpark version
print("Python version:", sys.version)

In [9]:
import datetime
date = datetime.datetime.now()
print("Date : ", str(date))

We can use the Spark Context to access information, but we can also use it to parallelize a collection. Here we'll parallelize a small Python range, which returns a `DataFrame`.

In [11]:
firstDataFrame = sqlContext.range(1000000)

# In Spark 2.X, the equivalent call on the SparkSession is
# spark.range(1000000)
print(firstDataFrame)

In [12]:
display(firstDataFrame)

id
0
1
2
3
4
5
6
7
8
9


One might expect `print(firstDataFrame)` to print out the values of the `DataFrame` that we just parallelized; however, that's not quite how Apache Spark works. Spark allows two distinct kinds of operations by the user. There are **transformations** and there are **actions**.

### Transformations

Transformations are operations that will not be completed at the time you write and execute the code in a cell - they will only get executed once you have called an **action**. An example of a transformation might be to convert an integer into a float or to filter a set of values.

### Actions

Actions are commands that are computed by Spark right at the time of their execution. They consist of running all of the previous transformations in order to get back an actual result. An action is composed of one or more jobs, which consist of tasks that will be executed by the workers in parallel where possible.
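To make the distinction concrete, here is a minimal plain-Python analogy. It is not Spark code and not how Spark is implemented: `LazyPipeline`, `double`, and `calls` are hypothetical names invented for this sketch. The "transformations" (`map`, `filter`) only record what should happen, and the "action" (`take`) is what finally runs the recorded steps.

```python
# A plain-Python analogy, NOT Spark's implementation.
# LazyPipeline is a hypothetical class invented for this illustration.
class LazyPipeline:
    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)

    # "Transformations": record the step and return a new plan; nothing runs.
    def map(self, fn):
        return LazyPipeline(self.source, self.steps + (("map", fn),))

    def filter(self, pred):
        return LazyPipeline(self.source, self.steps + (("filter", pred),))

    # "Action": run every recorded step until n results have been collected.
    def take(self, n):
        out = []
        for item in self.source:
            keep = True
            for kind, fn in self.steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):  # a "filter" step rejected the item
                    keep = False
                    break
            if keep:
                out.append(item)
                if len(out) == n:
                    break
        return out


calls = []  # records every input that double() actually processes


def double(x):
    calls.append(x)
    return x * 2


plan = LazyPipeline(range(1000000)).map(double).filter(lambda v: v % 3 == 0)
print(len(calls))    # 0: defining the plan did no work
print(plan.take(5))  # [0, 6, 12, 18, 24]
print(len(calls))    # 13: only as many inputs as the action needed
```

Note that the action touched only 13 of the one million inputs, which hints at why deferring work until an action is called can pay off.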

Here are some simple examples of transformations and actions. Remember, these **are not all** the transformations and actions - this is just a short sample of them. We'll get to why Apache Spark is designed this way shortly!

![transformations and actions](http://training.databricks.com/databricks_guide/gentle_introduction/trans_and_actions.png)

In [14]:
# An example of a transformation
# select the ID column values and multiply them by 2

secondDataFrame = firstDataFrame.selectExpr("(id * 2) as value")

In [15]:
# an example of an action
# take the first 5 values that we have in our firstDataFrame

firstDataFrame.take(5)

# take the first 5 values that we have in our secondDataFrame
secondDataFrame.take(5)

In [16]:
display(secondDataFrame)

value
0
2
4
6
8
10
12
14
16
18


Now we've seen that Spark consists of actions and transformations. Let's talk about why that's the case. The reason is that it gives Spark a simple way to optimize the entire pipeline of computations as opposed to the individual pieces. This makes it exceptionally fast for certain types of computation because it can perform all relevant computations at once. Technically speaking, Spark `pipelines` this computation, which we can see in the image below. This means that certain computations can all be performed at once (like a map and a filter) rather than having to do one operation for all pieces of data and then the following operation.

![transformations and actions](http://training.databricks.com/databricks_guide/gentle_introduction/pipeline.png)
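As a rough illustration of what pipelining means, the following plain-Python sketch compares running a map and a filter stage by stage with running them fused, one element at a time. The function names are hypothetical and this only mimics the idea; Spark's own pipelining happens inside its execution engine.

```python
def stage_by_stage(data):
    # Run each operation over ALL the data before starting the next one,
    # materializing a full intermediate list in between.
    mapped = [x * 2 for x in data]
    return [x for x in mapped if x % 3 == 0]


def pipelined(data):
    # Fuse map and filter: each element flows through both operations
    # before the next element is read, so no intermediate list is built.
    for x in data:
        y = x * 2
        if y % 3 == 0:
            yield y


data = range(10)
print(stage_by_stage(data))   # [0, 6, 12, 18]
print(list(pipelined(data)))  # [0, 6, 12, 18]
```

Both versions return the same values; the fused version simply avoids holding the intermediate result of the map in memory.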

Apache Spark can also keep results in memory as opposed to other frameworks that immediately write to disk after each task.

## Apache Spark Architecture

Before proceeding with our example, let's see an overview of the Apache Spark architecture. As mentioned before, Apache Spark allows you to treat many machines as one machine, and this is done via a master-worker type architecture where there is a `driver` or master node in the cluster, accompanied by `worker` nodes. The master sends work to the workers and instructs them to pull data either from memory or from disk (or from another data source like S3 or Redshift).

The diagram below shows an example Apache Spark cluster. There is a Driver node that communicates with executor nodes. Each of these executor nodes has slots, which are logically like execution cores.

![spark-architecture](http://training.databricks.com/databricks_guide/gentle_introduction/videoss_logo.png)

The Driver sends Tasks to the empty slots on the Executors when work has to be done:

![spark-architecture](http://training.databricks.com/databricks_guide/gentle_introduction/spark_cluster_tasks.png)
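The driver-and-slots picture can be imitated on a single machine with a thread pool. In this hedged sketch, the main program plays the role of the driver, four worker threads stand in for four executor slots, and each chunk of data is one task. The helper names (`split_into_partitions`, `task`) are assumptions made up for the illustration and have nothing to do with Spark's scheduler API.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous chunks."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def task(partition):
    # The work one slot performs on one partition: a partial sum.
    return sum(partition)


data = list(range(100))
partitions = split_into_partitions(data, 8)  # 8 tasks to hand out

# 4 worker threads stand in for 4 executor slots; with 8 tasks and 4 slots,
# the remaining tasks wait until a slot becomes free.
with ThreadPoolExecutor(max_workers=4) as slots:
    partial_sums = list(slots.map(task, partitions))

total = sum(partial_sums)  # the "driver" combines the partial results
print(total)  # 4950
```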

You can view the details of your Apache Spark application in the Apache Spark web UI. The web UI is accessible in Databricks by going to "Clusters" and then clicking on the "View Spark UI" link for your cluster. It is also available from the top left of this notebook, where you select the cluster to attach this notebook to: that menu includes a link to the Apache Spark Web UI.

At a high level, every Apache Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. In Databricks, the notebook interface is the driver program.  This driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.
Driver programs access Apache Spark through a `SparkSession` object regardless of deployment location.

## A Worked Example of Transformations and Actions

To illustrate these architectural concepts and, most relevantly, **transformations** and **actions**, let's go through a more thorough example, this time using `DataFrames` and a CSV file.

The DataFrame and SparkSQL work almost exactly as we have described above: we're going to build up a plan for how we're going to access the data and then finally execute that plan with an action. We'll see this process in the diagram below. Spark analyzes the query, builds up candidate plans, compares them, and then finally executes the chosen one.

![Spark Query Plan](http://training.databricks.com/databricks_guide/gentle_introduction/query-plan-generation.png)

While we won't go too deep into the details of how this process works, you can read a lot more about it on the [Databricks blog](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html). For those who want more information about how Apache Spark goes through this process, I would definitely recommend that post!

Going forward, we're going to access a set of public datasets that Databricks makes available. Databricks datasets are a small curated group that we've pulled together from across the web. We make these available using the Databricks filesystem. Let's load the popular diamonds dataset in as a Spark `DataFrame`. Now let's go through the dataset that we'll be working with.

In [18]:
%fs ls /databricks-datasets/Rdatasets/data-001/datasets.csv

path,name,size
dbfs:/databricks-datasets/Rdatasets/data-001/datasets.csv,datasets.csv,168536


In [19]:
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"

diamonds = spark.read.format("com.databricks.spark.csv")\
  .option("header","true")\
  .option("inferSchema", "true")\
  .load(dataPath)
  

Now that we've loaded in the data, we're going to perform computations on it. This provides us with a convenient tour of some of the basic functionality and some of the nice features that make running Spark on Databricks so simple! In order to be able to perform our computations, we need to understand more about the data. We can do this with the `display` function.

In [21]:
display(diamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


What makes `display` exceptional is the fact that we can very easily create some more sophisticated graphs by clicking the graphing icon that you can see below. Here's a plot that allows us to compare price, color, and cut.

In [23]:
display(diamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39



Now that we've explored the data, let's return to understanding **transformations** and **actions**. I'm going to create several transformations and then an action. After that we will inspect exactly what's happening under the hood.

These transformations are simple. First we group by two variables, cut and color, and then compute the average price. Then we're going to inner join that to the original dataset on the column `color`. Then we'll select the average price as well as the carat from that new dataset.

In [26]:
df1 = diamonds.groupBy("cut", "color").avg("price")

df2 = df1\
  .join(diamonds, on='color', how='inner')\
  .select("`avg(price)`", "carat")


In [27]:
display(df2)

avg(price),carat
3682.3125,0.23
3423.6441586280816,0.23
2597.5500896746094,0.23
3214.6520833333334,0.23
3538.9144201968334,0.23
3682.3125,0.21
3423.6441586280816,0.21
2597.5500896746094,0.21
3214.6520833333334,0.21
3538.9144201968334,0.21
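To see what the group-by and the inner join compute, and why each carat value appears several times in the output above, here is a small plain-Python sketch of the same logic. The rows are hypothetical values made up for the illustration, not taken from `diamonds.csv`, and the code imitates the semantics only, not how Spark executes them.

```python
from collections import defaultdict

# Hypothetical rows (cut, color, price, carat); NOT taken from diamonds.csv.
rows = [
    ("Ideal",   "E", 300, 0.20),
    ("Ideal",   "E", 500, 0.30),
    ("Premium", "E", 600, 0.25),
    ("Good",    "J", 400, 0.31),
]

# Equivalent of groupBy("cut", "color").avg("price")
prices = defaultdict(list)
for cut, color, price, _carat in rows:
    prices[(cut, color)].append(price)
avg_price = {key: sum(values) / len(values) for key, values in prices.items()}

# Equivalent of an inner join on color only, then select(avg(price), carat):
# every row is paired with EVERY (cut, color) average that shares its color.
joined = [
    (avg, carat)
    for (_cut, group_color), avg in avg_price.items()
    for _row_cut, color, _price, carat in rows
    if group_color == color
]

print(avg_price[("Ideal", "E")])  # 400.0
print(len(joined))                # 7
```

Because the join key is `color` alone, a row is matched once per cut that exists for its color, which is why the result has more rows than the input.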


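These transformations are complete in a sense, but defining them does not compute anything: the cell that creates `df1` and `df2` returns no results. Work only happens when an action, such as the `display` call above, asks for data.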

The reason for that is these computations are *lazy*, which lets Spark build up the entire flow of data from start to finish as required by the user. This is an intelligent optimization for two key reasons. First, any calculation can be recomputed from the very source data, allowing Apache Spark to handle any failures that occur along the way and to successfully handle stragglers. Second, Apache Spark can optimize computation so that data and computation can be `pipelined`, as we mentioned above. Therefore, with each transformation Apache Spark creates a plan for how it will perform this work.

To get a sense for what this plan consists of, we can use the `explain` method. `explain` does not execute the computation; all it does is tell us the lineage for how to compute this exact dataset.

In [29]:
df2.explain()

Now explaining the above results is outside of this introductory tutorial, but please feel free to read through it. What you should deduce from this is that Spark has generated a plan for how it hopes to execute the given query. Let's now run an action in order to execute the above plan.

In [31]:
df2.count()

This will execute the plan that Apache Spark built up previously. Click the little arrow next to where it says `(2) Spark Jobs` after that cell finishes executing and then click the `View` link. This brings up the Apache Spark Web UI right inside of your notebook. This can also be accessed from the cluster attach button at the top of this notebook. In the Spark UI, you should see a diagram something like this.

![img](http://training.databricks.com/databricks_guide/gentle_introduction/spark-dag-ui-before-2-0.png)

or

![img](http://training.databricks.com/databricks_guide/gentle_introduction/spark-dag-ui.png)

These are significant visualizations. The top one is from Apache Spark 1.6 while the lower one is from Apache Spark 2.0; we'll be focusing on the 2.0 version. These are Directed Acyclic Graphs (DAGs) of all the computations that have to be performed in order to get to that result. It's easy to see that the second DAG visualization is much cleaner than the one before, but both visualizations show us all the steps that Spark has to take to get our data into the final form.

Again, this DAG is generated because transformations are *lazy* - while generating this series of steps Spark will optimize lots of things along the way and will even generate code to do so. This is one of the core reasons that users should be focusing on using DataFrames and Datasets instead of the legacy RDD API. With DataFrames and Datasets, Apache Spark will work under the hood to optimize the entire query plan and pipeline entire steps together. You'll see instances of `WholeStageCodegen` as well as `tungsten` in the plans, and these are a part of the improvements [in SparkSQL which you can read more about on the Databricks blog.](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html)

In this diagram you can see that we start with a CSV all the way on the left side, perform some changes, merge it with another CSV file (that we created from the original DataFrame), then join those together and finally perform some aggregations until we get our final result!

### Caching

One of the significant parts of Apache Spark is its ability to store things in memory during computation. This is a neat trick that you can use to speed up access to commonly queried tables or pieces of data. It is also great for iterative algorithms that work over and over again on the same data. While many see this as a panacea for all speed issues, think of it much more like a tool that you can use. Other important concepts like data partitioning, clustering and bucketing can end up having a much greater effect on the execution of your job than caching. Remember, though, that these are all tools in your tool kit!

To cache a DataFrame or RDD, simply use the cache method.

In [34]:
df2.cache()

Caching, like a transformation, is performed lazily. That means that it won't store the data in memory until you call an action on that dataset. 

Here's a simple example. We've created our df2 DataFrame which is essentially a logical plan that tells us how to compute that exact DataFrame. We've told Apache Spark to cache that data after we compute it for the first time. So let's call a full scan of the data with a count twice. The first time, this will create the DataFrame, cache it in memory, then return the result. The second time, rather than recomputing that whole DataFrame, it will just hit the version that it has in memory.

Let's take a look at how we can discover this.

In [36]:
df2.count()

In the above example, we can see that this cuts down on the time needed to generate this data immensely - often by at least an order of magnitude. With much larger and more complex data analysis, the gains that we get from caching can be even greater!
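The lazy caching behaviour described in this section can be sketched in plain Python. `CachedPlan` is a hypothetical stand-in written for this illustration, not a Spark class: calling `cache()` only marks the plan, the first action computes and stores the data, and the second action reuses the stored copy.

```python
class CachedPlan:
    """Hypothetical stand-in for a cached DataFrame; not a Spark class."""

    def __init__(self, compute):
        self._compute = compute        # how to build the data (the "plan")
        self._cache_requested = False
        self._data = None
        self.computations = 0          # how many times the plan actually ran

    def cache(self):
        # Lazy: only mark the plan as cacheable; compute nothing yet.
        self._cache_requested = True
        return self

    def count(self):
        # "Action": reuse the in-memory copy if there is one, else compute.
        if self._data is not None:
            return len(self._data)
        data = self._compute()
        self.computations += 1
        if self._cache_requested:
            self._data = data
        return len(data)


plan = CachedPlan(lambda: [x * 2 for x in range(1000)]).cache()
print(plan.computations)  # 0: cache() alone did not run anything

first = plan.count()      # computes the data and stores it
second = plan.count()     # served from the stored copy
print(first, second, plan.computations)  # 1000 1000 1
```

Without the `cache()` call, the same two `count()` calls would run the computation twice, which is the cost that caching avoids.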

> End