<a href="https://colab.research.google.com/github/rzl-ds/gu511/blob/master/016_spark.ipynb" target="_parent">
    <img src="https://colab.research.google.com/assets/colab-badge.svg"/>
</a>

# `spark`

## making some environments

there are two `spark` environments we could create in this lecture

1. a `databricks` community edition databricks cluster
2. an `emr` cluster (just like last time)

we will only create the former because it is simply much easier to use interactively. it is a limited resource (especially on the storage side!), but we will not be doing anything in-class that calls for massive data.

that being said, I will leave the instructions here for starting up an `emr` cluster for later homework or exam exercises or your personal use in the future.

### `databricks` community edition signup

`databricks` is one of the primary `spark` services companies, and (among many products) has a platform for managing `spark` cluster deployments (more on `databricks` later).

we will create "community edition" clusters -- fully managed `spark` environments that are free (but pretty limited).

**<div align="center">starting up a `databricks` community edition cluster</div>**

+ go to the community edition signup page: https://databricks.com/signup/signup-community
+ use your `georgetown.edu` email address to sign up
+ wait for an account confirmation email
+ log in [here](https://community.cloud.databricks.com/login.html)
+ start a cluster
    + click on "Clusters" in the left-hand menu
    + click the "Create Cluster" button
    + pick any name you want and launch your cluster
+ import this lecture
    + click on "Workspace" in the left-hand menu > "Users" > your email address
    + click the down arrow next to your user name in the right-most column and select "import"
    + import from URL and paste: https://raw.githubusercontent.com/rzl-ds/gu511/master/016_spark.ipynb

### `emr` spinup

**note: you don't need to do this! I'm just leaving the instructions here for your reference**

just as in the previous `hadoop` lecture, we will want to work with an `emr` cluster in this lecture. spinning one up is easy but takes about 12 minutes, so let's do that asap!

**<div align="center">starting up an `emr` cluster by cloning the one we made previously</div>**

the below steps will clone the cluster we made in the `hadoop` lecture, which we expected to cost about 1.8 USD over the 2.5 hours of this class

+ the following assumes you can use the same `.ssh` key you used for this cluster last lecture, so hopefully you still have that!
+ in the `aws` web console open the `emr` service
+ click the `clusters` menu
+ select the cluster you created last week and click the "Clone" button
+ that's it!

if any of this didn't work for you, go through the walkthrough in the next cell

**<div align="center">starting up an `emr` cluster from scratch</div>**

the below steps will create a cluster which, if we leave on throughout class, will cost about 1.8 USD.

+ in the `aws` web console open the `emr` service
+ click create cluster, and on the "quick options" screen select "advanced options"
+ software and steps
    + stay at emr-5.28.0
    + for software, click:
        + `hadoop`
        + `ganglia`
        + `hive`
        + `hue`
        + `spark`
        + `livy`
        + `pig`
    + notice but don't click: `jupyterhub`, `mxnet`, `tensorflow`
    + click next
+ hardware config -- leave all defaults
    + generally, think about space requirements [ala this](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-instances-guidelines.html#emr-plan-instances-hdfs)
+ general options -- pick a name
+ security options -- ***choose a key pair! make sure you have that key!!***

## the problem(s) with `hadoop`

last lecture we covered `hadoop` and `hadoop` tools. `hadoop` is great for solving a particular problem: efficient processing of truly massive data. while it *is* actually still somewhat common to write `hadoop streaming` `python` scripts for doing `etl` work, it's really not great for data science. data scientists are great at a lot of things, but `java` is usually not one of those things

as you've seen, it can feel intimidatingly architected and low-level. we also experienced some of the complexity involved in shoe-horning our analysis questions into the `mapreduce` framework - `hadoop` has us thinking pretty hard about the simple things we want to do (like count words or calculate averages), and when we've figured them out, we're only doing them *once*.

think about how that looks when we want to move on to something more complicated, like a gradient descent algorithm.

or anything iterative, for that matter.

1. take parameters and apply a model defined by those parameters to every record in our `hdfs` dataset -- map records of features to predicted `y` values, calculate the individual `y` error term and gradient for each record, and `emit` those
2. reduce those partial gradients to determine the parameter update
3. update parameters (overwrite the input files)

each time we move between steps we are reading and writing to `hdfs` and that can be crazy wasteful

so in addition to being technically heavy and somewhat clunky to use, for our particular workflow it will also be pretty wasteful (spending a *ton* of time on IO, which is computationally expensive) and hard to pivot (want more than one `mapreduce`? want to fork a `mapreduce` pipeline? better just make `N` separate streaming jobs)
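to make the cost concrete, here is a minimal pure-`python` sketch of that loop (no `hadoop` involved). everything in it is made up for illustration: the tiny dataset, the one-parameter model `y = w * x`, the learning rate, and the `read_dataset` function, which just stands in for a full scan of the input files on `hdfs`.

```python
# toy "mapreduce" gradient descent for the model y = w * x
# all names, data, and settings here are assumptions made for illustration

records = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # (x, y) pairs, true w = 2
reads = 0  # how many times we "read the whole dataset from hdfs"


def read_dataset():
    """stand-in for a full scan of the input files on hdfs"""
    global reads
    reads += 1
    return list(records)


def map_gradient(w, record):
    """map step: emit the gradient of the squared error for one record"""
    x, y = record
    return 2 * (w * x - y) * x


def reduce_gradients(gradients):
    """reduce step: average the partial gradients"""
    return sum(gradients) / len(gradients)


w = 0.0
learning_rate = 0.05
n_iterations = 50
for _ in range(n_iterations):
    data = read_dataset()                       # a fresh scan on every iteration
    grads = [map_gradient(w, r) for r in data]  # step 1: map
    grad = reduce_gradients(grads)              # step 2: reduce
    w = w - learning_rate * grad                # step 3: update the parameters

print(round(w, 4), reads)
```

the thing to notice is `reads`: one full scan of the data per iteration, even though the data never changed.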

this is 2020. we should expect that someone has just done this for us. that's fair.

folks, guess what

## `spark` details

the name of the game in big data data science applications is `spark`.

### a bit of history

in the 2000s, `hadoop` users and developers were well aware of these limitations, and they started thinking of ways to improve distributed computing which addressed these problems

one of those developers was a computer science grad student at Cal named Matei Zaharia. he came up with the idea of a ["resilient distributed dataset"](https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia) (aka `rdd`) - a way to load those distributed blocks of files in `hdfs` into the ***memory*** of distributed computers, and then hold it there while we perform our calculations.

`rdd`s are the core concept behind `apache spark`, and they allow us to do much faster and more flexible computation.

a key difference between the `spark` framework and the `hadoop + yarn` framework is that in `spark` we avoid most of those expensive IO operations. this is *faster* because data stays in memory between steps, so we don't re-read it from disk when we don't need to.

this is also more *flexible* because we can hold on to references to intermediate aggregations and views on data in memory. the products of our data workflow are "right here" in our `spark` session

Matei Zaharia, creator of `spark`, went on to co-found `databricks` and serve as its CTO -- hence the massive prevalence of `databricks` in the `spark` ecosystem. `databricks` is one of those companies that still actively develops open source software (they are still the major maintainers of `spark`) while also running an extremely profitable software services business around that project.

### the `spark` stack

`spark` is a fast query and iterative algorithm calculation platform. it was built as a replacement for `mapreduce` for calculation workflows just like the ones in which we are most interested.

#### a compute engine

`spark` is a *compute engine*, which means that it is a *way* of doing computations. it supports a lot of different **data sources** for those computations, and knows how to perform them in a lot of different **environments**.

<br><div align="center"><img width=1100 src="https://files.training.databricks.com/images/105/unified-engine.png"></div>

focusing for a moment on the items in that `spark` box:

+ "core" `spark`: `scala` code implementing common computation tasks (file `io`, `mapping`, `reducing`, `filtering`) in as efficient a way as possible.
+ `rdd`s: the basic data structure for `spark` (more below)
+ `dataframe`s and a `sql` `api` for querying `dataframe`s (more below)
+ streaming: a slight tweak on the way you define your `dataframe` objects and you can pretty easily handle *streaming* data instead of static files
+ machine learning tools: `spark.ml` is a machine learning `api` built on `dataframe`s (`spark.mllib` still exists and is the same but for `rdd`s)
+ graph processing: a special type of dataframe and computation tools for working with relationship data

additionally, there are a handful of tools around `spark` for actually executing code, including

+ `databricks` notebooks and the `databricks` platform
+ `zeppelin`
+ `jupyter notebooks`

all of the above are notebooks with slightly different features, levels of complication to implement, and dramatically different price points

#### languages

`spark` happens to be about 600k lines of code, most of which is [`scala`](https://www.scala-lang.org/). `scala` is a functional programming language which compiles to `java` bytecode and runs on the `jvm`, and serves as the backbone of two very popular big data technologies (`spark` and `kafka`).

*note: we won't write much (if any) `scala` in this class, but if you are looking for a major level-up project, learning `scala` is the one for you*

you don't need to know `scala` to use `spark`, however. the `spark` community has developed accessible `spark` apis in a number of languages including:

+ `python` (`pyspark`)
+ `R` (`sparkr`)
+ `sql` (much of what core `spark` is doing is directly mappable to `sql` computations, so this integration is extremely tight)

also worth mentioning: many things come to `sql` before `pyspark`, and everything comes to `pyspark` before `sparkr`

#### `driver`s and `executor`s

as I wrote above, there is a similar division of responsibility happening in `spark` as there was in `hadoop`:

+ we have one central process responsible for orchestrating and delegating all computations
+ we have a fully scalable set of "dumb but fast" processes for doing the needed tasks

for `spark` that top layer (the central orchestrator process) is called a `driver`, and it "drives" the `spark` program. it is responsible for knowing where the chunks of the loaded data live and how to convert your simple request into the complicated, distributed, highly-optimized `rdd` code that `spark` core understands

<br><div align="center"><img width=1100 src="https://files.training.databricks.com/images/105/spark_cluster_tasks.png"></div>

`spark` also introduces the concept of an `executor` -- a per-worker-node organizer process that *locally* manages the resources on that node. there are a number of workers on that machine, and they are either doing something (in which case there is a `task` they are working on) or they are not doing anything (in which case there is a `slot` that could receive a `task`)

<br><div align="center"><img width=1100 src="https://files.training.databricks.com/images/105/spark_cluster_tasks.png"></div>

think of these processes as distributed computing middle-managers. they get their directives from the CEO and figure out how to best use their team of workers to finish that task.
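the division of labor can be imitated on one machine with a thread pool. this is only an analogy, not how `spark` is implemented: the pool plays the role of a single `executor`, `N_SLOTS` (a made-up name) is its number of `slot`s, and each call to the hypothetical `task` function is one `task`. the surrounding script plays the `driver`.

```python
from concurrent.futures import ThreadPoolExecutor

N_SLOTS = 2  # pretend our one "executor" has two slots


def task(partition):
    """the unit of work handed to a slot: sum of squares for one block of data"""
    return sum(x ** 2 for x in partition)


# the "driver" knows the data and decides how to split it into blocks
data = list(range(10))
n_partitions = 4
partitions = [data[i::n_partitions] for i in range(n_partitions)]

# four tasks, two slots: a task waits until a slot frees up
with ThreadPoolExecutor(max_workers=N_SLOTS) as pool:
    partial_results = list(pool.map(task, partitions))

# the "driver" combines the partial results into the final answer
total = sum(partial_results)
print(partial_results, total)
```

scaling up in this picture means adding more pools (more `executor`s), while the splitting and combining logic in the "driver" stays the same.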

##### scaling clusters

when you want to scale your cluster (to handle larger datasets in shorter time), you increase the number of `executors`. the burden still falls on the `driver` to manage the complexity

while `spark` can easily adapt to having a new `executor`, the *way* that you get a new `executor` is not trivial

depending on your environment, you may have to do this all yourself.

+ `databricks`: completely managed for you
+ `emr`: must be configured correctly by you, but otherwise `aws` handles it

at the lower level, you are responsible for spinning up the node, getting the `executor` up and running, and informing the `driver` that a new `executor` is available for `task` assignment

**<div align="center">PAUSE FOR ZOOM BREAK</div>**

### `rdd`s and `dataframe`s

#### `rdd`s

`rdd`s are the core data object for `spark`. the basic idea of an `rdd` is simple enough: do for large datasets in memory what we did for large files on disk.

in the `hadoop` lecture we cited the requirements of distributed computing frameworks to be *fault tolerance, recoverability, consistency, and scalability*.

we addressed those items in `hdfs` by breaking big files up into blocks and saving them across multiple machines.

we created a central record (in the `NameNode` service) of where those blocks lived, and a `ResourceManager` and `ApplicationMaster` knew how to break up a large job involving that file into smaller sub-tasks which could be pushed to the workers that had those blocks

in `spark`, all of that is done in-memory. datasets are broken into blocks of rows and those rows are saved on workers. a central process is in charge of remembering where those blocks went and breaking up computation tasks into smaller pieces that can be distributed to the workers that have those memory blocks.

<br><div align="center"><img width=1100 src="http://image.slidesharecdn.com/youtubespk-141216130447-conversion-gate02/95/apache-spark-rdd-101-3-638.jpg"></div>

of course, it's more complicated than I've described it here - the way that `spark` decides to distribute those blocks of data will depend on the functional plan of attack (a sequence of functions) you define by writing `spark` code. `spark` will figure out how to distribute the data to many different nodes (in memory) to optimize that plan.

this is very similar to the `tensorflow` execution graph - delay computation until the whole roadmap is defined and the user asks for something specific

some important facts about `rdd`s

+ `rdd`s are immutable, read-only collections of objects
+ they can be built from a lineage (a series of functional programming language function calls)
    + this makes them *fault tolerant, recoverable, consistent*
+ they work in parallel, so *scalable*
+ they are operated on by `scala`, a `fpl` (functional programming language), so *consistent*
+ they are immutable, so *recoverable*
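a toy, single-machine imitation can make "immutable + lineage + lazy" less abstract. the `ToyRDD` class below is hypothetical (it is not the real `spark` class and ignores distribution entirely); it only shows how a block of results can always be rebuilt from the untouched source data plus the recorded list of functions.

```python
class ToyRDD:
    """a toy imitation of an rdd (an assumption for illustration, not spark code)"""

    def __init__(self, partitions, lineage=()):
        self._partitions = tuple(tuple(p) for p in partitions)  # source blocks, never mutated
        self._lineage = tuple(lineage)  # the functions to apply, in order

    def map(self, fn):
        # transformations are lazy: return a *new* object with a longer lineage
        return ToyRDD(self._partitions, self._lineage + (fn,))

    def compute_partition(self, i):
        # any one block can be (re)built from its source block plus the lineage
        rows = self._partitions[i]
        for fn in self._lineage:
            rows = tuple(fn(row) for row in rows)
        return rows

    def collect(self):
        # an action: only now is any work actually done
        return [row
                for i in range(len(self._partitions))
                for row in self.compute_partition(i)]


rdd = ToyRDD([[1, 2], [3, 4], [5]])
rdd_sq = rdd.map(lambda x: x ** 2)

print(rdd.collect())                # the original is unchanged by the map
print(rdd_sq.collect())
print(rdd_sq.compute_partition(1))  # "recover" just one lost block
```

if the worker holding block `1` disappeared, nothing is lost: `compute_partition(1)` replays the lineage on the source block.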

**<div align="center">SWITCH TO DATABRICKS</div>**

In [None]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd

In [None]:
rdd.collect()

In [None]:
rdd_sq = rdd.map(lambda x: x ** 2)

In [None]:
rdd_sq.collect()

#### `dataframe`s

in the earliest versions of `spark`, `rdd`s were the *only* object you could interact with, and you did all of your calculations by building `rdd`s from files (in `hdfs`, `s3`, etc), and then writing calculations on them.

as part of their noble-and-generous-but-oops-I-also-made-a-bunch-of-money work, `databricks` developed a new `api` wrapping the underlying `rdd`s. the objects they created are called `dataframe`s.

the `dataframe` `api` was open-sourced and first shipped in the `spark` 1.x line (version 1.3), and by `spark` version 2.0 it had become the primary interface -- now it is strongly recommended that you use the `dataframe` `api` at all times.

in addition to being a more readable `api`, `dataframe` calculations are much more optimized and significantly faster than traditional `rdd` work. at a high level, they created a simpler `api` which handles almost all of the implementation details at the `rdd` level in a highly optimized way.

***so use `dataframe`s!***

In [None]:
data = [1, 2, 3, 4, 5]

# deal with some schema annoyance, more later
import pandas as pd
pddf = pd.DataFrame({'x': data})

df = spark.createDataFrame(pddf)
display(df)

In [None]:
import pyspark.sql.functions as F

df_sq = df.withColumn('x_sq', F.col('x') ** 2)
display(df_sq)

**<div align="center">PAUSE FOR ZOOM BREAK</div>**

## actually doing stuff

so enough of this wall of text -- let's do some `spark` stuff

### using `databricks`

it will be much easier to walk through the rest of the notebook if you have loaded this notebook into the `databricks` cluster we started at the beginning of this lecture, and if that cluster is up and running.

if you don't have that done, please go back to [this section](#databricks-community-edition-signup) and do that

`databricks` is

+ a company
+ the name of a `spark` cluster management tool
+ the name of a particular runtime of `spark` that is also called the "databricks environment"

we will use the `databricks` notebook which is running in the `databricks` community edition cluster we started at the start of the lecture

there are a few really important things to know about working in this `databricks` environment and using `databricks` notebooks

#### `dbfs`

`databricks` has created one normalized `api` over a ton of other cloud data stores (e.g. azure, aws, google, hadoop, ...). you can think of this as a high-level `api` so that you don't have to know the implementation details. this is in direct analogy to

| wrapper `api` | underlying technologies |
|-|-|
| `keras` | `tensorflow`, `theano`, `cntk` |
| `dataframes` | `rdd`s |
| `dbfs` | `s3`, `hdfs`, ... |

the only thing you need to know is that you have a new `scheme` for your `uri`s (the `dbfs:` prefix) and a new place you can read / save files (this is usually `s3` under the hood but not necessarily -- that's the point!)

```
dbfs:///path/to/your/special/parquet/files
```
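the prefix of a `uri` (the part before `://`, usually called its scheme) is what tells a reader which storage system to talk to. a small sketch with the standard library's `urlparse` shows the pieces; the bucket name and the `json` file name below are made up.

```python
from urllib.parse import urlparse

uris = [
    'dbfs:///path/to/your/special/parquet/files',
    's3://some-bucket/path/to/files',       # hypothetical bucket name
    'file:///databricks/driver/some.json',  # hypothetical file on the driver's local disk
]

for uri in uris:
    parsed = urlparse(uri)
    # scheme = which storage system, netloc = bucket / host (if any), path = where
    print(parsed.scheme, repr(parsed.netloc), parsed.path)
```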

#### magic commands

when in the `databricks` notebook, you have access to a handful of "magic" commands -- just like you do in any `jupyter` notebook.

you access these by putting a special "magic" prefix at the start of a cell. these prefixes are of the format `%XXX`

there are many different use cases for which `databricks` notebooks provide a magic command

+ code execution: `scala`, `python`, `sql`, `sh`, `r`

In [None]:
%scala
// this is a scala comment
println("hello world")

+ rendering notes in markdown: `md`

paste the following in the empty cell below

```
%md

# h1
## h2
### h3

| col1 | col2 |
|-|-|
| val1 | val2 |
| val3 | val4 |
```

+ running other notebooks: `run`
+ `dbfs` stuff: `fs` (e.g. `%fs ls dbfs:///my/special/path`)

In [None]:
%fs ls /

#### `dbutils`

`databricks` also provides a package of useful `python` code which only works inside their notebooks (you thought they were trying to help you avoid vendor lock-in didn't you? haha, joke's on you, they're a company not your friend). this includes

+ `dbutils.fs`: a toolkit for doing underlying file store type agnostic file system querying

In [None]:
dbutils.fs.ls('/')

+ `dbutils.widgets`: widgets are a way of parameterizing notebooks

In [None]:
dbutils.widgets.dropdown('option', 'A', ['A', 'B', 'C'])

In [None]:
option = dbutils.widgets.get('option')
option

change the value in the widget and watch the cell auto-re-run!

+ `display` and `displayHTML` commands: special webby ways of looking at `dataframe`s

+ `dbutils.secrets`: a package for securely storing secrets (credentials) and then loading them securely into running notebooks (e.g. `dbutils.secrets.get(...)`)

### interactive `pyspark` sessions

`spark` is a `scala` program running on the `jvm`, which means it is compiled code at its heart. however, most development work with `spark` is done in interactive `repl` sessions.

`pyspark` is a `repl` for the `spark` api bindings in `python`, so if you want to code `spark` programs using `python`, this is your starting point.

just like with the `python` language, there are a few different ways you could execute `pyspark` commands:

+ in a terminal shell via the `pyspark` command
+ in a notebook via several options
    + `databricks` notebooks
    + `zeppelin`
    + amazon `emr` `notebooks`
    + extension kernels for `jupyter`

we will use our `databricks` CE notebooks.

when you are in a `spark` session, your main access point for `spark` features is a `python` object called `spark`:

In [None]:
spark

`spark` is a `SparkSession` object, and it has a number of features:

In [None]:
dir(spark)

### creating `dataframe`s

the first `spark` object method we will look at is `.createDataFrame()` -- this function takes four arguments, the two most important of which are:

+ `data`: the records themselves. accepted formats include
    + a list of lists
    + a `pandas` dataframe
    + an `rdd`
+ `schema`: the data types for the provided `data`
    + sometimes this is **inferred** and sometimes you must provide it manually, depending on the `data`
    + note that we are using very `sql`-esque terms to describe our `dataframes`

In [None]:
import numpy as np
import pandas as pd

np_array = np.random.randint(0,100,size=(100, 4))
pandas_input_df = pd.DataFrame(np_array,
                               columns=['A', 'B', 'C', 'D'])
list_of_lists = [[1, 'hello'],
                 [2, 'world']]

In [None]:
df_np = spark.createDataFrame(np_array.tolist())  # notice we must convert np --> list
df_pd = spark.createDataFrame(pandas_input_df)
df_lol = spark.createDataFrame(list_of_lists)

in all of the above instances, we provided `spark` with `data` but not with a `schema`. how did this work?

for the `pandas` object, `pyspark` knows how to build a `schema` using the `pd.DataFrame.dtypes` values and the column names (one could ask why the same is not true of `np.array` types but I digress).

for the list objects, `pyspark` has to check the base `python` types of the elements in the first record -- it has to consume some data to do this.

these are two types of `schema` **inference** for items in memory
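the list case can be sketched in a few lines of plain `python`. this is a simplification of the idea as described above, not `pyspark`'s actual implementation: the `TYPE_NAMES` mapping and the `infer_schema_from_first_record` function are made up, and the `_1`, `_2` placeholder names are chosen to mimic what `pyspark` calls columns it has no names for.

```python
# toy mapping from base python types to spark-style type names (an assumption)
TYPE_NAMES = {bool: 'boolean', int: 'long', float: 'double', str: 'string'}


def infer_schema_from_first_record(records):
    """guess (column name, type name) pairs by looking only at the first record"""
    first_record = records[0]  # we have to consume some data to do this
    return [('_{}'.format(i + 1), TYPE_NAMES[type(value)])
            for i, value in enumerate(first_record)]


list_of_lists = [[1, 'hello'],
                 [2, 'world']]
toy_schema = infer_schema_from_first_record(list_of_lists)
print(toy_schema)
```

note what is missing: nothing in a bare list says what the columns should be *called*, which is exactly the extra information a `pandas` dataframe carries.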

let's look at the schemas created this way

In [None]:
df_np.printSchema()

In [None]:
df_pd.printSchema()

In [None]:
df_lol.printSchema()

clearly we are getting something extra from the `pandas` `DataFrame` here: column names, and explicit data types (which matters whenever the types could be ambiguous -- not the case here).

there are a number of ways to add column names to our other dataframes, but the only one we can do at the time we *create* the dataframe is to *provide the schema*

In [None]:
import pyspark.sql.types as T

lol_schema = T.StructType([T.StructField('id', T.LongType()),
                           T.StructField('word', T.StringType()), ])

# lol_schema = 'id long, word string'

df_lol_w_schema = spark.createDataFrame(list_of_lists,
                                        schema=lol_schema)
df_lol_w_schema.printSchema()

it is often helpful to look at a few records of a `dataframe` to make sure things are going well -- you can do this in any `pyspark` session with the `.show()` method

In [None]:
df_pd.show()

in addition, `databricks` has a special `databricks`-notebook-only function `display()` (not a method, a global-level function) which is a fancier `html` view of the dataframe:

In [None]:
display(df_pd)

there are a handful of extra features supported by `display` (many more records pulled in, sort capabilities, graphing capabilities). read more [here](https://docs.databricks.com/notebooks/visualizations/index.html#display-function-2)

### `read`ing `dataframe`s

note that in the above we have created `dataframe` objects from data we are already holding in the driver's memory. that is *nice*, but it's not our goal -- our goal is to work with data that is so large it cannot fit in one machine's memory.

to do that in `spark`, we are generally **reading** data from distributed files into a `dataframe`

`spark` has built-in support as well as community provided "connectors" (plugins, like `python` packages) for a number of file formats, including

+ `csv`
+ `parquet`
+ `avro`
+ `json`
+ a `spark`-specific `table` in-memory data store
+ `jdbc` for database connections

in addition, you can read in any new-line-delimited `text` file as an array of strings. it may take some work to convert that list of strings into the data structure of your dreams, but it can be done!

`spark` also can read from a number of different file sources

+ `hdfs`
+ `s3`
+ azure blob storage
+ google cloud storage
+ `cassandra`
+ `kafka`
+ `hbase`
+ `hive`

the syntax for reading files is consistent across various file formats and sources:

1. start by getting the `spark` `DataFrameReader` object at `spark.read`
1. create modified versions of that object by adding `.option`s
1. use input format function (a method on the `DataFrameReader` like `.csv` or `.json`) to read in a `uri` of your file (often a directory containing many `csv`s or a `glob` expression)

generally:

```python
(spark.read
 # provide options if you want (not required!)
 .option('key1', value1)
 .option('key2', value2)
 .option('key3', value3)
 # a special set of optional methods:
 .schema(schema)
 .[file format](uri_of_input_file))
```

where `[file format]` is the function that corresponds to the input file format (e.g. `csv` or `json`)

*advanced note*: the `[file format]` methods are all aliases for a pair of operations: setting the `.format` value and calling the `.load` method. e.g.

```python
spark.read.csv(uri)
```

is the same as

```python
(spark.read
 .format('csv')
 .load(uri))
```

additionally, it is possible to combine the multiple `.option(key, value)` calls into one single keyword-argument-based `.options` call:

```python
(spark.read
 .option('key1', value1)
 .option('key2', value2)
 .option('key3', value3))
```

is equivalent to

```python
(spark.read
 .options(key1=value1,
          key2=value2,
          key3=value3))
```

#### `schema` inference

for a few data source formats (e.g. `parquet` or `rdbms`), the schema of the data can be easily determined from the data source. for the rest (e.g. `txt`, `csv`, `json`), `pyspark` must read the input data to make an educated guess about the type of the data provided -- this is called **`schema` inference**

the main thing to know about `schema` inference is that it requires opening a file. this means that just to define the inputs sometimes `pyspark` will need to execute a distributed `job`. there are some performance implications to this, so in production settings it is preferable to provide your own `schema`

#### `csv`

we can read `csv` files with

```python
spark.read.csv(uri)
```

common `option`s for our `csv` reading include

+ `sep` (`str`): the character that delimits values in our records (default is ',')
+ `header` (`bool`): whether or not we have a header row
+ `inferSchema` (`bool`): whether or not we should infer the schema

confusingly, `pyspark` often provides us with two ways to set the most important options:

1. in the `.csv(...)` method, or
1. as `.option(key, value)` methods "along the way"

for example, the following two calls are equivalent

```python
(spark.read
 .csv(uri, sep='\t', header=True, inferSchema=False))
```

and

```python
(spark.read
 .option('sep', '\t')
 .option('header', True)
 .option('inferSchema', False)
 .csv(uri))
```

neither method is preferred or more conventional, in my experience. pick one and try to stick with it!
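if it helps to see why the two spellings end up in the same place, here is a toy reader written in plain `python`. `ToyReader` is hypothetical and only *records* what it was asked to do (a real reader would hand back a `dataframe`); the `uri` is made up too.

```python
class ToyReader:
    """a toy stand-in for a reader object (an assumption, not pyspark code)"""

    def __init__(self):
        self.settings = {}
        self.source_format = None

    def option(self, key, value):
        self.settings[key] = value
        return self  # returning the reader is what makes chaining work

    def options(self, **kwargs):
        for key, value in kwargs.items():
            self.option(key, value)
        return self

    def format(self, source_format):
        self.source_format = source_format
        return self

    def load(self, uri):
        # describe the read instead of actually performing it
        return {'uri': uri,
                'format': self.source_format,
                'options': dict(self.settings)}

    def csv(self, uri, **kwargs):
        # the format-specific method is just sugar for options + format + load
        return self.options(**kwargs).format('csv').load(uri)


uri = '/some/made/up/path.tsv'

plan_a = ToyReader().csv(uri, sep='\t', header=True, inferSchema=False)
plan_b = (ToyReader()
          .option('sep', '\t')
          .option('header', True)
          .option('inferSchema', False)
          .csv(uri))

print(plan_a == plan_b)
```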

our `databricks` CE clusters come pre-configured with access to a lot of datasets. we can easily read a `csv` among them with the following command

In [None]:
f_csv = '/databricks-datasets/flights/departuredelays.csv'

departure_delays = (spark.read
                    .csv(f_csv, header=True))

In [None]:
display(departure_delays)

note that the two cells above both resulted in the execution of a `spark` `job` (see the dropdown "(1) Spark Jobs" that appears after each cell). this is because both commands required some action -- `spark` needed to look at the underlying files in both cases.

the need for a `job` for the `display(departure_delays)` command is obvious -- it had to open the underlying `csv` file to get records to display to you.

why does the `spark.read.csv` line need to open the file?

the answer is that `spark.read.csv` needs to read the *first line* of the file to know how many columns are in it and what their names are (this is true for `header=True` but also any time you don't provide a schema)

if we had provided the `inferSchema=True` argument to `.csv`, we actually would have launched *two* jobs -- try it

In [None]:
departure_delays_infer = (spark.read
                          .csv(f_csv, header=True, inferSchema=True))

+ the first `job` is the number of columns (and maybe names) check
+ the second `job` is the schema inference -- `spark` had to open the file to figure out the contents of it, and this required reading many lines from disk.

`spark.read.csv(...)` can be called in three ways, and the number of jobs invoked changes as a result

+ provide no schema information: one job (to learn the number of columns)
+ request the schema be inferred: two jobs (one to learn the number of columns and one to read the first N records to pick an appropriate data type)
+ provide a schema: no jobs
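the job counts above can be mimicked with the standard library's `csv` module. this is a rough sketch of the idea only: the three-record `CSV_TEXT` is made up, each call to the hypothetical `open_file` stands in for one `spark` `job`, and the type-guessing rule is far simpler than whatever `spark` really does.

```python
import csv
import io

# a made-up, three-record stand-in for a csv file on disk
CSV_TEXT = """date,delay,origin
01011245,6,ABE
01020600,-8,ABE
01021245,-2,ATL
"""

passes_over_file = 0  # each pass plays the role of one spark job


def open_file():
    global passes_over_file
    passes_over_file += 1
    return csv.reader(io.StringIO(CSV_TEXT))


def read_header():
    """pass one: only the first line is needed for the column count and names"""
    return next(open_file())


def infer_type(values):
    """pick the first type name that every value in the column can be parsed as"""
    for parse, type_name in ((int, 'int'), (float, 'double')):
        try:
            for value in values:
                parse(value)
            return type_name
        except ValueError:
            continue
    return 'string'


def infer_schema(n_records=100):
    """pass two: read up to n_records rows and guess a type per column"""
    rows = list(open_file())[1:n_records + 1]
    return [infer_type(column) for column in zip(*rows)]


columns = read_header()
types = infer_schema()
print(dict(zip(columns, types)), passes_over_file)
```

providing the schema yourself corresponds to skipping both functions, which is why that case needs no pass over the file at all.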

#### `parquet`

`parquet` is a file format which is particularly well suited to `spark` settings. some properties:

+ it is a columnar block storage built for distribution and fast aggregation
+ because it is columnar, it has good compression on disk (files are smaller!) and it's much faster to read in (computations are faster!)
+ the schema of the data is saved in the file itself (no need to infer or provide your own)
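"columnar" is easier to appreciate with a tiny example. the sketch below stores the same made-up records row-by-row and column-by-column in plain `python` structures (nothing here touches a real `parquet` file) and counts how many values each layout has to touch to average a single column.

```python
# the same made-up records, stored two ways
rows = [
    {'user': 'a', 'rating': 5, 'review': 'great'},
    {'user': 'b', 'rating': 3, 'review': 'fine'},
    {'user': 'c', 'rating': 4, 'review': 'good'},
]

# row-oriented: each record is stored together (like a line of a csv)
row_store = [tuple(row.values()) for row in rows]

# column-oriented: each column is stored together (the parquet idea)
column_store = {name: [row[name] for row in rows] for name in rows[0]}

# averaging one column from the row store walks past every value of every record
values_touched_row = sum(len(record) for record in row_store)
avg_from_rows = sum(record[1] for record in row_store) / len(row_store)

# the column store only has to touch the one column we asked for
values_touched_col = len(column_store['rating'])
avg_from_columns = sum(column_store['rating']) / len(column_store['rating'])

print(avg_from_rows, avg_from_columns, values_touched_row, values_touched_col)
```

values of the same type sitting next to each other is also what makes the on-disk compression work so well.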

we can read `parquet` files with

```python
spark.read.parquet(uri)
```

for example, in `databricks`,

In [None]:
f_parquet = '/databricks-datasets/amazon/data20K/'

amazon_review = spark.read.parquet(f_parquet)

In [None]:
display(amazon_review)

two things to note:

1. the `spark.read.parquet` command launches a `job` (it has to read the schema from the `parquet` file metadata -- this is fast, but not nothing!)
2. the `uri` here is a directory, not a single file
    1. for `parquet`, the `uri` is almost always a directory or a `glob` expression. `parquet` is block storage, so it is saved as a directory with many files in multiple parts

take a look at this back in `databricks`

In [None]:
%fs ls /databricks-datasets/amazon/data20K/

#### `json`

as we saw in our web scraping lecture, internet-based data is often in the `json` format. additionally, the increasingly popular paradigm of event-based architecture means more and more data is coming to us as `json` blobs.

in `databricks`,

In [None]:
df_json = spark.read.json('/databricks-datasets/iot/iot_devices.json')

display(df_json)

`json` records are hierarchical and so are the `dataframe`s created from them in `spark` -- **this is new!**. you probably haven't seen a *hierarchical* dataframe before.

let's create a hierarchical `json` file just to test it out. in `databricks`:

In [None]:
j = """{"dc_id": "dc-101","source": {"sensor-igauge": {"id": 10,"ip": "68.28.91.22","description": "Sensor attached to the container ceilings","temp":35,"c02_level": 1475,"geo": {"lat":38.00, "long":97.00}},"sensor-ipad": {"id": 13,"ip": "67.185.72.1","description": "Sensor ipad attached to carbon cylinders","temp": 34,"c02_level": 1370,"geo": {"lat":47.41, "long":-122.00}},"sensor-inest": {"id": 8,"ip": "208.109.163.218","description": "Sensor attached to the factory ceilings","temp": 40,"c02_level": 1346,"geo": {"lat":33.61, "long":-111.89}},"sensor-istick": {"id": 5,"ip": "204.116.105.67","description": "Sensor embedded in exhaust pipes in the ceilings","temp": 40,"c02_level": 1574,"geo": {"lat":35.93, "long":-85.46}}}}
{"dc_id": "dc-102","source": {"sensor-igauge": {"id": 10,"ip": "68.28.91.23","description": "Sensor attached to the container ceilings","temp":42,"c02_level": 1400,"geo": {"lat":38.01, "long":96.88}},"sensor-ipad": {"id": 14,"ip": "67.185.72.2","description": "Sensor ipad attached to carbon cylinders","temp": 44,"c02_level": 1360,"geo": {"lat":47.42, "long":-123.00}},"sensor-inest": {"id": 9,"ip": "208.109.163.219","description": "Sensor attached to the factory ceilings","temp": 41,"c02_level": 1347,"geo": {"lat":33.62, "long":-111.90}},"sensor-istick": {"id": 5,"ip": "204.116.105.68","description": "Sensor embedded in exhaust pipes in the ceilings","temp": 41,"c02_level": 1575,"geo": {"lat":35.94, "long":-85.47}}}}
{"dc_id": "dc-103","source": {"sensor-igauge": {"id": 10,"ip": "68.28.91.24","description": "Sensor attached to the container ceilings","temp":108,"c02_level": 1337,"geo": {"lat":38.20, "long":97.01}},"sensor-ipad": {"id": 15,"ip": "67.185.72.3","description": "Sensor ipad attached to carbon cylinders","temp": 47,"c02_level": 1350,"geo": {"lat":47.31, "long":-124.00}},"sensor-inest": {"id": 10,"ip": "208.109.163.220","description": "Sensor attached to the factory ceilings","temp": 42,"c02_level": 1348,"geo": {"lat":33.63, "long":-111.91}},"sensor-istick": {"id": 6,"ip": "204.116.105.69","description": "Sensor embedded in exhaust pipes in the ceilings","temp": 42,"c02_level": 1576,"geo": {"lat":35.95, "long":-85.48}}}}"""

with open('hierarchical.json', 'w') as fp:
    fp.write(j)

then load that file we created back into a `dataframe` using `spark.read.json`

In [None]:
df_hierarchical = spark.read.json('file:///databricks/driver/hierarchical.json')

In [None]:
display(df_hierarchical)

note that the elements in the `source` column have dropdown arrows -- you can drill down into the items in the `display` view

furthermore, when we are performing transformations like `select` (more on these later) we can reference the elements in this dataframe with dot notation, e.g.

In [None]:
display(df_hierarchical
        .select('source.sensor-igauge.geo.lat', 'source.sensor-igauge.geo.long'))

like the `csv` and `parquet` cases before this, the basic `spark.read.json` function triggers a `spark` `job` to infer the schema. however, **unlike** those cases, this `job` must read the **entire** `json` file. this is because `json` itself is schemaless - users can add any key at any time, and any given record may or may not contain any given key

`pyspark` must read the entire set of `json` records to know the possible fields, and then build a `dataframe` which has every possible field, even if almost all values in that `dataframe` are `null`

the end result: reading a lot of `json` files is **extremely** expensive. you should definitely lean towards providing your own `schema` object on `read`. it may be a little extra work but is very much worth it
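to see why inference needs the whole file, here is a toy, pure-`python` sketch. it is **not** `spark`'s actual inference code, and the helper names `infer_fields` and `to_rows` are made up for illustration. the point is that the set of fields is only known after the last record has been read, and every record then gets padded with `None` for the fields it lacks

```python
import json


def infer_fields(json_lines):
    """Toy schema inference: the union of top-level keys over ALL records."""
    fields = set()
    for line in json_lines:  # every single record must be visited
        fields.update(json.loads(line).keys())
    return sorted(fields)


def to_rows(json_lines, fields):
    """Pad each record so it has every field, using None for missing keys."""
    return [[json.loads(line).get(f) for f in fields] for line in json_lines]


# three records, no two of which share the same set of keys
records = ['{"a": 1}', '{"a": 2, "b": "x"}', '{"c": 3.5}']

fields = infer_fields(records)
rows = to_rows(records, fields)

print(fields)
print(rows)
```

when you supply the schema yourself (`spark.read.json` accepts a `schema` argument), that first pass over the data is unnecessary.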

#### `text`

`csv` and `json` are special file formats that `spark` knows how to parse immediately; for all other `text` file formats there is a vanilla `spark.read.text` method.

this method simply splits a file on newline characters (`\n`) and returns a dataframe with a single string-typed `value` column holding one line per row

for example, in `databricks`

In [None]:
df_text = (spark.read
           .text('/databricks-datasets/sample_logs/'))

In [None]:
display(df_text)

note that the first cell here **didn't** cause a `job` to fire -- the `schema` is already known, no need to open the file

#### `rdbm` via `jdbc`

we are in `java`-world so our connections to databases will all be done via `j`ava `d`ata`b`ase `c`onnectivity, aka `jdbc`.

to connect to the database we need to go through a hopefully-now-familiar number of steps

1. tell `pyspark` what program to use to communicate to the database: load a `driver` for that database
1. configure the connection parameters
    1. this includes a normal database `uri` with
        1. scheme (e.g. `jdbc:postgresql`)
        1. host name
        1. database name
    1. this also includes a separate connection parameter which holds authentication information

there is one important difference now though: because we are working in a *distributed* setting, we also need to tell `spark` how to *distribute* the records in the table we are reading. this is done by picking some column we can partition (e.g. global numeric id), the maximum and minimum value in that column, and the number of different partitions we'd like. this is called "striding"

if we don't do these things, `spark` will read *every* record directly into a single partition -- not what we want to do at all!

because we don't have a database to connect to right now, I'll simply display the code you could use to access one:

```python
jdbc_uri = "jdbc:postgresql://xx.xx.xxx.xxx/dbname"
tableName = "tablename"         # placeholder: the table you want to read

connProperties = {"user" : "username",
                  "password" : "password", }

exampleTwoDF = spark.read.jdbc(
  url=jdbc_uri,                 # the JDBC URL
  table=tableName,              # the name of the table
  column="id",                  # the name of a column of an integral type that will be used for partitioning.
  lowerBound=1,                 # the minimum value of `column` used to decide partition stride.
  upperBound=200000,            # the maximum value of `column` used to decide partition stride
  numPartitions=8,              # the number of partitions/connections
  properties=connProperties)    # the connection properties
```
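to make "striding" concrete, here is a simplified, pure-`python` sketch of how the bounds and partition count could be turned into one `where` clause per partition. the function name `stride_predicates` is hypothetical, and the real `spark` implementation differs in its details; treat this as an approximation of the idea rather than the actual code

```python
def stride_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE clause per partition (simplified sketch of striding)."""
    if num_partitions < 2:
        raise ValueError("this sketch assumes at least two partitions")

    # width of each partition's slice of the column's value range
    stride = upper_bound // num_partitions - lower_bound // num_partitions

    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # the first partition has no lower limit, the last has no upper limit
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None

        if upper is None:
            predicates.append(lower)
        elif lower is None:
            # rows with a NULL partition column land in the first partition
            predicates.append(f"{upper} OR {column} IS NULL")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


# same numbers as the example above
preds = stride_predicates("id", 1, 200000, 8)
for p in preds:
    print(p)
```

note that the first and last clauses are open-ended: `lowerBound` and `upperBound` only decide the stride, they do not filter rows out of the table.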

#### a `spark` `table`

don't confuse the `spark.read.table` method with reading a table in a database - they're conceptually related, but `table` means something pretty specific in `spark`.

internally, `spark` is able to save `dataframes` we've already created into a shared data store called a `table`. there are two types of `tables`

1. permanent: the contents are written to `s3`, `hdfs`, or `dbfs` as `parquet` files and will be around for as long as the files remain there (i.e. for new clusters)
1. temporary: the contents are saved in memory (technically as `hive` metastores) and will be around as long as the `session` (local) or `cluster` (global) remains active

think of a `table` as creating a shareable shortcut name for all the steps you used to create your dataframe. you can give other users of your cluster the table name and they will be able to load it directly.

in the interest of time, we will cheat and create a `table` from one of our already-existing dataframes. to see other ways of creating `table`s, feel free to walk through the demos [here](https://docs.databricks.com/data/tables.html)

In [None]:
amazon_review.write.saveAsTable('amazon_review', mode='overwrite')

after running this, click on the "Data" menu in the left hand side nav bar. you should see a `default` database which contains an `amazon_review` table; click on that table line item to see the table contents (then press back in your browser to return to this notebook)

at this point any other user of this cluster could access that `table` by running

In [None]:
amazon_review = spark.read.table('amazon_review')

additionally, we can write `sql` code directly against that table now:

In [None]:
%sql
select *
from default.amazon_review
where rating = 5
limit 100

In [None]:
%sql
select rating, count(*) as ct
from default.amazon_review
group by rating
order by rating

#### summary on `schema` inference

this table has some info about how `schema`s are inferred for different `read` methods

| Type | Inference Type | Inference Speed | Reason | Should Supply Schema? |
|-|-|-|-|-|
| CSV | Full-Data-Read | Slow | File size | Yes |
| Parquet | Metadata-Read | Fast/Medium | Number of partitions (might have to read many headers) | No (most cases) |
| Tables | n/a | n/a | Predefined | n/a |
| JSON | Full-Data-Read | Slow | File size | Yes |
| Text | Dictated | Zero | Only 1 Column | Never |
| JDBC | DB-Read | Fast | DB Schema | No |

**<div align="center">PAUSE FOR ZOOM BREAK</div>**

### doing computations

`spark`, like `tensorflow`, is a compute engine. it offers you a way to define computations and then it implements them.

#### transformations and actions

`spark` offers you a handful of operations that you can perform on `dataframe` objects. we break these things down into two types:

+ *transformations*: `dataframe` $\rightarrow$ new `dataframe`
    + this is like a `map`: take a chunk of rows in memory, apply something, create a new chunk of rows as a result
+ *actions*: return something back to the `driver` (aggregation, e.g.)
    + this is like a `shuffle and sort` then a `reduce`: repartition `dataframe` by group key, aggregate (sum, mean)

another way of thinking about these: transformations define *what data* we will do "something" to; *actions* define that something we do to that data. an *action* is the first time you actually need to open up a file to figure something out.

it is often said that transformations are **lazy** whereas actions are **eager**.

in `spark`, everything flows from a `dataframe` object. `transformations` take an input `dataframe` and produce a new transformed `dataframe`, and `actions` finally return information to the `driver`.

so given a dataframe `df`, `pyspark` code will have you chaining operations one after the other

```python
result = (df
          .transformation_1(args_1)
          .transformation_2(args_2)
          .transformation_3(args_3)
          .action(args_action))
```
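the lazy / eager split can be mimicked in a few lines of plain `python`. this is a toy stand-in, not how `spark` is implemented: the class `LazyFrame` and the function `read_flights` are invented for this sketch. transformations only append to a plan; the "file" is not opened until an action runs

```python
class LazyFrame:
    """Toy dataframe: transformations are recorded, actions execute them."""

    def __init__(self, source, plan=()):
        self._source = source  # zero-argument callable that "opens the file"
        self._plan = plan      # tuple of pending (kind, function) steps

    # --- transformations: return a new LazyFrame, touch no data ---
    def filter(self, predicate):
        return LazyFrame(self._source, self._plan + (("filter", predicate),))

    def select(self, *keys):
        def pick(row):
            return {k: row[k] for k in keys}
        return LazyFrame(self._source, self._plan + (("map", pick),))

    # --- actions: read the source and run every recorded step ---
    def _run(self):
        rows = self._source()
        for kind, fn in self._plan:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:
                rows = [fn(r) for r in rows]
        return rows

    def count(self):
        return len(self._run())

    def collect(self):
        return self._run()


scans = []  # records every time the source is actually read


def read_flights():
    scans.append("scan")
    return [
        {"origin": "SEA", "delay": 12},
        {"origin": "SEA", "delay": -3},
        {"origin": "JFK", "delay": 40},
    ]


late = (LazyFrame(read_flights)
        .filter(lambda r: r["delay"] > 0)
        .select("origin"))
scans_before_action = len(scans)  # still zero: nothing has been read yet

n_late = late.count()             # the action triggers the scan
print(scans_before_action, n_late, len(scans))
```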

some extremely common transformations and actions are

+ transformations:
    + `limit(..)`: limit the number of records to `N`
    + `select(..)`: select a subset of columns
    + `drop(..)`: drop columns
    + `distinct()`: limit to only distinct records
    + `dropDuplicates(..)`: like `distinct`, but can optionally consider only a subset of columns
    + `groupby(...).agg(...)`: group by some key and aggregate across that group (nothing is computed until an action runs)
+ actions:
    + `show(..)`: get the first `N` records and print an `ascii` table
    + `display(..)`: `databricks`-only html table version of `show`
    + `count()`: count the number of records in the dataframe

the basic workflow is:

1. you `spark.read` in a base dataset
1. you perform as many transformations to that dataset as you need to get the data into the state that you want it
    1. you can think of this as building up a `sql` query piece by piece -- `select`, `groupby`, `distinct`, etc
1. when the dataframe is in the desired state and you need to calculate something, you execute an action

we can use the airport delay data we read in as `csv` before to demonstrate that

1. transformations don't cause jobs to be kicked off
1. actions do

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

f_csv = '/databricks-datasets/flights/departuredelays.csv'

schema = T.StructType([T.StructField('date', T.StringType()),
                       T.StructField('delay', T.IntegerType()),
                       T.StructField('distance', T.LongType()),
                       T.StructField('origin', T.StringType()),
                       T.StructField('destination', T.StringType()), ])

departure_delays = (spark.read
                    .csv(f_csv, header=True, schema=schema)
                    # parse the date timestamp
                    .withColumn('ydate', F.concat(F.lit('2019'), F.col('date')))
                    .withColumn('date', F.to_timestamp('ydate', 'yyyyMMddHHmm'))
                    .drop('ydate'))

the above should have caused *no* jobs to run -- only transformations!

a `display` or a `.count`, on the other hand:

In [None]:
display(departure_delays)

In [None]:
departure_delays.count()

let's build a second `dataframe` which is an aggregation of the first

In [None]:
avg_delay_by_origin = (departure_delays
                       .groupby('origin')
                       .agg(F.avg('delay').alias('avg_delay')))

again, no job until we perform an action

In [None]:
display(avg_delay_by_origin
        .sort(F.col('avg_delay').desc()))

##### wide vs. narrow transformations

above we have been suggesting that transformation operations were completely without cost - that's not entirely true.

there are two types of transformations that you could do: **wide** and **narrow**

+ a **narrow** transformation is one that can be done entirely within a single partition
    + e.g. `.filter`, `.drop`, `.coalesce`
    + e.g. an aggregation where the partition was done on the `.groupby` keys

<br><div align="center"><img width=200 src="https://files.training.databricks.com/images/105/transformations-narrow.png"></div>

+ a **wide** transformation is one that needs data from multiple partitions
    + e.g. `.groupby().sum()`, `.distinct`
    + a **wide** transformation triggers a **shuffle**
    + data is grouped into batches based on how it needs to be transformed (e.g. a `.groupby` key) and written to file
    + those files are sent to a different executor (per the driver's instructions)
    + on that other executor those files are then read back into memory
    + `spark` will break up jobs into **stages** based on these read/writes

<div align="center"><img width=200 src="https://files.training.databricks.com/images/105/transformations-wide.png"></div>
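a toy, single-process sketch of the difference, using plain `python` lists as stand-ins for partitions (the helper names `shuffle` and `sum_by_key` are made up here, and real `spark` shuffles write files and move them between executors). the narrow step works on each partition in isolation; the wide step first has to move records so that all rows for a key sit in the same partition

```python
import zlib
from collections import defaultdict

# three "partitions" of (origin, delay) records
partitions = [
    [("SEA", 12), ("JFK", 40)],
    [("SEA", -3), ("ORD", 7)],
    [("JFK", 5), ("ORD", 1)],
]

# narrow: a filter only ever looks at rows inside one partition
positive = [[(k, v) for k, v in part if v > 0] for part in partitions]


def shuffle(parts, num_out):
    """Move every record to the output partition chosen by its key."""
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            target = zlib.crc32(key.encode()) % num_out  # deterministic hash
            out[target].append((key, value))
    return out


def sum_by_key(part):
    """Aggregate within a single partition only."""
    totals = defaultdict(int)
    for key, value in part:
        totals[key] += value
    return dict(totals)


# wide: a groupby-sum needs all rows for a key together, hence the shuffle
shuffled = shuffle(partitions, num_out=2)
per_partition = [sum_by_key(part) for part in shuffled]

# each key lives in exactly one output partition, so merging is trivial
totals = {k: v for part in per_partition for k, v in part.items()}
print(totals)
```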

you can actually see this in the `groupby(...).agg(...).sort(...)` we ran above. run this cell again, but this time open the "(1) Spark jobs" dropdown and the "Jobs" dropdown within it to see the multiple Stages that exist on account of the aggregation

In [None]:
display(avg_delay_by_origin
        .sort(F.col('avg_delay').desc()))

#### `udf`s

not everything you will ever want to do is built into the `pyspark` library, so it would be nice to have ways to implement your tailored computations without leaving the distributed setting

`pyspark` handles this via `u`ser `d`efined `f`unctions, or [`udf`s](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html)

a `udf` is a pure `python` function that you write and then "convert" into a distributable function

for example, suppose (wrongly!!) that we couldn't round a float to two decimal places in base `pyspark`, but we know we could do that with `python` like so

In [None]:
def my_round(x):
    return round(x, 2)

my_round(0.123456789)

`pyspark` allows us to convert that into a `spark`-world function with the `udf` decorator:

In [None]:
import pyspark.sql.types as T
from pyspark.sql.functions import udf

@udf(returnType=T.FloatType())
def my_round(x):
    return round(x, 2)

# note: base python function now only available as `my_round.func`
my_round.func(0.123456789)

this is now a function we can pass to our various transformations:

In [None]:
display(avg_delay_by_origin
        .withColumn('rounded_avg_delay', my_round('avg_delay')))

in addition to one-to-one `udf`s like the one we just defined, `pyspark` [can leverage `pandas` to create some more efficient or complex `udf` types](https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html)

+ scalar `udf`s: a vectorized version of a scalar function (one record input, one value output)
    + useful for things `pandas` or `numpy` have built-in
+ grouped map `udf`s: take an input that is a groupby chunk and make an output that is similarly sized (split apply combine)
    + useful for things like normalizing within group or calculating in-group percentiles
+ grouped aggregate `udf`s: take an input that is a groupby chunk and make an output that is a single aggregate number for that entire group
    + useful for group-level statistics like medians, averages, standard deviations, etc

now before you go all crazy taking all the `python` code you feel comfortable with and converting it all to `udf`s, know that you should **always** try to solve your problem with base `pyspark` first. `udf`s have a couple of significant costs:

+ can't be optimized - see the next section
+ function has to be serialized
    + might not be possible!
    + might be **REALLY** expensive
+ if your function carries state (e.g. depends on an external dataframe) you have to broadcast that information to every worker in your cluster for the function definition to make sense
+ now we have to start a `python` interpreter on every executor oh boy

#### the `catalyst` optimizer

we said above that even though the `dataframe` `api` is a simple abstraction on top of `rdd`s it is actually *faster* than working with `rdd`s -- that should raise some eyebrows.

how is that possible?

the answer is the `catalyst` optimizer, which arrived with `spark sql` in the 1.x releases and underpins the `dataframe` `api` in 2.x: a utility which optimizes the way that lazy transformations on `dataframe`s are converted into the underlying `rdd` operations.

think of `catalyst` as the `rdd` expert - rather than having you write the proper code using `rdd`s, it gives you a set of options to choose from (the `dataframe` `api`) and figures out the best way to do what you ask.

we've seen this exact relationship before with `keras` and `tensorflow`. in `tf.keras`, `google` developers have figured out the best way to implement high-level `keras` `api` code in lower-level `tensorflow` code. the way that your models are built is both simpler for you and better than if you had to put together the building blocks yourself

a schematic representation of how the `catalyst` optimizer works

<br><div align="center"><img width=800 src="https://files.training.databricks.com/images/105/catalyst-diagram.png"></div>

some follow-up articles on the creation of `catalyst`, for the interested:

+ [Deep Dive into Catalyst: Apache Spark 2.0's Optimizer](https://databricks.com/session/deep-dive-into-catalyst-apache-spark-2-0s-optimizer), Yin Huai's Spark Summit 2016 presentation.
+ [Catalyst: A Functional Query Optimizer for Spark and Shark](https://www.youtube.com/watch?v=6bCpISym_0w), Michael Armbrust's presentation at ScalaDays 2016.
+ [Deep Dive into Spark SQL's Catalyst Optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html), Databricks Blog, April 13, 2015.
+ [Spark SQL: Relational Data Processing in Spark](http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf), Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, Matei Zaharia,<br/>_Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data_.

#### other optimizations

there are a handful of other optimizations `spark` does when you are doing transformations

+ `spark` will (temporarily) remember shuffle reads and writes, so if you re-execute them it will re-use the existing shuffle files
+ you can always `.cache` the results of any transformation and then it will only be calculated once per session

#### what a computation looks like to `spark`: programming with `rdd`s [advanced]

the way we actually deploy programs in `spark` is similar to how we deployed `mapreduce` jobs in `hadoop streaming`: we write some code and send it to a machine which then distributes the computation elsewhere

what changes in `spark` is that a master program (the "driver") creates `rdd`s by *parallelizing* a `hadoop` dataset (that is, it partitions a given dataset and pushes those partitions to nodes that perform local computations in memory).

an `rdd` is a structure that manages this partitioning / parallelizing.

from the point of view of the `spark` program, the order of operations is

+ build `rdd`s
    + access data from `hdfs` or local disk storage
    + parallelize that collection of data
    + transform it as necessary
    + cache everything we can
+ pass *closures* (stateless functions, ignorant of the rest of the world) to each element of the `rdd`
    + *closures* are then locally applied in-memory and the outputs are also cached
+ output `rdd`s are *acted on* (aggregated)
    + this is the only place we actually have an eval step.

one quick note on some common terms: *variables* and *closures*

+ *closures* do not rely in any way on external data
    + if they have variables within, they are copied to the nodes with them, but kept in local scope
+ external data, if needed, is passed through shared variables
    + *broadcast* variables: read only, distributed (e.g. lookup tables / stopword lists)
    + *accumulators*: meant to be associatively updated (e.g. counters)
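a toy, single-process simulation of why shared variables exist. nothing here is `spark` code: the helper `ship` fakes "sending to a worker" with a `pickle` round trip, and the data is invented. in real `pyspark` the equivalents are created with `SparkContext.broadcast` and `SparkContext.accumulator`

```python
import pickle


def ship(obj):
    """Simulate sending an object to a worker: the worker gets a copy."""
    return pickle.loads(pickle.dumps(obj))


# a plain variable captured by a closure is copied, so updates are lost
driver_counter = {"n": 0}
worker_counter = ship(driver_counter)
worker_counter["n"] += 5  # only the worker's local copy changes

# broadcast-style variable: read-only lookup data every worker receives
stopwords = {"the", "a"}

partitions = [["the", "cat"], ["a", "dog", "the"]]


def count_stopwords(part, broadcast_words):
    """Runs 'on a worker': reads the broadcast copy, returns a partial count."""
    return sum(1 for word in part if word in broadcast_words)


# accumulator-style update: workers produce partial results and the driver
# merges them with an associative operation (here, addition)
partials = [count_stopwords(part, ship(stopwords)) for part in partitions]
total = sum(partials)

print(driver_counter["n"], partials, total)
```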

**<div align="center">PAUSE FOR ZOOM BREAK</div>**

### `write`ing `dataframe`s

suppose you have created your dream `dataframe` by `spark.read`ing it in and performing the desired transformations and actions. now it's time to save the results -- you do that via the `dataframe.write.[format]` `api`.

we have the same general structure as with `read` -- specify `.options` along the way, then the `.[format]` method will take the output path

for example, to save an item as `parquet`, you would run

```python
(dataframe.write
 .option(...)
 .option(...)
 # this part is also optional
 .mode('overwrite')
 .parquet(s3_path_to_parquet_output_dir))
```

let's try this out in `databricks`:

In [None]:
import pyspark.sql.functions as F

f_csv = '/databricks-datasets/flights/departuredelays.csv'

departure_delays = (spark.read
                    .csv(f_csv, header=True)
                    .withColumn('delay', F.col('delay').cast('integer')))

avg_delay_by_origin = (departure_delays
                       .groupby('origin')
                       .agg(F.avg('delay').alias('avg_delay')))

display(avg_delay_by_origin
        .sort(F.col('avg_delay').desc()))

In [None]:
(avg_delay_by_origin
 .write
 .mode('overwrite')
 .parquet('/my/data/avg-delays/'))

and now, in the next `spark` session we start in our cluster, we can skip right to the end:

In [None]:
display(spark.read.parquet('/my/data/avg-delays/')
        .sort(F.col('avg_delay').desc()))

*note: we're not saving the `sort` because the distributed read of the `parquet` will necessarily break that sorting -- you'd have to re-`sort` on `read` even if you `sort`ed before `write`*

<strong><em><div align="center"><code>s = 'spark'; s.replace('a', 'o')</code></div></em></strong>
<div align="center"><img width=300 src="https://images-na.ssl-images-amazon.com/images/I/61u0oKyy3wL._SX466_.jpg"></div>

# END OF LECTURE