# DO NOT RUN THIS NOTEBOOK LOCALLY
## This notebook is set up to run on Google dataproc

# wk8 Demo - Advanced Spark - Pipelines and Optimizations with DataFrames
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Spring 2019`__

So far we've been using Spark's low level APIs. In particular, we've been using the RDD (Resilient Distiributed Datasets) API to implement Machine Learning algorithms from scratch. This week we're going to take a look at how Spark is used in a production setting. We'll look at DataFrames, SQL, and UDFs (User Defined Functions).  As discussed previously, we still need to understand the internals of Spark and MapReduce in general to write efficient and scalable code.

In class today we'll get some practice working with larger data sets in Spark. We'll start with an introduction to efficiently storing data and approach a large dataset for analysis. After that we'll discuss a ranking problem which was covered in Chapter 6 of the High Performance Spark book and how we can apply that to our problem. We'll follow up with a discussion on things that could be done to make this more effiicent.
* ... __describe__ differences between data serialization formats.
* ... __choose__ a data serialization format based on use case.
* ... __change__ submission arguements for a `SparkSession`.
* ... __set__ custom configuration for a `SparkSession`.
* ... __describe__ and __create__ a data pipeline for analysis.
* ... __use__ a user defined function (UDF).
* ... __understand__ feature engineering and aggregations in Spark.

__`Additional Resources:`__ Writing performant code in Spark requires a lot of thought. Holden's High Performance Spark book covers this topic very well. In addition, Spark - The Definitive Guide, by Bill Chambers and Matei Zaharia, provides some recent developments.

### Notebook Set-Up

This is running on dataproc with the following setup

```{bash}
BUCKET="w261-bucket"
CLUSTER="w261-demo"
PROJECT="w261-216504"
JUPYTER_PORT="8123"
PORT="10000"
ZONE=$(gcloud config get-value compute/zone)

# CREATE DATAPROC CLUSTER
gcloud dataproc clusters create ${CLUSTER} \
    --metadata "JUPYTER_PORT=${JUPYTER_PORT}" \
    --metadata "JUPYTER_CONDA_PACKAGES=numpy:pandas:scipy:pyarrow" \
    --metadata "JUPYTER_CONDA_CHANNELS=conda-forge" \
    --project ${PROJECT} \
    --bucket ${BUCKET} \
    --image-version "1.3.10-deb9" \
    --initialization-actions \
       gs://dataproc-initialization-actions/jupyter/jupyter.sh \
    --num-preemptible-workers=4 \
    --num-workers=2 \
    --worker-machine-type=n1-standard-8 \
    --master-machine-type=n1-standard-8
    
# CREATE SOCKS PROXY
gcloud compute ssh ${CLUSTER}-m \
    --project=${PROJECT} \
    --zone=${ZONE}  \
    --ssh-flag="-D" \
    --ssh-flag=${PORT} \
    --ssh-flag="-N"

```
--------

```
# USE SOCKS PROXY (if your local computer is linux)
/usr/bin/google-chrome \
  --proxy-server="socks5://localhost:${PORT}" \
  --user-data-dir=/tmp/${CLUSTER}-m   

# USE SOCKS PROXY (if your local computer is a mac)
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome \
  --proxy-server="socks5://localhost:${PORT}" \
  --user-data-dir=/tmp/${CLUSTER}-m  

# MORE INFO ON SETTING UP SOCKS PROXIES IN GCP:
# https://cloud.google.com/solutions/connecting-securely#socks-proxy-over-ssh
```

In [None]:
# imports
import re
import os
import numpy as np
import pandas as pd

### Load the data
Today we'll be using GSOD weather station data, avaliable from Google in BigQuery.

Since this is a decent sized dataset (21 GB uncompressed) we won't be running code, but rather reviewing the process.

In [None]:
# Get data from BigQuery into Google Cloud Storage as GZIP compressed CSV files
!bq --location=US extract --compression GZIP 'bigquery-public-data:samples.gsod' gs://w261-bucket/gsod/gsod-*.csv.gz
!bq --location=US extract --compression GZIP 'bigquery-public-data:noaa_gsod.stations' gs://w261-bucket/gsod/stations.csv.gz

### Initialize Spark

In [None]:
spark

In [None]:
# Here we show how to do a custom configuration
sc = spark.sparkContext

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
sc.getConf().getAll()

# Exercise 1. Structured API - Datasets, DataFrames, and SQL Tables and Views

A Dataset is a distributed collection of data. Datasets provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
    
This makes the analysis of data similar to how we would do analysis with Python's Pandas or R's dataframes. Spark DataFrames are heavily inspired by Pandas and we're actually able to create Pandas user-defined functions (UDFs) to use with Spark which leverage the Apache Arrow project to vectorized computation instead of row-by-row operations. This can lead to significant performance boosts for large datasets. 
    
SQL Tables and Views are basically the same thing as DataFrames. We simply just execute SQL against them instead of DataFrame code  *(Defintive Guide, pg. 50)*. You can choose to express some of your data manipulations in SQL and others in DataFrames and they will compile to the same underlying code. *(Defintive Guide, pg. 179)*

 > __DISCUSSION QUESTIONS:__ 
 * _Why would we want to use RDDs in this class over DataFrames?_
 * _What is a UDF? Why do we need to create them?_
 * _What is vectorized computation and how does that differ from row-by-row function calls_
 * _How is a Dataset different than a DataFrame?_
 * _Are Datasets avaliable in the Python API?_

# Exercise 2. Data Serialization Formats. 
This week you read [Format Wars](http://www.svds.com/dataformats/) which covered the characteristics, structure, and differences between raw text, sequence, Avro, Parquet, and ORC data serializations. 

There were several points discussed: 

* Human Readable
* Row vs Column Oriented
* Read vs Write performance
* Appendable
* Splittable
* Metadata storage

*For additional information see Definitive Guide, Chapter 9: Data Sources, pg.153*

## First let's understand our data

In [None]:
!mkdir data
!wget -q -O data/gsod-000000000000.csv.gz gs://w261-bucket/gsod/gsod-000000000000.csv.gz | head

Here we see that we have several compressed CSV files as we expect based on our bq command specifying compressions. BigQuery was nice enough to split the files into 30 MB chunks so that our analysis will be partitioned nicely for ingestion.

Now let's try to ingest these CSV's without any special commands or unzipped.

In [None]:
data_csv = spark.read.option("header", "true").csv("gs://w261-bucket/gsod/gsod-*.csv.gz")

In [None]:
data_csv.head()

In [None]:
data_csv.printSchema()

In [None]:
%%time
print((data_csv.count(), len(data_csv.columns)))

Wow that's nice we didn't even have to handle the decompression and it saves a ton on disk space! Next we're going to save this in a few different serializations so that we can see the effect on disk space.

Also notice that since we have 114 million observations and 31 columns we should see some huge performance boosts for compression in general and particularly columnar compression with parquet since it takes into account the data type to improve compression further. While row based compression will be less.

_Which Data Serialization do you think will do best?_

### How do these look?

We have 4 data types below

- Compressed CSV
- Parquet
- Avro
- CSV

Of these 3 are row oriented and 1 is column oriented. We have over 100M rows and 31 columns. Columnar compression should do fairly well in this scenerio. 

In [None]:
# Our original Compressed data already exists
!gsutil du -sh gs://w261-bucket/gsod/gsod*.csv.gz

In [None]:
data_csv.write.format("parquet").save("gs://w261-bucket/gsod/data.parquet")
!gsutil du -sh gs://w261-bucket/gsod/data.parquet

In [None]:
data_csv.write.format("com.databricks.spark.avro").save("gs://w261-bucket/gsod/data.avro")
!gsutil du -sh gs://w261-bucket/gsod/data.avro

In [None]:
data_csv.write.format("com.databricks.spark.csv").save('gs://w261-bucket/gsod/data.csv')
!gsutil du -sh gs://w261-bucket/gsod/data.csv

### How do these compare for simple computations?

First we need to read in the data again to ensure we're working with non-cached versions

In [None]:
data_parquet = spark.read.parquet("gs://w261-bucket/gsod/data.parquet")

### Count: 
Parquet keeps metadata about the data in order to compute some calculations extremely quickly such as row counts

In [None]:
%%time
data_parquet.count()

In [None]:
%%time
data_csv.count()

### Average of a column: 
Parquet is column oriented so it can go through the sequence of data in one step instead of taking each row. This should have much higher performance

In [None]:
from pyspark.sql import functions as F

In [None]:
%%time
data_parquet.agg(F.avg(data_parquet.max_temperature)).collect()

In [None]:
%%time
data_csv.agg(F.avg(data.max_temperature)).collect()

 > __DISCUSSION QUESTIONS:__ For each key term from the reading, briefly explain what it means in the context of this demo code. Specifically:
 * _What is the compression ratio for the parquet to csv file?_
 * _Which serialization would query a column faster?_
 * _Which types of columns do you think has the best compression for parquet?_
 * _When should you use flat files vs other data formats?_
 * _If we want to do analysis with lots of aggregations what serialization should we use?_
 * _Is there any downside to Parquet?_
 * _If you had to partition data into days as new data comes in with aggregations happening at end of day how would you operationalize this?_

# Exercise 3. Working with DataFrames and simple User-Defined Functions (UDFs)

In this example we're going to do some simple analysis of our data using built in spark functions. We'll look into UDFs and use a few instances of them to process our data

In [None]:
# Using built-in Spark functions are always more efficient
from pyspark.sql import types
import pyspark.sql.functions as F

timed = data_parquet.withColumn("time", F.concat(F.col("year"), F.lit("-"), F.col("month"), F.lit("-"), F.col("day")) \
                                .cast(types.TimestampType()))

In [None]:
%%time
timed.select('time').show(5)

In [None]:
%%time
timed.select('time').take(1)

In [None]:
# A simple UDF for converting year, month, day to timestamps
def create_date_from_parts(year, month, day):
    return f'{year}-{month}-{day}'

create_date_udf = F.udf(create_date_from_parts, types.StringType())
timed_udf = data_parquet.withColumn("date", create_date_udf('year', 'month', 'day').cast(types.TimestampType()))

In [None]:
%%time
timed_udf.take(1)

There's many things we could do from here but there are some important performance considerations when using UDFs. 


> User-defined functions and user-defined aggregate functions provide you with ways to extend the DataFrame and SQL APIs with your own custom code while keeping the Catalyst optimizer. The Dataset API (see “Datasets” on page 62) is another performant option for much of what you can do with UDFs and UDAFs. This is quite useful for performance, since otherwise you would need to convert the data to an RDD (and potentially back again) to perform arbitrary functions, which is quite expensive. (HP Spark pg 66) 


UDFs are typically much slower than built-in Spark functionality. The reason for this is becauase they have to serialize and deserialize the data for every row that the function is applied to. There have been recent improvements to UDF for some analytical results with Pandas UDFs that return scalars or groupby maps. Some more information about why UDFs are inefficent can be found here https://blog.cloudera.com/blog/2017/02/working-with-udfs-in-apache-spark/

Pandas UDFs solve the serialization issue by vectorizing the inputs and outputs, decreasing the serialziation from 3-100x; however, it isn't a golden bullet. See this blog for details http://garrens.com/blog/2018/03/04/using-new-pyspark-2-3-vectorized-pandas-udfs-lessons/

See also: http://sparklingpandas.com/

In [None]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://databricks.com/wp-content/uploads/2017/10/image1-4.png")

>__DISCUSSION QUESTION:__ 
* What is the task here? What did we really accomplish?
* What type does the UDF create_date_from_parts return?
* What information is being stored in the data frame? Is  there anything inefficient about this data structure? 
* What types of situations would lead to an inefficeint data structure in DataFrames? Could we be more efficient using an RDD in those situations?
* What questions would you ask of this table?

#  Exercise 4. EDA

In this exercise we'll do some basic EDA/Sanity checks of our DataFrame, and start preparing it for our analysis in exercise 5.

In [None]:
timed.printSchema()

In [None]:
stations = spark.read.option("header", "true").csv("gs://w261-bucket/gsod/stations.csv.gz")

In [None]:
stations.show(5)

In [None]:
# Let's filter for just the US since this is a US based dataset
stations_us = stations.filter(F.col('Country')=='US')

'''
# Equivalently, we could write:
stations_us = stations.where('Country'=='US')
'''

There are two methods to perform this operation: you can use `where` or `filter` (as above) and they will both perform the same operation. (pg 74 - Spark, The Definitive Guide). Remember that the DataFrame API and Spark SQL compile to the same execution plan.

Take a look at the explain plan for pushdown predicates: (`PushedFilters: [IsNotNull(station_number)]`). (More info on pg 169, 325 - Spark, The Definitive Guide)  

In [None]:
# We need to bring that back to our timed dataframe
timed_stations = timed.join(F.broadcast(stations_us), stations_us.usaf==timed.station_number, 'inner')

In [None]:
timed_stations.explain()

Spark will automatically broadcast a small table, it's usually best to let Spark decide. See explain plan below. Notice that Spark broadcast the join for us even though we took that function out in our code. (p.151 Defintive Guide)

In [None]:
timed_stations_NB = timed.join(stations_us, stations_us.usaf==timed.station_number, 'inner')

In [None]:
timed_stations_NB.explain()

In [None]:
# Let's only keep what we care about so we minimize our pain
keep_columns = ['station_number', 'mean_temp', 'time', 'lat', 'lon']
temp = timed_stations.select(*keep_columns)

In [None]:
# Let's recast types
temp = temp.withColumn("mean_temp", temp["mean_temp"].cast(types.DoubleType()))
temp = temp.withColumn("lat", temp["lat"].cast(types.DoubleType()))
temp = temp.withColumn("lon", temp["lon"].cast(types.DoubleType()))
temp = temp.withColumn("station_number", temp["station_number"].cast(types.IntegerType()))

In [None]:
%%time
# How is our dataframe looking? We did filter a bunch of data
temp.describe().show()
# You could also use 'summary()' to extract individual numbers for future use.

In [None]:
# Let's look at some of the data in histograms
# def plot_hist(hist_list):
#     pd.DataFrame(
#         list(zip(*hist_list)), 
#         columns=['bin', 'frequency']
#     ).set_index(
#         'bin'
#     ).plot(kind='bar');

In [None]:
def plot_hist(labels,values):
    df = pd.DataFrame({'lab':labels, 'val':values})
    df.plot.bar(x='lab', y='val', rot=0)

In [None]:
%%time
temp.cache()

In [None]:
# We will use filter instead of rdd, and take advantage of predicate pushdown
import math
def makeHistogram(_min,_max,numBuckets,colName):
    _range = list(range(math.floor(_min), math.ceil(_max), round((abs(_min)+abs(_max))/numBuckets)))
    _counts = np.zeros(len(_range))
    for idx, val in enumerate(_range):
        if idx < len(_range)-1:
            _counts[idx] = temp.filter(F.col(colName) >= _range[idx]) \
                               .filter(F.col(colName) <= _range[idx+1]) \
                               .count()
    plot_hist(_range,_counts)

In [None]:
%%time
makeHistogram(-69.0,110.0,11,'mean_temp')

In [None]:
%%time
makeHistogram(-179.63,179.583,11,'lon')

In [None]:
%%time
makeHistogram(-60.483,80.13,11,'lat')

### Using the RDD method (below) took almost an hour to run for a single plot. Even after caching the temp DF. Bummer.

In [None]:
# %%time
# temp_hist = temp.select('mean_temp').rdd.flatMap(lambda x: x).histogram(11)
# plot_hist(temp_hist)

<img src="../screenshots/lost-proxy.png">

In [None]:
# %%time
# temp_hist = temp.select('lon').rdd.flatMap(lambda x: x).histogram(11)

# # Loading the Computed Histogram into a Pandas Dataframe for plotting
# plot_hist(temp_hist)

In [None]:
# %%time
# temp_hist = temp.select('lat').rdd.flatMap(lambda x: x).histogram(11)

# # Loading the Computed Histogram into a Pandas Dataframe for plotting
# plot_hist(temp_hist)

# Exercise 5 - Analysis

In this section we're going to perform some aggregations to answer the question of "Is there global warming?" Don't take it too seriously, it's just an exercise! We're going to prepare our data so that we can draw an interactive graph displaying the change in avarage temperatures over time, as a function of latitude.



In [None]:
# let's use a struct to build a composite key
temp = temp.withColumn('time-lat', F.struct('time','lat'))
daily_average_at_latitude = temp.select('time-lat','mean_temp').groupBy("time-lat").agg(F.avg('mean_temp'))

If you’re used to RDDs you might be concerned by groupBy, but it is now a safe operation on thanks to the Spark SQL DataFrames optimizer, which automatically pipelines our reductions, avoiding giant shuffles and mega records. (HP Spark, pg. 43)

In [None]:
daily_average_at_latitude.printSchema()

In [None]:
# this function is here to make it easier to reason with the column names, flattens structs
def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

In [None]:
daily_average_at_latitude = flatten_df(daily_average_at_latitude)

In [None]:
daily_average_at_latitude.printSchema()

In [None]:
# Now let's get the average on each latitude
daily_average_at_latitude = daily_average_at_latitude.withColumn('time-rounded-lat', F.struct('time-lat_time',F.round(daily_average_at_latitude['time-lat_lat'],0)))
average_by_lat = daily_average_at_latitude.groupby('time-rounded-lat').agg(F.avg('avg(mean_temp)'))

In [None]:
# we used the struct in order to do a simple groupby that we can flatten again to get information
average_by_lat.printSchema()

In [None]:
# the names weren't very descriptive, let's rework that
average_by_lat = flatten_df(average_by_lat)
average_by_lat = average_by_lat.withColumnRenamed('time-rounded-lat_col2', 'rounded-lat')
average_by_lat = average_by_lat.withColumnRenamed('time-rounded-lat_time-lat_time', 'time')
average_by_lat = average_by_lat.withColumnRenamed('avg(avg(mean_temp))', 'temp')
average_by_lat.printSchema()

In [None]:
# oh man that's a lot of stuff and since we can't cache the data this is taking forever to run.
average_by_lat.explain()

In [None]:
# Let's output to file and then read from that file to reduce our load.
average_by_lat.write.format("parquet").save("gs://w261-bucket/gsod/average_by_lat.parquet")
average_by_lat_read = spark.read.parquet("gs://w261-bucket/gsod/average_by_lat.parquet")

In [None]:
# I didn't time this, but it took about 17 minutes to save this to parquet

In [None]:
average_by_lat_read.explain()

In [None]:
average_by_lat_read.show(10)

In [None]:
# Small enough to fit in pandas for our final analysis. Let's do that
average_by_lat_read.count()

In [None]:
df = average_by_lat_read.toPandas()

In [None]:
df.head()

In [None]:
df = df.set_index(['rounded-lat','time'])

In [None]:
df = df.sort_index()

In [None]:
df.head()

In [None]:
lat_list = df.index.levels[0]
print(lat_list)

In [None]:
# Is temperature increasing? Data isn't very clean and we didn't perform any sensor corrections.
%matplotlib notebook
from ipywidgets import *
import numpy as np
import matplotlib.pyplot as plt

n_roll = 365
def f(x):
    df.loc[x].rolling(n_roll).mean().plot()

interact(f, x=lat_list);

## Below is just a screenshot of the chart. To see the ineractive chart, follow the instructions below

<img src="../screenshots/plot-screenshot.png">

> __DISCUSSION QUESTIONS:__
* Why did we create a struct for our groupBy?
* Why did we push our transformations to a file and load them again? - compare and contrast cache(), save to disk (checkpoint), saveManagedTable
* Where could we have done this before to save computation time?
* Why did we do a rolling average of temperature?
* Isn't pandas a lot easier to use?

In [None]:
# lets save this df for later plotting, just in case. We're still in the cloud here.
df.to_csv("pandas.csv")

In [None]:
# now download the file to local machine
!gcloud compute scp w261-demo-m:/pandas.csv ~/MyDocuments/UCB/w261/Instructors/LiveSessionMaterials/wk08Demo_DataFrames/data

# SKIP THIS unless you want to render the chart while working locally:
IMPORTANT: We are now on the local machine. Jupyter lab does not support matplotlib notebook. To view the chart, open in jupyter notebook.

In [None]:
import pandas as pd

In [None]:
df = pd.read_csv("pandas.csv")

In [None]:
df.head()

In [None]:
df = df.set_index(['rounded-lat','time'])

In [None]:
df = df.sort_index()

In [None]:
lat_list = df.index.levels[0]
print(lat_list)

In [None]:
# Is temperature increasing? Data isn't very clean and we didn't perform any sensor corrections.
%matplotlib notebook
#%matplotlib inline
from ipywidgets import *
import numpy as np
import matplotlib.pyplot as plt

n_roll = 365
def f(x):
    df.loc[x].rolling(n_roll).mean().plot()

interact(f, x=lat_list);