# pagaya-map_in_pandas

Easy Python wrapper for Spark `mapInPandas` and `applyInPandas`

[This notebook on GitHub](https://github.com/pagaya/conf-talks/blob/master/map_in_pandas/map_in_pandas.ipynb)

## Goal
Easily run legacy pandas-based Python functions at scale on large amounts of data.

## Applications
* Running legacy transformations, feature extraction, etc.
* Large-scale model evaluation
* Large-scale experiments and parameter tuning
* A/B testing
* Concurrent training (e.g. XGBoost)

In [1]:
import os
import pandas as pd

DATA_PATH = "/tmp/mappandas/" 

# Demo - map_in_pandas
## Load the data

In [34]:
applications = spark.read.format("json").load(DATA_PATH)
applications.show()

+------+--------------+---------+---------+---------+---------+---------+
|amount|application.id|feature_a|feature_b|feature_c|feature_d|feature_e|
+------+--------------+---------+---------+---------+---------+---------+
|  1000|APP_0000003000|   0.5056|   0.5332|   0.8988|   0.0167|   0.1865|
| 77000|APP_0000003001|   0.2841|   0.3322|    0.954|   0.2985|   0.4344|
| 23000|APP_0000003002|   0.9934|   0.1634|   0.2316|   0.5968|   0.2253|
| 90000|APP_0000003003|   0.5882|   0.8698|   0.7088|   0.2831|   0.7278|
| 15000|APP_0000003004|   0.0545|   0.3589|   0.1225|   0.8014|   0.2392|
| 96000|APP_0000003005|   0.8458|   0.7208|   0.5997|   0.3011|   0.9883|
| 79000|APP_0000003006|   0.2966|   0.8867|   0.9708|   0.4768|   0.0424|
| 76000|APP_0000003007|   0.0448|   0.1714|   0.8282|   0.1251|    0.041|
| 39000|APP_0000003008|   0.9661|   0.7887|   0.5307|   0.0577|   0.2891|
| 17000|APP_0000003009|   0.6397|   0.8368|   0.4307|   0.8176|    0.685|
| 96000|APP_0000003010|   0.0657|   0.

# Legacy code
I have many legacy functions that take a pandas DataFrame and return a pandas DataFrame (pandas -> pandas).

In [35]:
def legacy_preprocessing(pd_df):
    pd_df['application.id'] = pd_df['application.id'] + "/M"
    pd_df['ab'] = pd_df.feature_a + pd_df.feature_b
    pd_df['cd'] = pd_df.feature_c * pd_df.feature_d
    # remove the 'feature_X' columns
    return pd_df.drop(columns=[col for col in pd_df.columns if col.startswith("feature")])
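
Because the function is plain pandas -> pandas, its behaviour can be checked locally without Spark. A small sketch: the two rows are copied from the `applications` output above, and the function body is repeated only so the block runs on its own.

```python
import pandas as pd


def legacy_preprocessing(pd_df):
    # copy of the legacy function above, unchanged
    pd_df['application.id'] = pd_df['application.id'] + "/M"
    pd_df['ab'] = pd_df.feature_a + pd_df.feature_b
    pd_df['cd'] = pd_df.feature_c * pd_df.feature_d
    return pd_df.drop(columns=[col for col in pd_df.columns if col.startswith("feature")])


# two rows copied from the applications table above
sample = pd.DataFrame({
    "amount": [1000, 77000],
    "application.id": ["APP_0000003000", "APP_0000003001"],
    "feature_a": [0.5056, 0.2841],
    "feature_b": [0.5332, 0.3322],
    "feature_c": [0.8988, 0.954],
    "feature_d": [0.0167, 0.2985],
    "feature_e": [0.1865, 0.4344],
})

# pass a copy: the legacy function mutates its input in place
out = legacy_preprocessing(sample.copy())
print(out.columns.tolist())  # ['amount', 'application.id', 'ab', 'cd']
```

Note that a '.' in a column name is harmless on the pandas side; the trouble only starts once Spark is involved.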


# How I want things to work
## (pagaya-map_in_pandas)

In [36]:
from map_in_pandas import mappandas
results = mappandas.map_in_pandas(spark, applications, legacy_preprocessing)
results.show()


+------+----------------+-------------------+--------------------+
|amount|  application.id|                 ab|                  cd|
+------+----------------+-------------------+--------------------+
|  1000|APP_0000003000/M| 1.0388000000000002|0.015009960000000001|
| 77000|APP_0000003001/M| 0.6163000000000001|            0.284769|
| 23000|APP_0000003002/M|             1.1568|          0.13821888|
| 90000|APP_0000003003/M|              1.458|          0.20066128|
| 15000|APP_0000003004/M|             0.4134|           0.0981715|
| 96000|APP_0000003005/M|             1.5666|          0.18056967|
| 79000|APP_0000003006/M|             1.1833|          0.46287744|
| 76000|APP_0000003007/M|             0.2162|          0.10360782|
| 39000|APP_0000003008/M|             1.7548|          0.03062139|
| 17000|APP_0000003009/M| 1.4765000000000001|          0.35214032|
| 96000|APP_0000003010/M|             0.7312| 0.49603467999999995|
| 70000|APP_0000003011/M| 1.5011999999999999|          0.27953

                                                                                

# The reality of working with spark.mapInPandas
## Schema
Spark's `mapInPandas` requires me to define the output schema.

However, I did not write the function, so I don't know which fields it returns.

I need to run the function on a small chunk of data to find out.

In [37]:
small_results = legacy_preprocessing(applications.limit(100).toPandas())
small_results

|     | amount | application.id   | ab     | cd       |
|-----|--------|------------------|--------|----------|
| 0   | 1000   | APP_0000003000/M | 1.0388 | 0.015010 |
| 1   | 77000  | APP_0000003001/M | 0.6163 | 0.284769 |
| 2   | 23000  | APP_0000003002/M | 1.1568 | 0.138219 |
| 3   | 90000  | APP_0000003003/M | 1.4580 | 0.200661 |
| 4   | 15000  | APP_0000003004/M | 0.4134 | 0.098171 |
| ... | ...    | ...              | ...    | ...      |
| 95  | 28000  | APP_0000003095/M | 1.1364 | 0.473482 |
| 96  | 43000  | APP_0000003096/M | 0.6847 | 0.138079 |
| 97  | 99000  | APP_0000003097/M | 1.0983 | 0.299509 |
| 98  | 28000  | APP_0000003098/M | 0.3022 | 0.159553 |


In [None]:
# Now I know the schema.
# Field names with special characters (such as '.') must be wrapped in backticks
RESULT_SCHEMA = "`application.id` string, amount int, ab float, cd float"
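
Instead of reading the schema off the sample by eye, it can be derived from the sample's dtypes. A minimal sketch, assuming only common pandas dtypes appear and that object columns hold strings; `ddl_schema_from_pandas` and its type table are hypothetical, not part of pandas, Spark or map_in_pandas. It maps `int64`/`float64` to `bigint`/`double`, which is wider than the hand-written `int`/`float` above.

```python
import pandas as pd

# hypothetical lookup table covering only common numpy-backed dtypes
_DTYPE_TO_DDL = {
    "int64": "bigint",
    "int32": "int",
    "float64": "double",
    "float32": "float",
    "bool": "boolean",
    "datetime64[ns]": "timestamp",
}


def _ddl_type(dtype) -> str:
    name = str(dtype)
    if name in _DTYPE_TO_DDL:
        return _DTYPE_TO_DDL[name]
    # object / "string" / "str" dtypes; assumes object columns really hold strings
    if pd.api.types.is_string_dtype(dtype):
        return "string"
    raise TypeError(f"no DDL type known for dtype {name}")


def ddl_schema_from_pandas(pdf: pd.DataFrame) -> str:
    """Build a Spark DDL schema string from a sample result's columns and dtypes."""
    fields = []
    for col, dtype in pdf.dtypes.items():
        # backticks let names with '.' or other special characters through
        quoted = "`" + str(col).replace("`", "``") + "`"
        fields.append(f"{quoted} {_ddl_type(dtype)}")
    return ", ".join(fields)


small = pd.DataFrame({
    "amount": [1000],
    "application.id": ["APP_0000003000/M"],
    "ab": [1.0388],
    "cd": [0.015010],
})
print(ddl_schema_from_pandas(small))
# `amount` bigint, `application.id` string, `ab` double, `cd` double
```

In the notebook this would be called as `RESULT_SCHEMA = ddl_schema_from_pandas(small_results)`.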

# Spark API awkwardness

`mapInPandas` expects a function that takes an iterator of `pd.DataFrame`s and yields `pd.DataFrame`s, rather than a plain pandas -> pandas function.

So I need to wrap the legacy function in a generator with a for-loop.
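
The same for-loop generator works for any pandas -> pandas function, so it only needs to be written once. A minimal sketch; `iter_wrapper` is a hypothetical name, and a list of small DataFrames stands in for the batch iterator Spark would pass.

```python
from typing import Callable, Iterator

import pandas as pd

PandasFn = Callable[[pd.DataFrame], pd.DataFrame]
BatchFn = Callable[[Iterator[pd.DataFrame]], Iterator[pd.DataFrame]]


def iter_wrapper(func: PandasFn) -> BatchFn:
    """Adapt a pandas -> pandas function to the iterator -> iterator shape of mapInPandas."""
    def wrapped(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for batch in batches:
            yield func(batch)
    return wrapped


def double_amount(pdf: pd.DataFrame) -> pd.DataFrame:
    # toy pandas -> pandas function standing in for legacy code
    return pdf.assign(amount=pdf.amount * 2)


# local check without Spark: a list of small frames stands in for Spark's batches
batches = [pd.DataFrame({"amount": [1, 2]}), pd.DataFrame({"amount": [3]})]
out = list(iter_wrapper(double_amount)(iter(batches)))
print([b.amount.tolist() for b in out])  # [[2, 4], [6]]
```

With Spark this would be used as `applications.mapInPandas(iter_wrapper(legacy_preprocessing), RESULT_SCHEMA)`; the column-name problem described below still applies.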

## Other weird bugs
Fields whose names contain '.' trigger bugs, even when the names are quoted with backticks (`).

In [38]:
def legacy_wrapper(pd_df_iter):
    # mapInPandas passes an iterator of pd.DataFrames and expects an iterator of pd.DataFrames back
    for pd_df in pd_df_iter:
        yield legacy_preprocessing(pd_df)

# fails with the AnalysisException below
results = applications.mapInPandas(legacy_wrapper, RESULT_SCHEMA)

AnalysisException: Cannot resolve column name "application.id" among (amount, application.id, feature_a, feature_b, feature_c, feature_d, feature_e); did you mean to quote the `application.id` column?

# Let's work around the bugs
Fix the data so that column names contain no special characters.
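
Renaming one column by hand works here, but the same fix can be applied to every column at once. A sketch, assuming that replacing any character other than letters, digits and '_' with '_' is acceptable; `sanitize_names` is a hypothetical helper. It also returns the reverse mapping so the original names can be restored later.

```python
import re
from typing import Dict, List, Tuple


def sanitize_names(columns: List[str]) -> Tuple[List[str], Dict[str, str]]:
    """Replace every character other than letters, digits and '_' with '_'.

    Returns the new names plus a new -> original mapping for restoring them later.
    """
    new_names = [re.sub(r"[^0-9A-Za-z_]", "_", c) for c in columns]
    if len(set(new_names)) != len(new_names):
        # e.g. "a.b" and "a_b" would both become "a_b"
        raise ValueError(f"sanitized column names collide: {new_names}")
    return new_names, dict(zip(new_names, columns))


new_names, restore_map = sanitize_names(["amount", "application.id", "feature_a"])
print(new_names)  # ['amount', 'application_id', 'feature_a']
```

On the Spark side, the new names could be applied positionally with `DataFrame.toDF`, e.g. `applications.toDF(*sanitize_names(applications.columns)[0])`.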
## Fix the legacy function
This is the worst part: I need to dig into a function I don't know and fix its column-name handling.
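
One possible way to avoid editing the function, sketched under the assumption that it only needs to see its original column names: inside each pandas batch, where a '.' in a name is harmless, rename the columns back to the originals, call the untouched function, and sanitize the names of its result. `with_original_names` and `legacy_suffix` are hypothetical; this is not necessarily how pagaya's map_in_pandas handles it.

```python
import re
from typing import Callable, Dict

import pandas as pd

PandasFn = Callable[[pd.DataFrame], pd.DataFrame]


def with_original_names(func: PandasFn, restore_map: Dict[str, str]) -> PandasFn:
    """Hypothetical adapter: show `func` the original column names, then
    sanitize the names of whatever it returns."""
    def adapted(pdf: pd.DataFrame) -> pd.DataFrame:
        # rename() returns a new frame, e.g. application_id -> application.id
        result = func(pdf.rename(columns=restore_map))
        # replace characters other than letters, digits and '_' in the output names
        return result.rename(columns=lambda c: re.sub(r"[^0-9A-Za-z_]", "_", c))
    return adapted


def legacy_suffix(pd_df):
    # stands in for unmodified legacy code that relies on the dotted name
    pd_df["application.id"] = pd_df["application.id"] + "/M"
    return pd_df


batch = pd.DataFrame({"application_id": ["APP_0000003000"], "amount": [1000]})
fixed = with_original_names(legacy_suffix, {"application_id": "application.id"})
out = fixed(batch)
print(out.columns.tolist())           # ['application_id', 'amount']
print(out["application_id"].iloc[0])  # APP_0000003000/M
```

The adapted function is still pandas -> pandas, so it needs a generator wrapper like `legacy_wrapper` before being handed to `mapInPandas`, together with a schema that uses the sanitized names.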

## And... we made it
We managed to scale the code.

For one function...

In [41]:
applications_FIXED = applications.withColumnRenamed('application.id', 'application_id')  ### Fix the data

def legacy_preprocessing_FIXED(pd_df):
    pd_df['application_id'] = pd_df['application_id'] + "/M"  ### had to fix this line
    pd_df['ab'] = pd_df.feature_a + pd_df.feature_b
    pd_df['cd'] = pd_df.feature_c * pd_df.feature_d
    # remove the 'feature_X' columns
    return pd_df.drop(columns=[col for col in pd_df.columns if col.startswith("feature")])


RESULT_SCHEMA_FIXED = "application_id string, amount int, ab float, cd float"

def legacy_wrapper_FIXED(pd_df_iter):
    for pd_df in pd_df_iter:
        yield legacy_preprocessing_FIXED(pd_df)

results = applications_FIXED.mapInPandas(legacy_wrapper_FIXED, RESULT_SCHEMA_FIXED)
results.show()



+----------------+------+------+----------+
|  application_id|amount|    ab|        cd|
+----------------+------+------+----------+
|APP_0000003000/M|  1000|1.0388|0.01500996|
|APP_0000003001/M| 77000|0.6163|  0.284769|
|APP_0000003002/M| 23000|1.1568|0.13821888|
|APP_0000003003/M| 90000| 1.458|0.20066129|
|APP_0000003004/M| 15000|0.4134| 0.0981715|
|APP_0000003005/M| 96000|1.5666|0.18056966|
|APP_0000003006/M| 79000|1.1833|0.46287745|
|APP_0000003007/M| 76000|0.2162|0.10360782|
|APP_0000003008/M| 39000|1.7548|0.03062139|
|APP_0000003009/M| 17000|1.4765| 0.3521403|
|APP_0000003010/M| 96000|0.7312|0.49603468|
|APP_0000003011/M| 70000|1.5012|0.27953613|
|APP_0000003012/M| 56000|0.8664|0.04679532|
|APP_0000003013/M|  5000|0.7576|0.05587302|
|APP_0000003014/M| 74000|1.3384|0.08994418|
|APP_0000003015/M|  1000|0.6803|0.67947406|
|APP_0000003016/M| 19000|0.3561|0.04130205|
|APP_0000003017/M| 37000|1.0392|0.18053356|
|APP_0000003018/M| 86000|0.3175|0.08405565|
|APP_0000003019/M| 16000|0.5896|

                                                                                

# (Creating test data)
(Run only once)

In [22]:
MAX_MONTHS = 36
N_APPLICATIONS = 10_000
applications1 = spark.range(N_APPLICATIONS, numPartitions=10).selectExpr(
    "printf('APP_%010d', INT(id)) AS `application.id`",
    "INT(rand() * 100) * 1000 as amount",
    *[f"round(rand(), 4) AS feature_{x}" for x in "abcde"])
applications1.write.mode("overwrite").format("json").save(DATA_PATH)

In [31]:
!ls -l {DATA_PATH}
!head {DATA_PATH}{os.listdir(DATA_PATH)[0]}

total 2880
-rw-r--r--  1 tal.franji  wheel       0 Jun 16 18:38 _SUCCESS
-rw-r--r--  1 tal.franji  wheel  145298 Jun 16 18:38 part-00000-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145358 Jun 16 18:38 part-00001-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145320 Jun 16 18:38 part-00002-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145360 Jun 16 18:38 part-00003-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145327 Jun 16 18:38 part-00004-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145293 Jun 16 18:38 part-00005-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145350 Jun 16 18:38 part-00006-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145250 Jun 16 18:38 part-00007-44099317-5c47-4c15-9ea2-2180ec88a00d-c000.json
-rw-r--r--  1 tal.franji  wheel  145272