## Author: Bryan Cafferky Copyright 10/15/2021

#### Caution: This code is provided for demonstration "as is" with no implied warranties. Always test and vet any code before using it.

# Coding pandas Function API

### In this notebook we will learn how to code the following Spark pandas API function types:

#### - Grouped Map
#### - Map
#### - Cogroup Map

## Notes:

#### - Leverages Apache Arrow.
#### - Does not require Python type hints (they are optional here, unlike with pandas UDFs).
#### - This API is experimental according to the Apache Spark documentation. 
#### - Each pandas.DataFrame size can be controlled by spark.sql.execution.arrow.maxRecordsPerBatch.
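To picture what the batch-size setting does, here is a minimal plain-pandas sketch that slices a DataFrame into consecutive chunks of at most N rows, roughly the way each pandas.DataFrame handed to your function is capped. The helper `split_into_batches` is hypothetical and ignores partitioning, which also affects Spark's real batches. In the notebook itself you would change the cap with `spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")`.

```python
import pandas as pd


def split_into_batches(pdf: pd.DataFrame, max_records: int):
    """Hypothetical helper: yield consecutive slices of at most max_records rows.

    This only mimics how spark.sql.execution.arrow.maxRecordsPerBatch caps the
    size of each pandas.DataFrame; Spark's real batches also depend on partitioning.
    """
    if max_records <= 0:
        raise ValueError("max_records must be positive in this sketch")
    for start in range(0, len(pdf), max_records):
        yield pdf.iloc[start:start + max_records]


sales = pd.DataFrame({"SalesAmount": range(25)})
batch_sizes = [len(batch) for batch in split_into_batches(sales, 10)]
print(batch_sizes)  # [10, 10, 5]
```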

![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)

More examples are available on the Spark website: http://spark.apache.org/examples.html

Documentation on the pandas function API:
https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/pandas-function-apis

### Warning!!!

#### To run this code, you need to have uploaded the files and created the database tables; see Lesson 9 - Creating the SQL Tables on Databricks. A link to that video is in the video description.

### Skip code cells 1 through 9 if you already have Apache Arrow and PyArrow enabled.

In [0]:
sc.version

In [0]:
# See if Arrow is enabled (this older key is deprecated in Spark 3.0 in favor of spark.sql.execution.arrow.pyspark.enabled).
spark.conf.get("spark.sql.execution.arrow.enabled")

In [0]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

### Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call toPandas() and when creating a Spark DataFrame from a Pandas DataFrame with createDataFrame(pandas_df). To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.pyspark.enabled to true. This is disabled by default.

See https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas

In [0]:
# Check whether Arrow-based columnar data transfers are enabled
spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")

In [0]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [0]:
spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")

In addition, optimizations enabled by spark.sql.execution.arrow.pyspark.enabled could fall back automatically to a non-Arrow implementation if an error occurs before the actual computation within Spark. This can be controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.

In [0]:
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

### Recommended Pandas and PyArrow Versions

For usage with pyspark.sql, the supported version of Pandas is 0.24.2 and of PyArrow is 0.15.1. Higher versions may be used; however, compatibility and data correctness cannot be guaranteed and should be verified by the user.
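If you want a quick programmatic comparison against those recommended versions, the sketch below reads installed versions with the standard-library `importlib.metadata` and compares only the leading numeric part of each version string. The helpers `version_tuple` and `check_versions` are hypothetical; "newer" is reported as something to verify, not as an error, matching the guidance above.

```python
import re
from importlib import metadata

# Versions named in the Spark documentation quoted above.
RECOMMENDED = {"pandas": "0.24.2", "pyarrow": "0.15.1"}


def version_tuple(version: str) -> tuple:
    """Parse the leading numeric part of a version string: '1.3.4rc1' -> (1, 3, 4)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def check_versions(recommended=RECOMMENDED) -> dict:
    """Describe each package's installed version relative to the recommended one."""
    report = {}
    for package, wanted in recommended.items():
        try:
            installed = metadata.version(package)
        except metadata.PackageNotFoundError:
            report[package] = "not installed"
            continue
        have, want = version_tuple(installed), version_tuple(wanted)
        if have == want:
            report[package] = f"{installed} (matches recommended)"
        elif have > want:
            report[package] = f"{installed} (newer than {wanted}; verify compatibility)"
        else:
            report[package] = f"{installed} (older than {wanted})"
    return report


print(check_versions())
```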

See https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#recommended-pandas-and-pyarrow-versions

In [0]:
import pandas as pd

pd.show_versions()

In [0]:
import pyarrow

pyarrow.__version__

## Coding pandas API functions starts here!!!

### Create dataframe from a Spark SQL table

### Dataframe naming prefix convention:
##### 1st character is s for Spark DF
##### 2nd character is p for Python
##### 3rd and 4th characters are df for dataframe
##### 5th character is _ (separator)
##### The rest is a meaningful name

##### spdf_salessummary = a Spark Python dataframe containing sales summary information.
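The convention can be checked mechanically. This is a small hypothetical validator for the `spdf_` prefix only (the one prefix spelled out above): it requires `spdf_` followed by at least one letter or digit, then any word characters.

```python
import re

# Hypothetical checker for the spdf_ convention:
# s = Spark, p = Python, df = dataframe, "_" separator, then a meaningful name.
SPARK_DF_NAME = re.compile(r"spdf_[A-Za-z0-9]\w*")


def follows_spdf_convention(name: str) -> bool:
    return SPARK_DF_NAME.fullmatch(name) is not None


print(follows_spdf_convention("spdf_salessummary"))  # True
print(follows_spdf_convention("df_sales"))           # False
```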

In [0]:
spark.sql('use awproject')

spdf_sales = spark.sql('select CustomerKey, SalesAmount from factinternetsales limit 15000').dropna()

In [0]:
display(spdf_sales)

### Grouped Map

###### You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

- Split the data into groups by using DataFrame.groupBy.
- Apply a function on each group. The input and output of the function are both pandas.DataFrame; the input contains all the rows and columns for each group.
- Combine the results into a new DataFrame.
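The three steps can be sketched locally with plain pandas, using the same `subtract_mean` function and data as the Spark cell. This is only an illustration of split-apply-combine; Spark runs the groups on executors, and the row order of its combined result is not guaranteed the way pandas' sorted group order is here.

```python
import pandas as pd


def subtract_mean(pdf):
    # pdf holds every row (and every column) of one group
    v = pdf.v
    return pdf.assign(v_minus_mean=v - v.mean())


pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# Split: one pandas.DataFrame per id.  Apply: subtract_mean on each piece.
pieces = [subtract_mean(group) for _, group in pdf.groupby("id")]
# Combine: stack the per-group results back into a single DataFrame.
result = pd.concat(pieces, ignore_index=True)
print(result["v_minus_mean"].tolist())  # [-0.5, 0.5, -3.0, -1.0, 4.0]
```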

In [0]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v_minus_mean = v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double, v_minus_mean double").show()

#### See the API documentation:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html#pyspark-sql-groupeddata-applyinpandas

#### For information about calculating the z-score of a number, see:
https://developers.google.com/machine-learning/data-prep/transform/normalization

http://www.ltcconline.net/greenl/courses/201/probdist/zScore.htm


### To Get the Z-score:

z-score = (value - mean of the values) / standard deviation of the values
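A worked example of the formula with the standard-library `statistics` module. pandas' `Series.std()` defaults to the sample standard deviation (divide by n - 1), which is what `statistics.stdev` computes, so this matches the calculation used in the cell that follows; the population version is shown only for contrast.

```python
from statistics import mean, pstdev, stdev

values = [10.0, 20.0, 30.0]
mu = mean(values)          # 20.0
sigma = stdev(values)      # sample standard deviation (n - 1): 10.0
z_scores = [(x - mu) / sigma for x in values]
print(z_scores)            # [-1.0, 0.0, 1.0]

# Dividing by the population standard deviation (n) gives a different scale.
print(round(pstdev(values), 4))  # 8.165
```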

In [0]:
import pandas as pd

def get_z_score(pdf):
    # pdf is a pandas.DataFrame holding every row for one CustomerKey
    SalesAmount = pdf.SalesAmount
    # std() is the sample standard deviation, so a customer with a single sale gets a NaN z-score
    return pdf.assign(ZScoreSales = (SalesAmount - SalesAmount.mean()) / SalesAmount.std(),
                      AvgSales = SalesAmount.mean())

spdf_sales.groupby("CustomerKey").applyInPandas(
    get_z_score, schema="CustomerKey long, SalesAmount double, ZScoreSales double, AvgSales double").show()
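Because the function only ever sees a pandas.DataFrame, it can be tried locally before handing it to Spark. The rows below are made up (not taken from factinternetsales); they show the normal case and the single-sale customer whose standard deviation, and therefore z-score, is NaN.

```python
import pandas as pd


def get_z_score(pdf):
    # Same logic as the Spark cell: pdf holds all rows for one CustomerKey
    sales_amount = pdf.SalesAmount
    return pdf.assign(
        ZScoreSales=(sales_amount - sales_amount.mean()) / sales_amount.std(),
        AvgSales=sales_amount.mean(),
    )


# Made-up sample rows: customer 1 has three sales, customer 2 has one.
sample = pd.DataFrame({"CustomerKey": [1, 1, 1, 2],
                       "SalesAmount": [10.0, 20.0, 30.0, 50.0]})

result = pd.concat([get_z_score(g) for _, g in sample.groupby("CustomerKey")],
                   ignore_index=True)
print(result)
```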

### Map 

#### - Note: This seems to be used primarily to filter Spark dataframes.


- Uses DataFrame.mapInPandas() in order to transform an iterator of pandas.DataFrame to another iterator of pandas.DataFrame that represents the current PySpark DataFrame.
- Returns the result as a PySpark DataFrame.
- The underlying function takes and outputs an iterator of pandas.DataFrame. 
- It can return output of arbitrary length, in contrast to some pandas UDFs such as the Series to Series pandas UDF.
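The iterator-in, iterator-out contract is ordinary Python generator code, so it can be exercised with a list of pandas DataFrames standing in for the batches Spark would pass. The two batches here are made up; note that four input rows become one output row, which is the "arbitrary length" point above.

```python
import pandas as pd


def filter_func(iterator):
    # Consume pandas.DataFrame batches and yield (possibly smaller) batches
    for pdf in iterator:
        yield pdf[pdf.id == 1]


# Made-up batches standing in for what Spark would supply
batches = [pd.DataFrame({"id": [1, 2], "age": [21, 30]}),
           pd.DataFrame({"id": [2, 2], "age": [40, 50]})]

out = list(filter_func(iter(batches)))
print([len(b) for b in out])  # [1, 0]
```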

In [0]:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()

### Some experimentation with the Map API

In [0]:
b_multiplier = sc.broadcast(2)
b_multiplier.value

#### When you create a pandas UDF as I showed in a prior video, 
#### you can do initialization work before you start the iterator.  This seems to work here as well.
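A plain-Python sketch of why this works: code placed before the `for` loop in a generator runs once per call, not once per batch. The counter and `expensive_setup` are hypothetical stand-ins for real setup work. In Spark, the assumption is that each task calls the function with its own iterator, so the setup would run once per iterator rather than once per batch.

```python
import pandas as pd

init_calls = 0  # counts how many times the setup code runs


def expensive_setup():
    # Hypothetical stand-in for loading a model, opening a connection, etc.
    global init_calls
    init_calls += 1
    return 2


def filter_func(iterator):
    multiplier = expensive_setup()  # runs once, before the first batch is read
    for pdf in iterator:
        selected = pdf[pdf.id == 1]
        yield selected.assign(age_times_x=selected.age * multiplier)


batches = [pd.DataFrame({"id": [1, 1], "age": [21, 15]}),
           pd.DataFrame({"id": [2], "age": [30]}),
           pd.DataFrame({"id": [1], "age": [40]})]
results = list(filter_func(iter(batches)))
print(init_calls)  # 1
```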

In [0]:
df = spark.createDataFrame([(1, 21), (1, 15), (2, 30)], ("id", "age"))

def filter_func(iterator):

    # Do some expensive initialization with a state before the loop. This is not mentioned in the docs,
    # so beware. It may not be a good idea, but I wanted to see if it worked.  :-)
    multiplier = b_multiplier.value

    for pdf in iterator:
        selected = pdf[pdf.id == 1]
        # Multiply by a float so the new column matches the "double" type declared in the schema
        yield selected.assign(age_times_x = selected.age * float(multiplier))

df.mapInPandas(filter_func, schema="id long, age long, age_times_x double").show()

In [0]:
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.CustomerKey == 11000]

spdf_sales.mapInPandas(filter_func, schema=spdf_sales.schema).show()

In [0]:
def filter_func(iterator):
    for pdf in iterator:
        SalesAmount = pdf.SalesAmount
        # Caution: mean() and std() are computed over this batch only, not over the whole DataFrame
        pdf = pdf.assign(SalesAmount = (SalesAmount - SalesAmount.mean()) / SalesAmount.std())
        yield pdf[pdf.CustomerKey == 11000]

spdf_sales.mapInPandas(filter_func, schema=spdf_sales.schema).show()
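Since mapInPandas hands the function one batch at a time (capped by spark.sql.execution.arrow.maxRecordsPerBatch, 10,000 rows by default, and split by partition), statistics computed inside the loop are per batch. This plain-pandas sketch with made-up numbers shows how per-batch standardization differs from standardizing the whole column; for a global z-score you would compute the mean and standard deviation over the full DataFrame first.

```python
import pandas as pd


def standardize(s):
    return (s - s.mean()) / s.std()


sales = pd.Series([10.0, 20.0, 30.0, 100.0, 200.0, 300.0])

# Statistics over the whole column
global_z = standardize(sales)

# Statistics recomputed inside each of two 3-row "batches"
batch_z = pd.concat([standardize(sales.iloc[:3]), standardize(sales.iloc[3:])])

print(global_z.round(3).tolist())
print(batch_z.tolist())  # [-1.0, 0.0, 1.0, -1.0, 0.0, 1.0]
```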

### Cogrouped Map

#### Purpose:  Join dataframes on specified keys.

This function requires a full shuffle, i.e. this is an expensive operation.

All the data of a cogroup will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.

If returning a new pandas.DataFrame constructed with a dictionary, it is recommended to explicitly index the columns by name to ensure the positions are correct, or alternatively use an OrderedDict. For example, pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a']) or pd.DataFrame(OrderedDict([('id', ids), ('a', data)])).
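Both recommended forms are shown below with made-up `ids` and `data`. On Python 3.7+ a regular dict also keeps insertion order, so the explicit `columns=` argument mostly protects against code that builds the dictionary in a different order than the schema expects.

```python
from collections import OrderedDict

import pandas as pd

ids = [1, 2, 3]
data = [0.5, 1.5, 2.5]

# Option 1: state the column order explicitly
pdf1 = pd.DataFrame({"id": ids, "a": data}, columns=["id", "a"])

# Option 2: build the frame from an OrderedDict
pdf2 = pd.DataFrame(OrderedDict([("id", ids), ("a", data)]))

print(list(pdf1.columns), list(pdf2.columns))  # ['id', 'a'] ['id', 'a']
```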

#### It consists of the following steps:
  
- Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
- Apply a function to each cogroup. The input of the function is two pandas.DataFrames (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
- Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.
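The cogroup steps can be imitated locally to test a cogroup function before running it on Spark. `local_cogroup_apply` is a hypothetical helper, not part of PySpark: for each key found on either side it passes the matching rows from both DataFrames (an empty DataFrame if one side has none) and concatenates the results. The data and `asof_join` mirror the Spark example in the next cell.

```python
import pandas as pd


def local_cogroup_apply(left, right, key, func):
    """Hypothetical helper: mimic groupby(key).cogroup(...).applyInPandas(func) locally."""
    keys = sorted(set(left[key]) | set(right[key]))
    # Shuffle/cogroup: rows sharing a key from each side are passed together
    results = [func(left[left[key] == k], right[right[key] == k]) for k in keys]
    # Combine: stack the per-cogroup outputs
    return pd.concat(results, ignore_index=True)


df1 = pd.DataFrame({"time": [20000101, 20000101, 20000102, 20000102],
                    "id": [1, 2, 1, 2],
                    "v1": [1.0, 2.0, 3.0, 4.0]})
df2 = pd.DataFrame({"time": [20000101, 20000101], "id": [1, 2], "v2": ["x", "y"]})


def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")


out = local_cogroup_apply(df1, df2, "id", asof_join)
print(out)
```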

In [0]:
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()

### For more information on merge_asof(), a pandas function
See https://pandas.pydata.org/pandas-docs/version/0.25.0/reference/api/pandas.merge_asof.html

In [0]:
spark.sql('use awproject')
spdf_customer = spark.sql('select CustomerKey, BirthDate from DimCustomer').dropna()

In [0]:
import pandas as pd

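def asof_join(l, r):
    # l = customer rows and r = sales rows for one CustomerKey.
    # Note: merge_asof returns one row per left row, so each customer row is paired with at most one sale.
    return pd.merge_asof(l, r, on="CustomerKey")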

spdf_customer.groupby("CustomerKey").cogroup(spdf_sales.groupby("CustomerKey")).applyInPandas(
    asof_join, schema="CustomerKey int, SalesAmount double, BirthDate string").show()
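A plain-pandas check of what merge_asof does when one customer has several sales, using made-up rows. With the default backward direction it keeps, for each left row, the last right row whose key is less than or equal to the left key, so only one sale survives; an ordinary inner merge keeps every matching sale. If the goal is a full customer-to-sales join, pd.merge inside the cogroup function (or Spark's own DataFrame.join) may be the better fit; that is a suggestion, not something the original cell tested.

```python
import pandas as pd

# Made-up rows: one customer who has two sales
customer = pd.DataFrame({"CustomerKey": [1], "BirthDate": ["2000-01-01"]})
sales = pd.DataFrame({"CustomerKey": [1, 1], "SalesAmount": [10.0, 20.0]})

# merge_asof: one output row per left row (the last right row with key <= left key)
asof = pd.merge_asof(customer, sales, on="CustomerKey")

# Ordinary inner join: every matching sale is kept
full = pd.merge(customer, sales, on="CustomerKey", how="inner")

print(len(asof), len(full))  # 1 2
```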