## Apache Spark 2.4 for Data Science and Machine Learning at Scale (Part 2)

#### Class Logistics and Operations
* Start, End
* Questions
* Breaks

#### Topics

  * Vectorized Python PandasUDF with Apache Arrow
  * PandasUDF for batch inference (Python/DL prediction example)
  * Productionizing Spark Models: Challenges and Options
    * Model Export: MLeap, PMML, PFA, ONNX
  * Present/Future Patterns for Spark and Distributed DL Training, GPU Analytics
  * Wrapup

### Integrating Python and Spark ... Performantly

In the past, mixing Python code (RDD/lambda/UDF) with Spark DataFrame code was a major performance anti-pattern.

But __Spark 2.3__ introduced a building block for major improvements in Spark + Python interoperability. It is especially exciting because it opens the door to elegant integrations between Spark and the Python scientific computing, data science, and machine learning stack, including such favorites as SciPy, NumPy, Pandas, TensorFlow, PyTorch, Numba, Scikit-Learn, and more.

This new capability takes the form of:
* Vectorized Pandas scalar UDFs
* Vectorized Pandas group-map UDFs (partial aggregations / flatmap groups to any number of rows)
* Vectorized Pandas grouped-aggregate UDFs (one value per group), new in Spark 2.4
* Registration for SQL/Scala/etc. access via the regular `spark.udf.register`
* Integration with Apache Arrow, a columnar in-memory format supporting zero-copy reads (https://arrow.apache.org/)

<img src="https://i.imgur.com/DQkDbUH.png" width=900>

__This sounds like a complex integration! The API must be crazy! ... Actually, it's really easy:__

In [6]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
  [(9, 100, 0, 56), (17, 0, 150, 0), (25, 50, 75, 56)], #grams
  ("bacon", "eggs", "sausage", "spam"))

df.show()

In [7]:
# important business method, or machine learning model!

@pandas_udf("float", PandasUDFType.SCALAR)
def total_calories(bacon, eggs, sausage, spam):
  return 5.41*bacon + 1.96*eggs + 3.01*sausage + 2.8*spam

In [8]:
# use it like any other function of column(s):
df.withColumn("calories", total_calories(*df.columns)).show()
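
Because a scalar PandasUDF receives each column as a `pandas.Series` batch and returns a Series of the same length, the function body can be exercised locally without a cluster. The following is a minimal sketch assuming only pandas; `total_calories_local` is a hypothetical undecorated copy of the function above, not part of the original notebook.

```python
import pandas as pd

# Hypothetical undecorated copy of total_calories, for local testing only.
def total_calories_local(bacon, eggs, sausage, spam):
    return 5.41 * bacon + 1.96 * eggs + 3.01 * sausage + 2.8 * spam

# The same three rows as the Spark DataFrame above (grams per ingredient).
pdf = pd.DataFrame(
    [(9, 100, 0, 56), (17, 0, 150, 0), (25, 50, 75, 56)],
    columns=["bacon", "eggs", "sausage", "spam"],
)

# Each argument is a whole pandas.Series, so the arithmetic is vectorized.
calories = total_calories_local(
    pdf["bacon"], pdf["eggs"], pdf["sausage"], pdf["spam"]
)
print(calories.round(2).tolist())
```

Testing the plain function this way keeps the feedback loop short; the decorator only changes how Spark feeds data into it.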

Let's grab some zipcode data...

In [10]:
%sh

wget https://materials.s3.amazonaws.com/2019/odsc/zips.json -O /tmp/zips.json

In [11]:
spark.read.json('file:///tmp/zips.json').show(truncate=False)

In [12]:
spark.read.json('file:///tmp/zips.json').withColumnRenamed('_id', 'zip').createOrReplaceTempView('zip')

In [13]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Input is a pandas.Series; output is a single scalar per group
@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def merge_zips(zips):
    return zips.str.cat(sep=' ')

df = spark.table("zip").filter("city IN ('BOSTON', 'CHICAGO', 'AUSTIN')").select("city", "state", "zip")

out = df.groupby("city", "state").agg(merge_zips(df['zip']))

display(out)

city,state,merge_zips(zip)
AUSTIN,NV,89310
AUSTIN,MN,55912
BOSTON,PA,15135
BOSTON,TX,75570
AUSTIN,IN,47102
AUSTIN,PA,16720
CHICAGO,IL,60601 60602 60603 60604 60605 60606 60607 60608 60609 60610 60611 60612 60613 60614 60615 60616 60617 60618 60619 60620 60621 60622 60623 60624 60625 60626 60628 60629 60630 60631 60632 60636 60637 60639 60640 60641 60644 60647 60648 60649 60651 60652 60653 60654 60657 60660 60661
BOSTON,VA,22713
AUSTIN,UT,84754
BOSTON,GA,31626


Hopefully that's not too bad.
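
The Series-in, scalar-out contract of `merge_zips` can also be checked locally. This is a rough sketch that uses pandas' own `groupby().agg()` as a stand-in for Spark's; `merge_zips_local` is a hypothetical undecorated copy, and the three rows are taken from the output above.

```python
import pandas as pd

# Hypothetical undecorated copy of merge_zips: Series in, one scalar out.
def merge_zips_local(zips):
    return zips.str.cat(sep=" ")

# A few rows copied from the output above.
pdf = pd.DataFrame({
    "city": ["CHICAGO", "CHICAGO", "AUSTIN"],
    "state": ["IL", "IL", "MN"],
    "zip": ["60601", "60602", "55912"],
})

# pandas groupby().agg() plays the role of Spark's groupby().agg() here.
merged = pdf.groupby(["city", "state"])["zip"].agg(merge_zips_local)
print(merged.to_dict())
```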

Next up is the "group-map" functionality ... 
* we perform a groupBy in Spark
* then all the rows of each group are passed to a PandasUDF
* they show up in Python as a Pandas DF
* from the Python function, we return a Pandas DF
  * with 0 or more rows and any schema we like

In [15]:
import pandas as pd

# Input and output are both a pandas.DataFrame
@pandas_udf("city STRING, state STRING, info FLOAT", PandasUDFType.GROUPED_MAP)
def flatmap_demo(pdf):
    # Positional access does not depend on the index labels
    city, state = pdf['city'].iloc[0], pdf['state'].iloc[0]
    if len(pdf) > 1:
        # Two output rows per multi-zip group: mean and total population
        return pd.DataFrame([
            [city, state, pdf['pop'].mean()],
            [city, state, pdf['pop'].sum()]
        ])
    else:
        # Single-zip groups get one row with a null info value
        return pd.DataFrame([[city, state, None]])

out = spark.table("zip").filter("city IN ('BOSTON', 'CHICAGO', 'AUSTIN')") \
  .select("city", "state", "pop").groupby("city", "state") \
  .apply(flatmap_demo)

display(out)

city,state,info
AUSTIN,NV,
AUSTIN,MN,
BOSTON,PA,
BOSTON,TX,
AUSTIN,IN,
AUSTIN,PA,
CHICAGO,IL,52173.98
CHICAGO,IL,2452177.0
BOSTON,VA,
AUSTIN,UT,


That's a bit of a silly example, but we could also be ...

* collecting characteristic statistics by group
* normalizing or anonymizing (you can return multiple rows per group)
* building models
* deskewing based on their within-group distribution

or anything else!
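
As one illustration of the normalizing case, a group-map body can return one output row per input row. The sketch below emulates the split/apply/concatenate flow with plain pandas; `normalize_pop`, the `pop_centered` column, and the sample values are all made up for illustration.

```python
import pandas as pd

# Hypothetical group-map body: DataFrame in, DataFrame out,
# with one output row per input row.
def normalize_pop(pdf):
    centered = pdf["pop"] - pdf["pop"].mean()
    return pdf.assign(pop_centered=centered)

# Made-up populations, for illustration only.
pdf = pd.DataFrame({
    "city": ["A", "A", "B"],
    "state": ["X", "X", "Y"],
    "pop": [100.0, 300.0, 50.0],
})

# Emulate Spark's group-map locally: split by key, apply, concatenate.
parts = [normalize_pop(group) for _, group in pdf.groupby(["city", "state"])]
result = pd.concat(parts, ignore_index=True)
print(result)
```

Under Spark, the same body would presumably be decorated with `@pandas_udf(..., PandasUDFType.GROUPED_MAP)` and an output schema that lists all four columns.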

*Note that PandasUDFs need `pyarrow` installed; the `spark.sql.execution.arrow.enabled` config only governs Arrow use in `toPandas()` and in `createDataFrame()` from Pandas data.*

Take a look at some more details and examples: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

#### Let's try something a little more practical

We're going to perform a Box-Cox deskewing transformation, a common operation that reshapes certain skewed distributions toward normality:

http://www.itl.nist.gov/div898/handbook/eda/section3/boxcoxno.htm

There are two steps:
1. Determine the *lambda* exponent
2. Apply the transform formula to each record

Part 1 is a "full aggregation" function, meaning it depends on all of the records. 

Spark 2.4's grouped-aggregate PandasUDF (used above for `merge_zips`) can express this, but it loads all of a group's data into memory on a single worker.

So instead, we'll take a sample of the data and use that to locally calculate *lambda*, then use a PandasUDF to apply the transformation to the whole dataset.

In [19]:
# Just look at one variable for now:

path = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
data = spark.read.option("header", True).option("inferSchema", True).csv(path).select("carat")

In [20]:
display(data)

carat
0.23
0.21
0.23
0.29
0.31
0.24
0.24
0.26
0.22
0.23


In [21]:
from scipy.stats import boxcox
import numpy as np

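# collect() returns Row objects, so pull out the carat values as a 1-D array
local_sample = np.array([row["carat"] for row in data.sample(0.05).collect()])

# with no lmbda argument, boxcox returns (transformed data, fitted lambda)
_, lam = boxcox(local_sample)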

print(lam)

Now that we have a value for *lambda* we can implement the deskewing transform:

In [23]:
@pandas_udf("double", PandasUDFType.SCALAR)
def deskew(value):
  # lam is captured by the closure; PySpark serializes it and ships it to the executors
  if lam == 0:
    return np.log(value)
  else:
    return (value**lam - 1)/lam
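
The two branches of the transform can be sanity-checked locally before running it on the cluster. This is a minimal sketch assuming only NumPy and pandas; `boxcox_transform` is a hypothetical plain-function version of `deskew` that takes `lam` as an explicit argument.

```python
import numpy as np
import pandas as pd

# Hypothetical plain-function version of deskew, with lam passed explicitly.
def boxcox_transform(value, lam):
    if lam == 0:
        return np.log(value)
    return (value ** lam - 1) / lam

# A few carat values from the data above.
carat = pd.Series([0.23, 0.21, 0.29])

# lam = 1 just shifts the data by one; lam = 0 is the natural log.
shifted = boxcox_transform(carat, 1.0)
logged = boxcox_transform(carat, 0.0)

# For lam close to 0, the power form approaches the log form.
near_zero = boxcox_transform(carat, 1e-6)
print(np.allclose(near_zero, logged, atol=1e-4))
```

The explicit `lam == 0` branch exists because the power form divides by `lam`; the log is its limit as `lam` approaches zero.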

How does it look?

In [25]:
adjusted = data.withColumn("deskewed", deskew("carat"))

display(adjusted)

carat,deskewed
0.23,-1.5610246035730464
0.21,-1.6639145720415414
0.23,-1.5610246035730464
0.29,-1.302266833161613
0.31,-1.2287182553052536
0.24,-1.5131497665663138
0.24,-1.5131497665663138
0.26,-1.423557852235391
0.22,-1.611204967903562
0.23,-1.5610246035730464
