# pagaya-map_in_pandas

Easy Python wrapper for Spark's `mapInPandas` and `applyInPandas`.

[This notebook on GitHub](https://github.com/pagaya/conf-talks/blob/master/map_in_pandas/map_in_pandas.ipynb)

## Goal
Easily run legacy pandas-based Python functions at scale on large datasets.

## Applications
* Running legacy transformations, feature extraction etc.
* Large-scale model evaluation
* Large-scale experiments and parameter tuning
* A/B testing
* Concurrent training (e.g. xgboost)

In [84]:
import os
import pandas as pd

DATA_PATH = "/tmp/mappandas/"

# Demo - map_in_pandas
## load the data

In [85]:
applications = spark.read.format("json").load(DATA_PATH)  # see the '(Creating test data)' section at the end
applications.show()

+------+--------------+---------+---------+---------+---------+---------+------+
|amount|application.id|feature_a|feature_b|feature_c|feature_d|feature_e|  kind|
+------+--------------+---------+---------+---------+---------+---------+------+
| 49000|APP_0000002000|   0.5352|   0.5864|   0.2742|   0.2555|   0.3692|credit|
| 34000|APP_0000002001|   0.9959|    0.861|   0.5087|   0.4358|   0.9352| unsec|
| 16000|APP_0000002002|   0.3451|   0.0732|   0.4199|   0.0787|   0.8564| unsec|
| 54000|APP_0000002003|   0.1399|    0.625|   0.1242|   0.0578|    0.435|   car|
| 62000|APP_0000002004|   0.5025|   0.0957|   0.0221|   0.8953|   0.9904| unsec|
| 50000|APP_0000002005|   0.3071|   0.9404|   0.9804|   0.3654|   0.7857| unsec|
| 14000|APP_0000002006|   0.2114|   0.7633|   0.7932|   0.9443|   0.4359| unsec|
| 42000|APP_0000002007|    0.952|   0.2578|   0.4235|   0.8583|   0.0501|credit|
| 37000|APP_0000002008|   0.9496|   0.2498|   0.8821|   0.3216|   0.2862| unsec|
| 15000|APP_0000002009|   0.

# legacy code
I have many legacy functions that take a pandas DataFrame and return a pandas DataFrame.

In [87]:
def legacy_preprocessing(pd_df):
    pd_df['application.id'] = pd_df['application.id'] + "/M"
    pd_df['ab'] = pd_df.feature_a + pd_df.feature_b
    pd_df['cd'] = pd_df.feature_c * pd_df.feature_d
    # remove the 'feature_X' columns
    return pd_df.drop(columns=[col for col in pd_df.columns if col.startswith("feature")])


# How I want things to work
## (pagaya-map_in_pandas)

In [88]:
from map_in_pandas import mappandas
results = mappandas.map_in_pandas(spark, applications, legacy_preprocessing)
results.show()


+------+----------------+------+------------------+--------------------+
|amount|  application.id|  kind|                ab|                  cd|
+------+----------------+------+------------------+--------------------+
| 49000|APP_0000002000/M|credit|            1.1216|           0.0700581|
| 34000|APP_0000002001/M| unsec|            1.8569| 0.22169146000000003|
| 16000|APP_0000002002/M| unsec|            0.4183|          0.03304613|
| 54000|APP_0000002003/M|   car|            0.7649|          0.00717876|
| 62000|APP_0000002004/M| unsec|            0.5982|0.019786130000000002|
| 50000|APP_0000002005/M| unsec|            1.2475|          0.35823816|
| 14000|APP_0000002006/M| unsec|            0.9747|          0.74901876|
| 42000|APP_0000002007/M|credit|            1.2098| 0.36349004999999995|
| 37000|APP_0000002008/M| unsec|            1.1994|          0.28368336|
| 15000|APP_0000002009/M| unsec|0.9490000000000001| 0.07161192000000001|
| 15000|APP_0000002010/M|credit|            0.5108|

                                                                                

# The reality of working with spark.mapInPandas
## schema
Spark `mapInPandas` requires me to declare the output schema up front.

However, I did not write the function, so I don't know which fields it returns.

To find out, I need to run the function on a small chunk of data.

In [89]:
small_results = legacy_preprocessing(applications.limit(100).toPandas())
small_results

| | amount | application.id | kind | ab | cd |
|---|---|---|---|---|---|
| 0 | 49000 | APP_0000002000/M | credit | 1.1216 | 0.070058 |
| 1 | 34000 | APP_0000002001/M | unsec | 1.8569 | 0.221691 |
| 2 | 16000 | APP_0000002002/M | unsec | 0.4183 | 0.033046 |
| 3 | 54000 | APP_0000002003/M | car | 0.7649 | 0.007179 |
| 4 | 62000 | APP_0000002004/M | unsec | 0.5982 | 0.019786 |
| ... | ... | ... | ... | ... | ... |
| 95 | 19000 | APP_0000002095/M | unsec | 1.3200 | 0.184994 |
| 96 | 8000 | APP_0000002096/M | credit | 0.7184 | 0.074676 |
| 97 | 24000 | APP_0000002097/M | car | 1.8845 | 0.284798 |
| 98 | 19000 | APP_0000002098/M | unsec | 1.4615 | 0.168691 |


In [90]:
# Now I know the schema.
# Field names with special characters must be wrapped in backticks (``)
RESULT_SCHEMA = "`application.id` string, kind string, amount int, ab float, cd float"
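
Writing the schema string by hand gets tedious when there are many legacy functions. As a rough sketch, the DDL string could be derived from the dtypes of the small pandas result. The helper name `infer_ddl_schema` and the dtype mapping are assumptions made for illustration, not part of Spark or of this package. The mapping is deliberately coarse: every integer becomes `bigint` and every float becomes `double`, whereas the hand-written schema above narrows them to `int` and `float`.

```python
import pandas as pd
from pandas.api import types as pdt


def spark_type_for(dtype) -> str:
    """Map a pandas dtype to a Spark SQL DDL type name (assumed, coarse mapping)."""
    if pdt.is_bool_dtype(dtype):
        return "boolean"
    if pdt.is_integer_dtype(dtype):
        return "bigint"
    if pdt.is_float_dtype(dtype):
        return "double"
    if pdt.is_object_dtype(dtype) or pdt.is_string_dtype(dtype):
        return "string"
    raise TypeError(f"No Spark type mapped for dtype {dtype}")


def infer_ddl_schema(sample: pd.DataFrame) -> str:
    """Hypothetical helper: build a DDL schema string from a pandas sample."""
    fields = []
    for name, dtype in sample.dtypes.items():
        # Backticks protect names with special characters such as '.'
        fields.append(f"`{name}` {spark_type_for(dtype)}")
    return ", ".join(fields)


# A one-row stand-in for `small_results`
sample = pd.DataFrame({
    "application.id": ["APP_0000002000/M"],
    "kind": ["credit"],
    "amount": [49000],
    "ab": [1.1216],
    "cd": [0.0700581],
})
print(infer_ddl_schema(sample))
```

Object columns are assumed to hold strings here; a column of lists or dicts would need its own mapping.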

# Spark API awkwardness

`mapInPandas` accepts a function that takes an iterator of `pd.DataFrame` batches
and yields `pd.DataFrame` results.

So I need to wrap the legacy function in a for-loop generator.
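
The wrapping step is the same for every legacy function, so it can be written once. The sketch below uses a hypothetical factory, `as_iterator_fn`, and a toy function `add_total`; both names are made up for illustration. It runs on plain pandas, so the iterator contract can be tried without a Spark session.

```python
from typing import Callable, Iterator

import pandas as pd


def as_iterator_fn(
    func: Callable[[pd.DataFrame], pd.DataFrame],
) -> Callable[[Iterator[pd.DataFrame]], Iterator[pd.DataFrame]]:
    """Turn a DataFrame -> DataFrame function into an iterator -> iterator one."""
    def wrapper(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for batch in batches:
            yield func(batch)
    return wrapper


def add_total(pd_df: pd.DataFrame) -> pd.DataFrame:
    """Toy stand-in for a legacy pandas function."""
    pd_df = pd_df.copy()
    pd_df["total"] = pd_df["a"] + pd_df["b"]
    return pd_df


# Simulate the batches Spark would hand to the function
batches = iter([
    pd.DataFrame({"a": [1, 2], "b": [10, 20]}),
    pd.DataFrame({"a": [3], "b": [30]}),
])
out = list(as_iterator_fn(add_total)(batches))
print(pd.concat(out, ignore_index=True))
```

The object returned by `as_iterator_fn` has the shape that `mapInPandas` expects for its function argument.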

## Other weird bugs
There are some bugs with field names that contain '.', even when they are surrounded by backticks (`).

In [91]:
def legacy_wrapper(pd_df_iter):
    # mapInPandas passes an iterator of pd.DataFrame batches and expects an iterator of pd.DataFrame back
    for pd_df in pd_df_iter:
        yield legacy_preprocessing(pd_df)

results = applications.mapInPandas(legacy_wrapper, RESULT_SCHEMA)  # pass the wrapper, not the raw function

AnalysisException: Cannot resolve column name "application.id" among (amount, application.id, feature_a, feature_b, feature_c, feature_d, feature_e, kind); did you mean to quote the `application.id` column?

# Let's work around the bugs
First, fix the data so that no column name contains a special character.
## fix the legacy function
This is the worst part: I need to get into a function I don't know and fix its column-name handling.
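
One possible way to avoid editing the legacy function is to rename columns around it: restore the original names just before the call and switch back to Spark-safe names on the way out. This is only a sketch under that assumption. `with_safe_column_names` is a hypothetical helper, and the shortened `legacy_preprocessing` below is a stand-in for the real one.

```python
import pandas as pd


def with_safe_column_names(func, rename_map):
    """Wrap `func` so it sees original column names while Spark sees safe ones.

    rename_map maps original name -> Spark-safe name,
    e.g. {"application.id": "application_id"}.
    """
    restore_map = {safe: original for original, safe in rename_map.items()}

    def wrapper(pd_df: pd.DataFrame) -> pd.DataFrame:
        # Give the legacy function the column names it was written for
        result = func(pd_df.rename(columns=restore_map))
        # Hand Spark-safe names back to the caller
        return result.rename(columns=rename_map)

    return wrapper


def legacy_preprocessing(pd_df):
    """Shortened stand-in for the untouched legacy function."""
    pd_df["application.id"] = pd_df["application.id"] + "/M"
    pd_df["ab"] = pd_df.feature_a + pd_df.feature_b
    return pd_df.drop(columns=[c for c in pd_df.columns if c.startswith("feature")])


safe_fn = with_safe_column_names(legacy_preprocessing, {"application.id": "application_id"})

# The batch arrives with the Spark-safe column name
batch = pd.DataFrame({
    "application_id": ["APP_1"],
    "feature_a": [0.5],
    "feature_b": [0.25],
})
out = safe_fn(batch)
print(out)
```

The Spark DataFrame still needs its columns renamed with `withColumnRenamed` before the call, but the legacy function itself stays unchanged.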

## And... we made it
We managed to scale the code.

For one function...

In [93]:
applications_FIXED = applications.withColumnRenamed('application.id', 'application_id')  ### Fix the data

def legacy_preprocessing_FIXED(pd_df):
    pd_df['application_id'] = pd_df['application_id'] + "/M"  ### had to fix this line
    pd_df['ab'] = pd_df.feature_a + pd_df.feature_b
    pd_df['cd'] = pd_df.feature_c * pd_df.feature_d
    # remove the 'feature_X' columns
    return pd_df.drop(columns=[col for col in pd_df.columns if col.startswith("feature")])


RESULT_SCHEMA_FIXED = "application_id string, kind string, amount int, ab float, cd float"

def legacy_wrapper_FIXED(pd_df_iter):
    for pd_df in pd_df_iter:
        yield legacy_preprocessing_FIXED(pd_df)

results = applications_FIXED.mapInPandas(legacy_wrapper_FIXED, RESULT_SCHEMA_FIXED)
results.show()



+----------------+------+------+------+----------+
|  application_id|  kind|amount|    ab|        cd|
+----------------+------+------+------+----------+
|APP_0000002000/M|credit| 49000|1.1216| 0.0700581|
|APP_0000002001/M| unsec| 34000|1.8569|0.22169146|
|APP_0000002002/M| unsec| 16000|0.4183|0.03304613|
|APP_0000002003/M|   car| 54000|0.7649|0.00717876|
|APP_0000002004/M| unsec| 62000|0.5982|0.01978613|
|APP_0000002005/M| unsec| 50000|1.2475|0.35823816|
|APP_0000002006/M| unsec| 14000|0.9747| 0.7490188|
|APP_0000002007/M|credit| 42000|1.2098|0.36349005|
|APP_0000002008/M| unsec| 37000|1.1994|0.28368336|
|APP_0000002009/M| unsec| 15000| 0.949|0.07161192|
|APP_0000002010/M|credit| 15000|0.5108|0.27434877|
|APP_0000002011/M|credit| 22000|0.3397|0.12402877|
|APP_0000002012/M| unsec| 59000|0.5595| 0.0820625|
|APP_0000002013/M|   car| 38000|1.0024| 0.1531816|
|APP_0000002014/M| unsec| 52000|1.2825|0.22202048|
|APP_0000002015/M| unsec| 44000|0.7247| 0.2702345|
|APP_0000002016/M| unsec|  6000

                                                                                

# working with spark.applyInPandas

All of the above Spark issues and bugs also occur in applyInPandas.
Our map_in_pandas API wraps applyInPandas as well: just pass a `group_by=[col1, col2]` parameter.

In [94]:
from map_in_pandas import mappandas
def legacy_grouping(pd_df):
    return pd.DataFrame({"kind": [pd_df["kind"].iloc[0]], "count": [len(pd_df)]})

results = mappandas.map_in_pandas(spark, applications, legacy_grouping, group_by=['kind'])
results.show()


+------+-----+
|  kind|count|
+------+-----+
|   car| 3335|
|credit| 3274|
| unsec| 3391|
+------+-----+
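
To check a grouping function before sending it to the cluster, the grouped call can be imitated in plain pandas: split by the group columns, call the function once per group, and concatenate the results. The helper `apply_per_group_locally` is hypothetical and only imitates the semantics on a single machine; it is not how the package or Spark runs the function.

```python
import pandas as pd


def legacy_grouping(pd_df):
    return pd.DataFrame({"kind": [pd_df["kind"].iloc[0]], "count": [len(pd_df)]})


def apply_per_group_locally(pd_df, func, group_by):
    """Hypothetical local imitation: call `func` once per group and stack the results."""
    # Assumes at least one group; pd.concat raises on an empty list
    parts = [func(group) for _, group in pd_df.groupby(group_by, sort=True)]
    return pd.concat(parts, ignore_index=True)


sample = pd.DataFrame({
    "kind": ["car", "unsec", "car", "credit", "unsec", "unsec"],
    "amount": [54000, 34000, 38000, 49000, 16000, 62000],
})
local_counts = apply_per_group_locally(sample, legacy_grouping, group_by=["kind"])
print(local_counts)
```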



                                                                                

# Debugging the Python function
In Spark, the function passed to map_in_pandas (e.g. `legacy_preprocessing`) runs distributed on the executors, where debugging is hard. To see prints, logs or exceptions you need to collect the executor logs after the run, and you cannot set breakpoints.

## `debug_local_row_count=`
To run the function locally on the driver, temporarily add the parameter `debug_local_row_count=1000`, which runs the function on a local sample of that many rows.

In [95]:
def legacy_preprocessing(pd_df):
    pd_df['application.id'] = pd_df['application.id'] + "/M"
    pd_df['ab'] = pd_df.feature_a + pd_df.feature_b
    pd_df['cd'] = pd_df.feature_c * pd_df.feature_d
    if pd_df['ab'].sum() > 0.8 * len(pd_df):
        print("DEBUG - too large ab features")
    # remove the 'feature_X' columns
    return pd_df.drop(columns=[col for col in pd_df.columns if col.startswith("feature")])

results = mappandas.map_in_pandas(spark, applications, legacy_preprocessing, debug_local_row_count=1000)
results.count()

DEBUG - too large ab features


1000
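
The idea of a local debug run can be sketched in a few lines: pull the first rows to the driver as a pandas DataFrame and call the function directly, so that prints appear immediately and breakpoints work. This is a guess at the general approach, not the package's implementation. `run_on_local_sample` is a hypothetical helper, and `FakeSparkDataFrame` is a stub that exists only so the sketch runs without a cluster. Unlike `map_in_pandas`, the helper returns a pandas DataFrame.

```python
import pandas as pd


def run_on_local_sample(spark_df, func, row_count):
    """Hypothetical helper: run `func` on the driver over the first `row_count` rows."""
    # limit() and toPandas() are regular PySpark DataFrame methods
    sample = spark_df.limit(row_count).toPandas()
    return func(sample)


class FakeSparkDataFrame:
    """Stub exposing only limit() and toPandas(), to try the helper without Spark."""

    def __init__(self, pd_df):
        self._pd_df = pd_df

    def limit(self, n):
        return FakeSparkDataFrame(self._pd_df.head(n))

    def toPandas(self):
        return self._pd_df.copy()


def flag_large(pd_df):
    """Toy stand-in for a legacy function with a debug print."""
    pd_df = pd_df.copy()
    pd_df["large"] = pd_df["amount"] > 20000
    print(f"DEBUG - processed {len(pd_df)} rows locally")
    return pd_df


fake = FakeSparkDataFrame(pd.DataFrame({"amount": [49000, 34000, 16000, 54000, 62000]}))
local_result = run_on_local_sample(fake, flag_large, row_count=3)
```

With a real Spark DataFrame the same helper would be called as `run_on_local_sample(applications, legacy_preprocessing, 1000)`.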

# (Creating test data)
(Run only once)

In [66]:
MAX_MONTHS = 36
N_APPLICATIONS = 10_000
applications1 = spark.range(N_APPLICATIONS, numPartitions=10).selectExpr(
    "printf('APP_%010d', INT(id)) AS `application.id`",
    "INT(rand() * 100) * 1000 as amount", 
    "array('car', 'unsec', 'credit')[int(rand()*3)] as kind",
    *[f"round(rand(), 4) AS feature_{x}" for x in "abcde"])
applications1.write.mode("overwrite").format("json").save(DATA_PATH)

In [67]:
!ls -l {DATA_PATH}
!head {DATA_PATH}{os.listdir(DATA_PATH)[1]}

total 3200
-rw-r--r--  1 tal.franji  wheel       0 Jun 19 11:05 _SUCCESS
-rw-r--r--  1 tal.franji  wheel  159907 Jun 19 11:05 part-00000-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  159896 Jun 19 11:05 part-00001-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  160036 Jun 19 11:05 part-00002-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  159975 Jun 19 11:05 part-00003-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  159939 Jun 19 11:05 part-00004-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  160002 Jun 19 11:05 part-00005-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  159976 Jun 19 11:05 part-00006-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  159949 Jun 19 11:05 part-00007-0b485855-3f44-4d3a-8228-beae45877f32-c000.json
-rw-r--r--  1 tal.franji  wheel  159952