# Introduction to Apache Spark on Databricks

**Welcome to Databricks!**

This notebook is intended to be the first step in learning how to best use Apache Spark and Databricks together. We'll be walking through the core concepts, the fundamental abstractions, and the tools at your disposal. This notebook will teach the fundamental concepts and best practices directly from the people who wrote Apache Spark and know it best.

First, it's worth defining Databricks. Databricks is a managed platform for running Apache Spark. That means that you do not have to learn complex cluster management concepts or perform tedious maintenance tasks to take advantage of Spark. Databricks also provides a host of features to help its users be more productive with Spark. It's a point-and-click platform for those who prefer a user interface, like data scientists or data analysts. However, this UI is accompanied by a sophisticated API for those who want to automate aspects of their data workloads with automated jobs. To meet the needs of enterprises, Databricks also includes features such as role-based access control and other intelligent optimizations that not only improve usability for users but also reduce costs and complexity for administrators.

## Databricks Terminology

Databricks has key concepts that are worth understanding. You'll notice that many of these line up with the links and icons that you'll see on the left side. Together, these define the fundamental tools that Databricks provides to you as an end user. They are available both in the web application UI and through the REST API.

-   **Workspaces**
    -   Workspaces allow you to organize all the work that you are doing on Databricks. Like a folder structure on your computer, a workspace allows you to save **notebooks** and **libraries** and share them with other users. Workspaces are not connected to data and should not be used to store data. They're simply for storing the **notebooks** and **libraries** that you use to operate on and manipulate your data.
-   **Notebooks**
    -   Notebooks are a set of any number of cells that allow you to execute commands. Cells hold code in any of the following languages: `Scala`, `Python`, `R`, `SQL`, or `Markdown`. Notebooks have a default language, but each cell can override it with another language. This is done by including `%[language name]` at the top of the cell, for instance `%python`. We'll see this feature shortly.
    -   Notebooks need to be connected to a **cluster** in order to execute commands; however, they are not permanently tied to a cluster. This allows notebooks to be shared via the web or downloaded onto your local machine.
    -   Here is a demonstration video of [Notebooks](http://www.youtube.com/embed/MXI0F8zfKGI).
    -   **Dashboards**
        -   **Dashboards** can be created from **notebooks** as a way of displaying the output of cells without the code that generates them.
    -   **Notebooks** can also be scheduled as **jobs** in one click to run a data pipeline, update a machine learning model, or update a dashboard.
-   **Libraries**
    -   Libraries are packages or modules that provide additional functionality that you need to solve your business problems. These may be custom-written Scala or Java jars, Python eggs, or custom-written packages. You can write and upload these manually, or you may install them directly via package management utilities like PyPI or Maven.
-   **Tables**
    -   Tables are structured data that you and your team will use for analysis. Tables can exist in several places. Tables can be stored on Amazon S3, they can be stored on the cluster that you're currently using, or they can be cached in memory. [For more about tables see the documentation](https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#02%20Product%20Overview/07%20Tables.html).
-   **Clusters**
    -   Clusters are groups of computers that you treat as a single computer. In Databricks, this means that you can effectively treat 20 computers as you might treat one computer. Clusters allow you to execute code from **notebooks** or **libraries** on a set of data. That data may be raw data located on S3 or structured data that you uploaded as a **table** to the cluster you are working on.
    -   It is important to note that clusters have access controls that determine who can use each cluster.
    -   Here is a demonstration video of [Clusters](http://www.youtube.com/embed/2-imke2vDs8).
-   **Jobs**
    -   Jobs are the tool by which you can schedule execution to occur either on an already existing **cluster** or on a cluster of its own. Jobs can run **notebooks** as well as jars or Python scripts. They can be created either manually or via the REST API.
    -   Here is a demonstration video of [Jobs](http://www.youtube.com/embed/srI9yNOAbU0).
-   **Apps**
    -   Apps are third-party integrations with the Databricks platform. These include applications like Tableau.

## Databricks and Apache Spark Help Resources

Databricks comes with a variety of tools to help you learn how to use Databricks and Apache Spark effectively. Databricks holds the greatest collection of Apache Spark documentation available anywhere on the web. There are two fundamental sets of resources that we make available: resources to help you learn how to use Apache Spark and Databricks and resources that you can refer to if you already know the basics.

To access these resources at any time, click the question mark button at the top right-hand corner. This search menu will search all of the below sections of the documentation.

![img](http://training.databricks.com/databricks_guide/gentle_introduction/help_menu.png)

-   **The Databricks Guide**
    -   The Databricks Guide is the definitive reference for you and your team once you've become accustomed to using and leveraging Apache Spark. It allows for quick reference of common Databricks and Spark APIs with snippets of sample code.
    -   The Guide also includes a series of tutorials (including this one!) that provide a more guided introduction to a given topic.
-   **The Spark APIs**
    -   Databricks makes it easy to search the Apache Spark APIs directly. Simply use the search that is available at the top right and it will automatically display API results as well.
-   **The Apache Spark Documentation**
    -   The Apache Spark open source documentation is also made available for quick and simple search if you need to dive deeper into some of the internals of Apache Spark.
-   **Databricks Forums**
    -   [The Databricks Forums](https://forums.databricks.com/) are a community resource for those who have specific use case questions or questions that they cannot see answered in the guide or the documentation.
    
## Databricks and Apache Spark Abstractions

Now that we've defined the terminology and pointed to more learning resources, let's go through a basic introduction of Apache Spark and Databricks. While you're likely familiar with the concept of Spark, let's take a moment to ensure that we all share the same definitions and give you the opportunity to learn a bit about Spark's history.

### The Apache Spark Project's History

Spark was originally written by the founders of Databricks during their time at UC Berkeley. The Spark project started in 2009, was open sourced in 2010, and in 2013 its code was donated to Apache, becoming Apache Spark. The employees of Databricks have written over 75% of the code in Apache Spark and have contributed more than 10 times more code than any other organization. Apache Spark is a sophisticated distributed computation framework for executing code in parallel across many different machines. While the abstractions and interfaces are simple, managing clusters of computers and ensuring production-level stability is not. Databricks makes big data simple by providing Apache Spark as a hosted solution.

### The Contexts/Environments

Let's now tour the core abstractions in Apache Spark so that you'll be comfortable with all the pieces you need to use Databricks and Spark effectively.

Historically, Apache Spark has had two core contexts that are available to the user: the `SparkContext`, made available as `sc`, and the `SQLContext`, made available as `sqlContext`. These contexts make a variety of functions and information available to the user. The `sqlContext` makes a lot of DataFrame functionality available, while the `sparkContext` focuses more on the Apache Spark engine itself.

However, in Apache Spark 2.X, there is just one context: the `SparkSession`.

### The Data Interfaces

There are several key interfaces that you should understand when you go to use Spark.

-   **The Dataset**
    -   The Dataset is Apache Spark's newest distributed collection and can be considered a combination of DataFrames and RDDs. It provides the typed interface that is available in RDDs while providing a lot of the conveniences of DataFrames. It will be the core abstraction going forward.
-   **The DataFrame**
    -   The DataFrame is a collection of distributed `Row` types. DataFrames provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) as well as in the R language.
-   **The RDD (Resilient Distributed Dataset)**
    -   Apache Spark's first abstraction was the RDD, or Resilient Distributed Dataset. Essentially it is an interface to a sequence of data objects, consisting of one or more types, that are located across a variety of machines in a cluster. RDDs can be created in a variety of ways and are the "lowest level" API available to the user. While this is the original data structure made available, new users should focus on Datasets as those will be supersets of the current RDD functionality.


# Getting Started with Some Code!

Whew, that's a lot to cover thus far! But we've made it to the demonstration so we can see the power of Apache Spark and Databricks together. To do this you can do one of several things. First, and probably simplest, is that you can copy this notebook into your own environment via the `Import Notebook` button that is available at the top right or top left of this page. If you'd rather type all of the commands yourself, you can create a new notebook and type the commands as we proceed.

## Creating a Cluster

*If you're in the Community Edition of Databricks, this will all happen automatically once you start running cells in a notebook! However you're free to follow the below directions if you wish.*

Click the Clusters button that you'll notice on the left side of the page. On the Clusters page, click on ![img](http://training.databricks.com/databricks_guide/create_cluster.png) in the upper left corner.

Then, enter the configuration for the new cluster:

![img](http://training.databricks.com/databricks_guide/create_cluster_2.13v1.png)

Finally, 

-   Select a unique name for the cluster.
-   Select the Spark Version.
    -   Optionally, you can test out experimental versions of Spark.
-   Enter the number of workers to bring up - at least 1 is required to run Spark commands.
-   Select whether to use On-Demand Instances, Spot Instances, or a combination of both.

To read more about some of the other options that are available to users please see the [Databricks Guide on Clusters](https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#02%20Product%20Overview/01%20Clusters.html).

First, let's explore the previously mentioned `SparkSession`. We can access it via the `spark` variable. As explained, the Spark Session is the central place where Apache Spark related information is stored. For Spark 1.X the variables are `sqlContext` and `sc`.

Cells can be executed by hitting `shift+enter` while the cell is selected.

In [3]:
sqlContext
sc

In [4]:
## If you're on 2.X the spark session is made available with the variable below
#####

# spark

We can use the Spark Context to access information, but we can also use these contexts to create distributed collections. Here we'll create a range of numbers, which is returned as a `DataFrame`.

In [6]:
firstDataFrame = sqlContext.range(1000000)

# The code for 2.X is
# spark.range(1000000)
print(firstDataFrame)

Now one might think that this would actually print out the values of the `DataFrame` that we just created; however, that's not quite how Apache Spark works. Spark allows two distinct kinds of operations by the user: **transformations** and **actions**.

### Transformations

Transformations are operations that will not be completed at the time you write and execute the code in a cell. They will only get executed once you have called an **action**. An example of a transformation might be to convert an integer into a float or to filter a set of values.

### Actions

Actions are commands that are computed by Spark right at the time of their execution. They consist of running all of the previous transformations in order to get back an actual result. An action is composed of one or more jobs, each of which consists of tasks that will be executed by the workers in parallel where possible.

Here are some simple examples of transformations and actions. Remember, these **are not all** the transformations and actions - this is just a short sample of them. We'll get to why Apache Spark is designed this way shortly!

![transformations and actions](http://training.databricks.com/databricks_guide/gentle_introduction/trans_and_actions.png)
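To make the distinction concrete, here is a minimal plain-Python sketch of lazy evaluation. It is only an analogy and not how Spark is implemented: the `LazyRange` class and its methods are hypothetical names invented for this illustration. The transformations (`map`, `filter`) only record a step, while the actions (`take`, `count`) run every recorded step.

```python
# A plain-Python analogy for lazy transformations and eager actions.
# LazyRange is a hypothetical class for illustration; it is not part of Spark.

class LazyRange:
    def __init__(self, n, steps=()):
        self.n = n
        self.steps = steps  # recorded transformations, not yet executed

    # Transformations: return a new plan and compute nothing.
    def map(self, fn):
        return LazyRange(self.n, self.steps + (("map", fn),))

    def filter(self, pred):
        return LazyRange(self.n, self.steps + (("filter", pred),))

    def _run(self):
        # Execute every recorded step, one element at a time.
        for value in range(self.n):
            keep = True
            for kind, fn in self.steps:
                if kind == "map":
                    value = fn(value)
                elif not fn(value):
                    keep = False
                    break
            if keep:
                yield value

    # Actions: trigger execution of the whole plan.
    def take(self, k):
        result = []
        for value in self._run():
            if len(result) == k:
                break
            result.append(value)
        return result

    def count(self):
        return sum(1 for _ in self._run())


numbers = LazyRange(1000000)
doubled = numbers.map(lambda x: x * 2)                    # nothing is computed yet
multiples_of_six = doubled.filter(lambda x: x % 6 == 0)   # still nothing computed
first_five = multiples_of_six.take(5)                     # the action runs the plan
```

Because `take(5)` stops as soon as it has five values, only a handful of the one million elements are ever touched, which hints at why deferring work until an action can pay off.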

As an example of a transformation, let's select the ID column values and multiply them by 2.

**TODO**: create a new DataFrame with the ID column values multiplied by 2, and rename the new column to "value".

*Hint:* use `.select()` to select the column value by column name `firstDataFrame['id']` and multiply by 2, use `.alias()` to rename the new column

In [9]:
# An example of a transformation
# select the ID column values and multiply them by 2
secondDataFrame = firstDataFrame.select((<FILL_IN>).alias(<FILL_IN>))

Now take the first 5 values of both DataFrames.

**TODO:** use `.take(5)` to show the first 5 values

In [11]:
# an example of an action
# take the first 5 values that we have in our firstDataFrame
print(firstDataFrame.take(5))

# TODO: take the first 5 values that we have in our secondDataFrame
print(<FILL_IN>)

Now we've seen that Spark consists of actions and transformations. Let's talk about why that's the case. The reason for this is that it gives a simple way to optimize the entire pipeline of computations as opposed to the individual pieces. This makes it exceptionally fast for certain types of computation because it can perform all relevant computations at once. Technically speaking, Spark `pipelines` this computation which we can see in the image below. This means that certain computations can all be performed at once (like a map and a filter) rather than having to do one operation for all pieces of data then the following operation.

![transformations and actions](http://training.databricks.com/databricks_guide/gentle_introduction/pipeline.png)

Apache Spark can also keep results in memory as opposed to other frameworks that immediately write to disk after each task.
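The pipelining idea described above can be sketched in plain Python. This is an analogy only, with hypothetical function names, and it says nothing about how Spark's engine is implemented. The first function finishes one operation over all the data before starting the next; the second applies both operations to each element in a single pass.

```python
# Plain-Python sketch of pipelining; an analogy, not Spark's execution engine.

def stage_by_stage(values):
    # Run one operation over all the data, materialize the result,
    # then run the next operation over that intermediate list.
    doubled = [v * 2 for v in values]            # full pass 1 (the "map")
    return [v for v in doubled if v % 3 == 0]    # full pass 2 (the "filter")


def pipelined(values):
    # Apply the map and the filter to each element in a single pass,
    # without building an intermediate collection.
    for v in values:
        d = v * 2
        if d % 3 == 0:
            yield d


data = range(10)
result_staged = stage_by_stage(data)
result_pipelined = list(pipelined(data))
```

Both versions produce the same values; the pipelined one simply avoids holding the intermediate `doubled` collection.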

## Apache Spark Architecture

Before proceeding with our example, let's see an overview of the Apache Spark architecture. As mentioned before, Apache Spark allows you to treat many machines as one machine. This is done via a master-worker type architecture where there is a `driver` or master node in the cluster, accompanied by `worker` nodes. The master sends work to the workers and instructs them to pull data either from memory or from disk (or from another data source like S3 or Redshift).

The diagram below shows an example Apache Spark cluster. Basically, there is a Driver node that communicates with executor nodes. Each of these executor nodes has slots, which are logically like execution cores.

![spark-architecture](http://training.databricks.com/databricks_guide/gentle_introduction/videoss_logo.png)

The Driver sends Tasks to the empty slots on the Executors when work has to be done:

![spark-architecture](http://training.databricks.com/databricks_guide/gentle_introduction/spark_cluster_tasks.png)
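As a rough analogy for the picture above, the following plain-Python sketch has a "driver" hand one task per partition to a small pool of "slots". Threads stand in for slots here, and `NUM_SLOTS`, `split_into_partitions`, and `task` are hypothetical names chosen for the illustration; real Spark executors run in separate JVMs.

```python
# Plain-Python analogy for a driver handing tasks to executor slots.
# Threads stand in for slots; this is not how Spark schedules work internally.
from concurrent.futures import ThreadPoolExecutor

NUM_SLOTS = 4  # hypothetical number of free slots


def split_into_partitions(values, num_partitions):
    # Deal the values out round-robin so each partition gets a share of the work.
    return [values[i::num_partitions] for i in range(num_partitions)]


def task(partition):
    # The unit of work a single slot runs on a single partition.
    return sum(partition)


values = list(range(1000))
partitions = split_into_partitions(values, 8)

# The "driver" submits one task per partition; at most NUM_SLOTS run at once.
with ThreadPoolExecutor(max_workers=NUM_SLOTS) as slots:
    partial_sums = list(slots.map(task, partitions))

total = sum(partial_sums)  # the driver combines the partial results
```

With eight partitions and four slots, the remaining tasks wait until a slot frees up, which mirrors the idea of the Driver sending Tasks to empty slots.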

Note: In the case of the Community Edition there is no Worker, and the Master, not shown in the figure, executes the entire code.

![spark-architecture](http://training.databricks.com/databricks_guide/gentle_introduction/notebook_microcluster.png)

You can view the details of your Apache Spark application in the Apache Spark web UI. The web UI is accessible in Databricks by going to "Clusters" and then clicking on the "View Spark UI" link for your cluster. It is also available by clicking at the top left of this notebook, where you would select the cluster to attach this notebook to. In this option you will find a link to the Apache Spark Web UI.

At a high level, every Apache Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. In Databricks, the notebook interface is the driver program.  This driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.
Driver programs access Apache Spark through a `SparkSession` object regardless of deployment location.

## An Example of Transformations and Actions

To illustrate these architectural concepts and, most relevantly, **transformations** and **actions**, let's go through a more thorough example, this time using `DataFrames` and a csv file.

The DataFrame and SparkSQL work almost exactly as we have described above: we're going to build up a plan for how we're going to access the data and then finally execute that plan with an action. We'll see this process in the diagram below. We go through a process of analyzing the query, building up plans, comparing them, and then finally executing the chosen plan.

![Spark Query Plan](http://training.databricks.com/databricks_guide/gentle_introduction/query-plan-generation.png)

While we won't go too deep into the details of how this process works, you can read a lot more about it on the [Databricks blog](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html). For those who want more information about how Apache Spark goes through this process, I would definitely recommend that post!

Going forward, we're going to access a set of public datasets that Databricks makes available. Databricks datasets are a small curated group that we've pulled together from across the web. We make these available using the Databricks filesystem. Let's load the popular diamonds dataset in as a Spark `DataFrame`. Now let's go through the dataset that we'll be working with.

In [13]:
%fs ls /databricks-datasets/Rdatasets/data-001/datasets.csv

**TODO:** fill in the code below to read the csv file

*Hint: Use `"com.databricks.spark.csv"` to read .csv files and use `.option()` to provide parameters while loading the data*

In [15]:
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamonds = (sqlContext.read.format(<FILL_IN>)  # use "com.databricks.spark.csv" to load .csv
  .option(<FILL_IN>)  # there are headers in the data file, we want to set "header" to be "true"
  .option("inferSchema", "true")  # this infers schema from the dataset
  .load(<FILL_IN>))  # put dataPath here
  
# inferSchema means we will automatically figure out column types 
# at a cost of reading the data more than once

Now that we've loaded in the data, we're going to perform computations on it. This provides a convenient tour of some of the basic functionality and some of the nice features that make running Spark on Databricks simple! In order to perform our computations, we need to understand more about the data. We can do this with the `display` function.

In [17]:
display(diamonds)

Now that we've explored the data, let's return to understanding **transformations** and **actions**. I'm going to create several transformations and then an action.

These transformations are simple. First we group by two variables, cut and color, and then compute the average price. Then we're going to inner join that to the original dataset on the column `color`. Then we'll select the average price as well as the carat from that new dataset.
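To see what these steps compute, here is a plain-Python sketch of the same logic on a few made-up rows. The rows are invented for illustration and are not taken from the diamonds dataset, and unlike Spark this code runs eagerly on a single machine.

```python
# Plain-Python sketch of group-by/average followed by an inner join.
# The rows below are made up for illustration; they are not from the diamonds dataset.
from collections import defaultdict

rows = [
    {"cut": "Ideal", "color": "E", "price": 300.0, "carat": 0.2},
    {"cut": "Ideal", "color": "E", "price": 500.0, "carat": 0.3},
    {"cut": "Good", "color": "E", "price": 400.0, "carat": 0.25},
    {"cut": "Good", "color": "J", "price": 350.0, "carat": 0.3},
]

# Step 1: group by (cut, color) and compute the average price per group.
prices = defaultdict(list)
for row in rows:
    prices[(row["cut"], row["color"])].append(row["price"])
averages = [
    {"cut": cut, "color": color, "avg_price": sum(p) / len(p)}
    for (cut, color), p in prices.items()
]

# Step 2: inner join the averages back to the original rows on "color",
# keeping only the average price and the carat (the "select").
joined = [
    {"avg_price": avg["avg_price"], "carat": row["carat"]}
    for avg in averages
    for row in rows
    if avg["color"] == row["color"]
]
```

Note that joining on `color` alone pairs every group average with every original row of the same color, so the joined result has more rows than the original data.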

**TODO:** use `.groupBy()` and `.avg()` with column names to finish the task.

In [19]:
df1 = diamonds.groupBy(<FILL_IN>).avg(<FILL_IN>) # group on "cut", "color" and calculate the average "price"

df2 = (df1
  .join(diamonds, on=<FILL_IN>, how='inner')  # inner join on column "color"
  .select("`avg(price)`", "carat"))
# a simple join and selecting some columns

These transformations are now complete in a sense but nothing has happened. As you'll see above we don't get any results back! 

The reason for that is these computations are *lazy*: Spark first builds up the entire flow of data from start to finish required by the user. This is an intelligent optimization for two key reasons. First, any calculation can be recomputed from the very source data, allowing Apache Spark to handle any failures that occur along the way and to successfully handle stragglers. Secondly, Apache Spark can optimize computation so that data and computation can be `pipelined` as we mentioned above. Therefore, with each transformation Apache Spark creates a plan for how it will perform this work.

In [21]:
df2.count()

This will execute the plan that Apache Spark built up previously. Click the little arrow next to where it says `(2) Spark Jobs` after that cell finishes executing and then click the `View` link. This brings up the Apache Spark Web UI right inside of your notebook. This can also be accessed from the cluster attach button at the top of this notebook. In the Spark UI, you should see a diagram something like this.

![img](http://training.databricks.com/databricks_guide/gentle_introduction/spark-dag-ui.png)

These are significant visualizations. They are Directed Acyclic Graphs (DAGs) of all the computations that have to be performed in order to get to that result. The visualization shows us all the steps that Spark has to take to get our data into the final form.

Again, this DAG is generated because transformations are *lazy*. While generating this series of steps, Spark will optimize lots of things along the way and will even generate code to do so. This is one of the core reasons that users should be focusing on using DataFrames and Datasets instead of the legacy RDD API. With DataFrames and Datasets, Apache Spark will work under the hood to optimize the entire query plan and pipeline entire steps together. You'll see instances of `WholeStageCodeGen` as well as `tungsten` in the plans, and these are a part of the improvements [in SparkSQL which you can read more about on the Databricks blog](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html).

In this diagram you can see that we start with a CSV all the way on the left side, perform some changes, merge it with another CSV file (that we created from the original DataFrame), then join those together and finally perform some aggregations until we get our final result!

### Caching

One of the significant parts of Apache Spark is its ability to store things in memory during computation. This is a neat trick that you can use as a way to speed up access to commonly queried tables or pieces of data. This is also great for iterative algorithms that work over and over again on the same data.

Without caching, Spark will recompute the DataFrame every time you call an action.

While many see this as a panacea for all speed issues, think of it much more like a tool that you can use. Other important concepts like data partitioning, clustering, and bucketing can end up having a much greater effect on the execution of your job than caching. Remember, though, that these are all tools in your tool kit!

To cache a DataFrame or RDD, simply use the cache method.

In [24]:
df2.cache()

Caching, like a transformation, is performed lazily. That means that it won't store the data in memory until you call an action on that dataset. 

Here's a simple example. We've created our df2 DataFrame which is essentially a logical plan that tells us how to compute that exact DataFrame. We've told Apache Spark to cache that data after we compute it for the first time. So let's call a full scan of the data with a count twice. The first time, this will create the DataFrame, cache it in memory, then return the result. The second time, rather than recomputing that whole DataFrame, it will just hit the version that it has in memory.

Let's take a look at how we can discover this.

In [26]:
df2.count()

Now that we've counted the data once, we'll see that running the same count again behaves quite differently.

In [28]:
df2.count()

In the above example, we can see that this cuts down on the time needed to generate this data immensely - often by at least an order of magnitude. With much larger and more complex data analysis, the gains that we get from caching can be even greater!
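The lazy caching behavior described in this section can be mimicked in plain Python. The `CachedPlan` class below is a hypothetical illustration and not a Spark API: calling `cache()` only marks the plan, the first action computes and stores the data, and later actions reuse the stored copy.

```python
# Plain-Python analogy for lazy caching; CachedPlan is a hypothetical class, not a Spark API.

class CachedPlan:
    def __init__(self, compute):
        self._compute = compute   # how to build the data from scratch
        self._use_cache = False
        self._cached = None
        self.computations = 0     # how many times the data was actually rebuilt

    def cache(self):
        # Only marks the plan as cacheable; no data is computed or stored yet.
        self._use_cache = True
        return self

    def count(self):
        # An action: reuse the stored data if present, otherwise compute it.
        if self._cached is not None:
            return len(self._cached)
        self.computations += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return len(data)


plan = CachedPlan(lambda: [x * 2 for x in range(1000)]).cache()
computed_before_action = plan.computations   # caching is lazy, so nothing has run
first = plan.count()    # computes the data and stores it
second = plan.count()   # served from the stored copy

uncached = CachedPlan(lambda: [x * 2 for x in range(1000)])
uncached.count()
uncached.count()        # recomputed, because cache() was never called
```

Comparing `plan.computations` with `uncached.computations` after the two counts shows the difference: the cached plan is built once, the uncached one twice.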

# Apache Spark on Databricks for Data Scientists

## Tutorial Overview

This tutorial centers around a core idea that we hope to explore:

**The number of farmer's markets in a given zip code can be predicted from the income and taxes paid in a given area.**

It seems plausible that areas with higher income have more farmer's markets simply because there is more of a market for those goods. Of course there are many potential holes in this idea, but that's part of the desire to test it :). This tutorial will explore our process for discovering whether or not we can accurately predict the number of farmer's markets! *It is worth mentioning that this notebook is not intended to be academically rigorous, but simply a good example of the work a data scientist might be performing using Apache Spark and Databricks.*

## The Data

![img](https://training.databricks.com/databricks_guide/USDA_logo.png)

The first of the two datasets that we will be working with is the **Farmers Markets Directory and Geographic Data**. This dataset contains information on the longitude and latitude, state, address, name, and zip code of Farmers Markets in the United States. The raw data is published by the Department of Agriculture. The version of the data that is found in Databricks (and is used in this tutorial) was updated by the Department of Agriculture on Dec 01, 2015.

![img](https://training.databricks.com/databricks_guide/irs-logo.jpg)

The second dataset we will be working with is the **SOI Tax Stats - Individual Income Tax Statistics - ZIP Code Data (SOI)**. This study provides detailed tabulations of individual income tax return data at the state and ZIP code level and is provided by the IRS. This repository only has a sample of the data: it covers 2013 and includes "AGI". The ZIP Code data show selected income and tax items classified by State, ZIP Code, and size of adjusted gross income. Data are based on individual income tax returns filed with the IRS and are available for Tax Years 1998, 2001, and 2004 through 2013. The data include items such as:

- Number of returns, which approximates the number of households
- Number of personal exemptions, which approximates the population
- Adjusted gross income
- Wages and salaries
- Dividends before exclusion
- Interest received

You can learn more about the two datasets on data.gov:

- [Farmer's Market Data](http://catalog.data.gov/dataset/farmers-markets-geographic-data/resource/cca1cc8a-9670-4a27-a8c7-0c0180459bef)
- [Zip Code Data](http://catalog.data.gov/dataset/zip-code-data)

### Getting the Data

Apache Spark provides simple and easy connectors to the kinds of data sources that Data Scientists use, and Databricks provides simple demonstrations of how to use them. Just search in the Databricks guide (use the `?` at the top right) to see if your data source is available. For the purposes of this tutorial, our files are already available in cloud storage via `dbfs`, the Databricks file system. [While you're free to upload the csvs made available on data.gov as a table](https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#03%20Accessing%20Data/1%20Data%20Import%20UI.html) you can also (more easily) access this data via the `/databricks-datasets` directory, which is a repository of public, Databricks-hosted datasets that is available on all Databricks accounts.

First things first! We've got to read in our data. This data is located in csv files, so we'll use the [spark-csv](https://github.com/databricks/spark-csv) package to do this. In Databricks, it's as simple as specifying the format of the csv file and loading it in as a DataFrame. In Apache Spark 2.0, you do not need to use the `spark-csv` package and can just read the data in directly.

As a data scientist, you've likely come across DataFrames either in R or in Python with pandas. Apache Spark DataFrames do not stray too much from this abstraction except that they are distributed across a cluster of machines instead of existing on one machine (as is typically the case with R or pandas). If you're not quite familiar with this, it might be worth reading through [a Gentle Introduction to Apache Spark on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2168141618055043/484361/latest.html).

In [32]:
%python
taxes2013 = (spark
  .read.format(<FILL_IN>)  # read csv file
  .option(<FILL_IN>)  # file has header
  .load("dbfs:/databricks-datasets/data.gov/irs_zip_code_data/data-001/2013_soi_zipcode_agi.csv"))

In [33]:
%python
markets = (spark
  .read.format(<FILL_IN>)  # read csv file
  .option(<FILL_IN>)  # file has header
  .load("dbfs:/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/market_data.csv"))

Now that we've loaded in the data, let's go ahead and register the DataFrames as SparkSQL tables.

While this might seem unnecessary, it allows you as a data scientist to leverage your SQL skills immediately to manipulate the data. Some people prefer working directly with DataFrames while others prefer working in SparkSQL directly. Whatever the case, they take advantage of the same Tungsten optimizations under the hood.

In [35]:
taxes2013.createOrReplaceTempView("taxes2013")
markets.createOrReplaceTempView("markets")

You can see that we are using the `registerTempTable`/`createOrReplaceTempView` method call to create this table. The lifetime of this temporary table is tied to the Spark/SparkSQL Context that was used to create this DataFrame. This means that when you shut down the SparkSession that is associated with a cluster (for example, when you shut down the cluster), the temporary table will disappear as well. In Databricks, SparkSessions are associated 1 to 1 with clusters.

## Running SQL Commands

As we progress through the notebook, you'll notice that all SQL cells are prefaced with `%sql`. This tells the Databricks environment that you'd like to execute a SQL command. You can do the same with python and R as you will see in other tutorials and parts of the documentation.

To list the tables, we can simply execute `show tables`. You'll see that this also provides information about their lifetime (and whether or not they are temporary).

In [37]:
%sql show tables

Now that we've loaded in the data, let's take a quick look at it. The `display` command makes it easy to quickly display a subset of the table that we have. This operates directly on our `DataFrame`.

In [39]:
display(taxes2013.limit(5))

A roughly equivalent operation is a `SELECT *` on the temp table we just created. Databricks automatically limits SQL results to the first 1000 rows in order to avoid overflowing the browser; here we also add an explicit `limit 5`.

In [41]:
%sql SELECT * FROM taxes2013 limit 5

We can see that there are a variety of columns you might want to look into further; however, for the purpose of this analysis I'm only going to look at a very small subset. I'm also going to perform two small manipulations on this data:

1. I'm going to do some simple type conversions and rename the columns to something a bit more semantic so that it's easier to talk about them going forward.
2. I'm also going to shorten each zip code to four digits instead of five. This means we look at the general area around a zip code rather than one specific zip code. This is an imprecise process overall, but it works fine for the purpose of this example.

In [43]:
%sql 
DROP TABLE IF EXISTS cleaned_taxes;

CREATE TABLE cleaned_taxes AS
SELECT state, int(zipcode / 10) as zipcode, 
  int(mars1) as single_returns, 
  int(mars2) as joint_returns, 
  int(numdep) as numdep, 
  double(A02650) as total_income_amount,
  double(A00300) as taxable_interest_amount,
  double(a01000) as net_capital_gains,
  double(a00900) as biz_net_income
FROM taxes2013
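
To make the zip code shortening concrete, here is a small plain-Python sketch of the same arithmetic as `int(zipcode / 10)` in the query above. The sample zip codes are made up for illustration, and `shorten_zip` is a hypothetical helper name, not part of Spark.

```python
def shorten_zip(zipcode: int) -> int:
    """Drop the last digit of a zip code, mirroring int(zipcode / 10) in the SQL."""
    return int(zipcode / 10)

# Illustrative zip codes (not taken from the IRS dataset)
samples = [94105, 94107, 10001, 10009]
shortened = [shorten_zip(z) for z in samples]
print(shortened)  # [9410, 9410, 1000, 1000]
```

Neighboring zip codes collapse into the same shortened value, which is what lets us group by general area. One caveat of this approach: a zip code with a leading zero is numerically a four-digit value, so it ends up with only three digits after shortening.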

See how easy it is to create a derivative table to work with? Now that the data is cleaned up, I can start exploring it. To do that, I'm going to leverage the Databricks environment to create some nice plots.

First I'm going to explore the average total income per zip code per state. We can see here that on the whole there isn't anything drastic. New Jersey and California have higher average incomes per zip code.

*Hint: After finishing and running the following command, click on the plot type button underneath the table and choose Map. In `Plot Option`, choose `state` as Keys and `avg("total_income_amount")` as Values.*

In [45]:
%python
cleanedTaxes = spark.table("cleaned_taxes")

# TODO: group the "cleanedTaxes" table by "state" and calculate the average income based on the "total_income_amount" column
display(<FILL_IN>)

You can try to come up with your own queries to explore the datasets.

One thing that is great about Apache Spark is that out of the box it can store and access tables in memory. All we need to do is `cache` the data. Here we call the `cache` method on the DataFrame returned by `spark.table` (a table-level alternative is `spark.catalog.cacheTable`). Caching is performed lazily, so the data is only materialized in memory the first time an action runs against it.

In [47]:
spark.table("cleaned_taxes").cache()

Now that we've spent some time exploring the IRS data, let's take a moment to look at the farmer's market data. We'll start off with a count of farmer's markets per state. You'll notice that I'm not using SQL this time. While we could certainly query the SQL table, it's worth showing that the functions available in SQL are also available directly on a DataFrame.

Select bar chart to display the plot.

In [49]:
display(markets.groupBy("State").count())

While these datasets probably warrant a lot more exploration, let's go ahead and prep the data for use in Apache Spark MLlib. MLlib has some specific requirements about how inputs are structured. First, input data has to be numeric unless you perform a transformation inside a data pipeline. What this means for you as a user is that Apache Spark won't, for instance, automatically convert strings to categories, and the output will be of type `Double`. Let's prepare our data so that it meets those requirements, and join our input data with the target variable: the number of farmer's markets in a given zip code.

In [51]:
%python
cleanedTaxes = spark.sql("SELECT * FROM cleaned_taxes")

summedTaxes = cleanedTaxes \
  .groupBy("zipcode") \
  .sum() # sum per zipcode because each zip code has one row per AGI income group

cleanedMarkets = (markets
  .selectExpr("*", "int(zip / 10) as zipcode")
  .<FILL_IN>  # group by "zipcode"
  .<FILL_IN>  # perform a "count"
  # selectExpr is short for Select Expression - equivalent to what we
  # might be doing in a SQL SELECT expression
  .selectExpr("double(count) as count", "zipcode as zip"))

joined = cleanedMarkets \
  .join(summedTaxes, cleanedMarkets["zip"] == summedTaxes["zipcode"], "outer")

Now that we've joined our tax data to our output variable, we have to do a final bit of cleanup before we can feed this data into Spark MLlib. For example, when we display our joined data, we see null values.

In [53]:
display(joined.limit(5))

Currently Apache Spark MLlib doesn't allow null values as input (nor would it make sense to leave those rows out). Therefore we're going to replace them with zeros. Luckily, DataFrames make it easy to work with null data under the `.na` prefix, as you'll see below. [You can see all of the null functions in the API documentation.](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions) These should be very familiar and similar to what you might find in pandas or in R DataFrames.

In [55]:
%python
prepped = joined.na.fill(0)
display(prepped.limit(5))
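
As a rough illustration of why the outer join leaves nulls and what `na.fill(0)` then does, here is a plain-Python sketch on made-up data. The dictionaries and field names are hypothetical stand-ins rather than the real tables, and the code only mimics the behavior conceptually.

```python
# Hypothetical stand-ins: market counts and summed incomes keyed by shortened zip
market_counts = {9410: 3.0, 1000: 5.0}
summed_income = {9410: 1200.0, 6060: 800.0}

# Full outer join: keep every key that appears on either side
joined_rows = []
for key in sorted(set(market_counts) | set(summed_income)):
    joined_rows.append({
        "zip": key,
        "count": market_counts.get(key),         # None when no markets matched
        "total_income": summed_income.get(key),  # None when no tax rows matched
    })

# Similar in spirit to na.fill(0): replace missing numeric values with 0
prepped_rows = [
    {name: (0 if value is None else value) for name, value in row.items()}
    for row in joined_rows
]

for row in prepped_rows:
    print(row)
```

Filling with 0 is reasonable for the market count (no matching rows means no markets), but it is a modeling choice for the income columns rather than a fact about those areas.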

Now that all of our data is prepped, we have to put all of it into one column of a vector type for Spark MLlib. This makes it easy to embed a prediction right in a DataFrame, and it makes very clear what is and isn't being passed into the model, without having to convert it to a numpy array or specify an R formula. It also makes it easy to incrementally add new features, simply by adding to the vector. In the case below, rather than adding features one by one, I'm going to create an exclusion list and remove everything that is NOT a feature.

In [57]:
%python

nonFeatureCols = ['zip', 'zipcode', 'count']

featureCols = list(set(prepped.columns) - set(nonFeatureCols))
print(featureCols)
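
One thing to be aware of: Python sets are unordered, so building the list through `set` does not guarantee the original column order. If a stable feature order matters (for example, when matching coefficients back to columns later), a list comprehension is one alternative. The column names below are hypothetical placeholders, not the real `prepped.columns`.

```python
# Hypothetical column list standing in for prepped.columns
columns = ["count", "zip", "zipcode", "sum(single_returns)", "sum(joint_returns)"]
non_feature_cols = {"zip", "zipcode", "count"}

# Keeps the original column order, unlike set subtraction
feature_cols = [c for c in columns if c not in non_feature_cols]
print(feature_cols)  # ['sum(single_returns)', 'sum(joint_returns)']
```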

Now I'm going to use the `VectorAssembler` in Apache Spark to assemble all of these columns into a single vector. To do this I'll have to set the input columns and the output column. Then I'll use that assembler to transform the prepped data into my final dataset.

In [59]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=featureCols,
    outputCol="features")

finalPrep = assembler.transform(prepped)
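
Conceptually, the assembler concatenates the chosen columns of each row into one vector. The plain-Python sketch below mimics that on a made-up row. `assemble` is a hypothetical helper, not a Spark API, and the real `VectorAssembler` output is a Spark vector type rather than a Python list.

```python
from typing import Dict, List

def assemble(row: Dict[str, float], input_cols: List[str]) -> List[float]:
    """Collect the listed columns of one row into a single feature list, in order."""
    return [float(row[col]) for col in input_cols]

# Made-up row with two feature columns and the label
row = {"count": 2.0, "sum(single_returns)": 150, "sum(joint_returns)": 90}
features = assemble(row, ["sum(single_returns)", "sum(joint_returns)"])
print(features)  # [150.0, 90.0]
```

The label (`count`) stays in its own column; only the columns named in the input list end up in the feature vector.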

Now, to follow best practices, I'm going to perform a random 70-30 split of the dataset for training and testing purposes. The same approach can be used to create a validation set as well; however, this tutorial omits that step. It's worth noting that MLlib also supports hyperparameter tuning with cross validation and pipelines. All of this can be found in [the Databricks Guide](https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#00%20Welcome%20to%20Databricks.html).

In [61]:
%python

# split the set 70% and 30%
# TODO: Use .randomSplit([<training_percentage>, <test_percentage>]) to split the data into training and test sets
[training, test] = finalPrep.<FILL_IN>

# Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count())
print(test.count())
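
A note on the printed counts: to my understanding, `randomSplit` assigns each row to a split at random according to the weights, so the resulting sizes are close to, but usually not exactly, 70% and 30%. The sketch below mimics that idea in plain Python. `random_split` is a hypothetical helper written for illustration, not Spark's implementation.

```python
import random
from typing import List, Sequence, Tuple

def random_split(rows: Sequence[int], train_weight: float, seed: int) -> Tuple[List[int], List[int]]:
    """Assign each row independently to train or test with the given probability."""
    rng = random.Random(seed)  # fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1000))
train_rows, test_rows = random_split(rows, train_weight=0.7, seed=42)

# Every row lands in exactly one split; sizes are only approximately 70/30
print(len(train_rows), len(test_rows))
```

In Spark itself, `randomSplit` also accepts an optional seed argument, which is worth setting when you want the same split across runs.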

# Apache Spark MLlib

Now we're going to get into the core of Apache Spark MLlib. At a high level, we're going to create an instance of a `regressor` or `classifier`, which is then trained and returns a `Model` type.

You should import:

`from pyspark.ml.regression import LinearRegression`

In the below example, we're going to use linear regression.

The linear regression available in Spark MLlib supports an elastic net parameter, which controls how L1 and L2 regularization are mixed: a value of 0 gives a pure L2 penalty and a value of 1 gives a pure L1 penalty. For [more information on elastic net regularization see Wikipedia](https://en.wikipedia.org/wiki/Elastic_net_regularization).
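
As a sketch of what the mixing parameter does: the Spark documentation describes the elastic net penalty as `elasticNetParam * regParam * ||w||_1 + (1 - elasticNetParam) * (regParam / 2) * ||w||_2^2`. The function below evaluates that expression in plain Python on made-up coefficients. `elastic_net_penalty` is a hypothetical helper, not a Spark API.

```python
from typing import Sequence

def elastic_net_penalty(weights: Sequence[float], reg_param: float, elastic_net_param: float) -> float:
    """alpha * lambda * ||w||_1 + (1 - alpha) * (lambda / 2) * ||w||_2^2"""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) * 0.5 * l2_squared)

weights = [3.0, -4.0]  # made-up coefficients
print(elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=1.0))  # pure L1
print(elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.0))  # pure L2
print(elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.5))  # even mix
```

Note that the whole penalty is scaled by `regParam`. As far as I know, `regParam` defaults to 0.0 in `LinearRegression`, in which case changing `elasticNetParam` alone has no effect on the fit until `regParam` is raised above zero.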

As we saw above, we had to perform some preparation of the data before inputting it into the model. We have to do the same with the model itself. We'll set our hyperparameters, print them out, and then finally train the model. The `explainParams` method is a great way to make sure you're aware of all the hyperparameters that you have available.

In [63]:
from pyspark.ml.regression import LinearRegression

lrModel = LinearRegression(
  labelCol="count",
  featuresCol="features",
  elasticNetParam=0.5
)

print("Printing out the model Parameters:\n")
print("-"*20+"\n")
print(lrModel.explainParams())
print("\n"+"-"*20+"\n")

Now we can finally fit our model! We're going to do this in a series of steps. First we'll fit it, then we'll use it to make predictions via the `transform` method. This is the same way you would make predictions with your model in the future; in this case, however, we're using it to evaluate how our model is doing. We'll compute regression metrics and print them out to get some idea of how the model performs.

In [65]:
from pyspark.mllib.evaluation import RegressionMetrics
# TODO: use .fit() to fit on training set
lrFitted = lrModel.<FILL_IN>

Since the target is a whole number (you can't have half a farmer's market, for example), I'm going to check equality after first rounding each prediction to the nearest integer.

In [67]:
holdout = lrFitted \
  .transform(test) \
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "count", 
    """CASE double(round(prediction)) = count 
        WHEN true then 1
        ELSE 0
      END as equal""")
display(holdout.limit(5))

In [68]:
display(holdout.selectExpr("sum(equal)/sum(1)"))
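
The `sum(equal)/sum(1)` expression above is simply the fraction of test rows whose rounded prediction equals the true count. A plain-Python sketch on made-up numbers follows. One assumption about rounding is worth stating: Spark SQL's `round` rounds halves away from zero, while Python's built-in `round` rounds halves to even, so the sketch uses the `decimal` module to match the former.

```python
from decimal import Decimal, ROUND_HALF_UP

def round_half_up(value: float) -> float:
    """Round to the nearest integer, with ties going away from zero."""
    return float(Decimal(str(value)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

# Made-up (raw_prediction, count) pairs
pairs = [(0.4, 0.0), (1.5, 2.0), (2.5, 2.0), (3.2, 3.0)]

equal = [1 if round_half_up(pred) == count else 0 for pred, count in pairs]
accuracy = sum(equal) / len(equal)
print(equal, accuracy)  # [1, 1, 0, 1] 0.75
```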

Let's also calculate some regression metrics.

In [70]:
%python
# have to do a type conversion for RegressionMetrics
rm = RegressionMetrics(
  holdout.select("prediction", "count").rdd \
  .map(lambda x: (float(x[0]), float(x[1])))
)

print("MSE: %s" % rm.meanSquaredError)
print("MAE: %s" % rm.meanAbsoluteError)
print("RMSE: %s" % rm.rootMeanSquaredError)
print("R Squared: %s" % rm.r2)
print("Explained Variance: %s\n" % rm.explainedVariance)
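
For reference, the first four metrics reduce to short formulas. The sketch below computes them in plain Python on made-up values. It illustrates the standard definitions and is not the `RegressionMetrics` implementation; it leaves out explained variance, whose definition varies between libraries.

```python
import math
from typing import Dict, Sequence

def regression_summary(predictions: Sequence[float], labels: Sequence[float]) -> Dict[str, float]:
    """Compute MSE, MAE, RMSE and R^2 from paired predictions and labels."""
    n = len(labels)
    errors = [p - y for p, y in zip(predictions, labels)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    r2 = 1.0 - (mse * n) / ss_tot                        # 1 - SS_res / SS_tot
    return {"mse": mse, "mae": mae, "rmse": math.sqrt(mse), "r2": r2}

# Made-up predictions and true counts
summary = regression_summary([1.0, 2.0, 4.0, 3.0], [1.0, 3.0, 3.0, 5.0])
print(summary)
```

RMSE is just the square root of MSE, which puts the error back in the same units as the target (number of markets).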

# Conclusion

We explored using Spark MLlib to build a simple linear regression model. You can also try other models to get better performance. We hope this introduction to Spark and Spark ML gives you a good starting point for using Spark on Databricks in your own analytical work!