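# Comparison between pandas, Dask, and Koalas

The same gators pipeline is fitted on the Titanic dataset with each backend. The preprocessed features are then checked to be identical across backends, and the pandas and Dask predictions are checked to be numerically close.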

In [1]:
# Reload edited modules (e.g. a local gators checkout) automatically before each cell runs.
%load_ext autoreload
%autoreload 2

In [2]:
import copy
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal
from pandas.testing import assert_series_equal
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
import treelite
import treelite_runtime
import dill
import joblib
from sklearn.metrics import make_scorer
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBClassifier
from pyspark.ml.classification import RandomForestClassifier as RFCSpark

**Gators** imports

In [3]:
# data cleaning
from gators.data_cleaning import (
    DropColumns,
    Replace,
)
# imputers
from gators.imputers import (
    NumericsImputer, 
    ObjectImputer,
)
# encoders
from gators.encoders import (
    WOEEncoder,
)
# binning
from gators.binning import (
    BinRareCategories,
    BinSingleTargetClassCategories,
    Binning,
    CustomBinning,
    QuantileBinning,
    TreeBinning,
)
# feature generation
from gators.feature_generation import (
    PolynomialFeatures,
    ElementaryArithmetics,
    PolynomialObjectFeatures,
    IsNull,
)
from gators.feature_generation_str import (
    StringContains,
    StringLength,
    Extract,
    SplitExtract,
)
# feature selection
from gators.feature_selection import (
    SelectFromModel,
    InformationValue
)
# model building
from gators.model_building import (
    Model,
    TrainTestSplit,
    XGBBoosterBuilder,
    XGBTreeliteDumper,
)
# pipeline
from gators.pipeline import Pipeline

## Pipeline definition

In [4]:
# Feature-engineering steps shared by every backend, followed by the model.
# The list is assembled section by section; `steps` ends up holding the same
# (name, transformer) pairs, constructed in the same order, as one literal would.
steps = (
    # string parsing of 'Name', 'Cabin' and 'Ticket', then drop the raw columns
    [
        ('SplitExtractName', SplitExtract(['Name'], [', '], [1], ['Dummy'])),
        ('SplitExtractTitle', SplitExtract(['Dummy'], ['.'], [0], ['Title'])),
        ('StringLength', StringLength(columns=['Cabin', 'Ticket'])),
        ('DropColumns', DropColumns(['Name', 'Dummy', 'Cabin', 'Ticket'])),
    ]
    # imputation and binning of single-target-class categories
    + [
        ('ObjectImputer', ObjectImputer(strategy='constant', value='MISSING')),
        ('BinSingleTargetClassCategories', BinSingleTargetClassCategories()),
        ('NumericsImputer', NumericsImputer(strategy='mean')),
    ]
    # derived 'FamilySize', in-place tree binning, degree-2 object features
    + [
        ('ElementaryArithmetics', ElementaryArithmetics(
            operator='+',
            columns_a=['SibSp'],
            columns_b=['Parch'],
            column_names=['FamilySize'])),
        ('TreeBinning', TreeBinning(
            tree=DecisionTreeClassifier(max_depth=2, min_samples_leaf=25),
            inplace=True)),
        ('PolynomialObjectFeatures', PolynomialObjectFeatures(
            columns=['Pclass', 'Sex', 'Age', 'Fare', 'Embarked', 'Title', 'FamilySize'],
            degree=2)),
    ]
    # final category cleanup and WOE encoding, then the XGBoost estimator
    + [
        ('CleanCatego', BinRareCategories(min_ratio=0.)),
        ('Encoder', WOEEncoder()),
        ('Model', Model(
            model=XGBClassifier(random_state=0, eval_metric='logloss', use_label_encoder=False))),
    ]
)

pipe = Pipeline(steps=steps, verbose=False)

## Pandas pipeline

In [5]:
# Load the Titanic data with pandas, separate target and features, and split
# with TrainTestSplit (test_ratio=0.3, 'ordered' strategy).
data = pd.read_parquet('data/titanic.parquet').reset_index(drop=True)
y, X = data['Survived'], data.drop('Survived', axis=1)
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train, X_test, y_train, y_test = train_test_split.transform(X, y)

In [6]:
# Fit the full pipeline (preprocessing steps + 'Model') on the pandas training split.
_ = pipe.fit(X_train, y_train)

In [7]:
# split prod pipeline and model
model_pd = pipe[-1].model
prod_pipe_pd = copy.deepcopy(pipe)
_ = prod_pipe_pd.steps.pop(-1)

In [8]:
# Apply the fitted preprocessing to both splits, then score the test split.
X_train_prepro_pd = prod_pipe_pd.transform(X_train)
X_test_prepro_pd = prod_pipe_pd.transform(X_test)
# Column 1 of predict_proba = probability of the second class
# (presumably Survived == 1 for a 0/1 target — confirm).
y_test_pred_proba_pd = model_pd.predict_proba(X_test_prepro_pd)[:, 1]

## Dask pipeline

In [9]:
import dask.dataframe as dd
import dask.distributed
# Start a local Dask cluster with default settings; the bare `client` on the
# last line displays its summary (dashboard link, workers, threads, memory).
client = dask.distributed.Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:52055,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:52076,Total threads: 4
Dashboard: http://127.0.0.1:52077/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:52058,
Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-9u3ctyh7,Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-9u3ctyh7

0,1
Comm: tcp://127.0.0.1:52079,Total threads: 4
Dashboard: http://127.0.0.1:52080/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:52059,
Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-f7nc6qch,Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-f7nc6qch

0,1
Comm: tcp://127.0.0.1:52082,Total threads: 4
Dashboard: http://127.0.0.1:52084/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:52060,
Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-g_pdk5u1,Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-g_pdk5u1

0,1
Comm: tcp://127.0.0.1:52083,Total threads: 4
Dashboard: http://127.0.0.1:52085/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:52061,
Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-b8psevqt,Local directory: /Users/cpoli/opensource/gators/examples/dask-worker-space/worker-b8psevqt


In [10]:
# Load the same data as a Dask DataFrame and split it with the same settings
# as the pandas run, so the results can be compared.
# NOTE(review): Dask's reset_index restarts at 0 in every partition; this only
# matches pandas' index when the file is read as a single partition — confirm.
data_dd = dd.read_parquet('data/titanic.parquet')
data_dd = data_dd.reset_index(drop=True)
y_dd = data_dd['Survived']
X_dd = data_dd.drop('Survived', axis=1)

train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train_dd, X_test_dd, y_train_dd, y_test_dd = train_test_split.transform(X_dd, y_dd)
# Persist the four splits in one call. They share the read/reset_index/split
# tasks, and a single persist submits them as one graph, so those shared tasks
# are submitted once. The collections come back in the same order.
X_train_dd, X_test_dd, y_train_dd, y_test_dd = client.persist(
    [X_train_dd, X_test_dd, y_train_dd, y_test_dd])

In [11]:
# Refit the same `pipe` object, this time on the persisted Dask training split.
_ = pipe.fit(X_train_dd, y_train_dd)

In [12]:
# split prod pipeline and model
model_dd = pipe[-1].model
prod_pipe_dd = copy.deepcopy(pipe)
_ = prod_pipe_dd.steps.pop(-1)

In [13]:
# Apply the fitted preprocessing to both Dask splits, then score the test split.
X_train_prepro_dd = prod_pipe_dd.transform(X_train_dd)
X_test_prepro_dd = prod_pipe_dd.transform(X_test_dd)
# NOTE(review): the sklearn-API XGBClassifier receives a Dask DataFrame here;
# presumably it is materialised to a NumPy array before prediction — confirm.
y_test_pred_proba_dd = model_dd.predict_proba(X_test_prepro_dd)[:, 1]

## Koalas pipeline

In [14]:
from pyspark import SparkConf, SparkContext

# Spark configuration for the local Koalas run.
conf = SparkConf()
conf.set('spark.executor.memory', '2g')
conf.set('spark.sql.codegen.wholeStage', 'false')
conf.set('spark.sql.autoBroadcastJoinThreshold', '-1')
# getOrCreate instead of SparkContext(conf=conf): only one SparkContext may be
# active per process, so constructing a new one makes this cell fail when
# re-run. If a context already exists it is returned unchanged and `conf` is
# not applied.
SparkContext.getOrCreate(conf=conf)
import databricks.koalas as ks
# Default index for Koalas frames: a sequential index (like pandas' RangeIndex)
# computed in a distributed manner.
ks.set_option('compute.default_index_type', 'distributed-sequence')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/02 06:12:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/02 06:12:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/02/02 06:12:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [17]:
!pip freeze | grep dill
!pip install dill==0.3.1.1

dill==0.3.4
Collecting dill==0.3.1.1
  Downloading dill-0.3.1.1.tar.gz (151 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 152.0/152.0 KB 1.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: dill
  Building wheel for dill (setup.py) ... done
  Created wheel for dill: filename=dill-0.3.1.1-py3-none-any.whl size=78531 sha256=2b8b47e9691fa948989ba2c8ccef5fe89fd2ff313ac0e9da7cab5eb6f4e64ea8
  Stored in directory: /Users/cpoli/Library/Caches/pip/wheels/a4/61/fd/c57e374e580aa78a45ed78d5859b3a44436af17e22ca53284f
Successfully built dill
Installing collected packages: dill
  Attempting uninstall: dill
    Found existing installation: dill 0.3.4
    Uninstalling dill-0.3.4:
      Successfully uninstalled dill-0.3.4
Successfully installed dill-0.3.1.1


In [18]:
# Load the same data as a Koalas DataFrame and split it with the same
# settings as the pandas and Dask runs.
# NOTE(review): the recorded run of this cell failed with
# "PicklingError: Could not serialize object: ValueError: Cell is empty".
data_ks = ks.read_parquet('data/titanic.parquet').reset_index(drop=True)
y_ks, X_ks = data_ks['Survived'], data_ks.drop('Survived', axis=1)
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train_ks, X_test_ks, y_train_ks, y_test_ks = train_test_split.transform(X_ks, y_ks)

Traceback (most recent call last):
  File "/Users/cpoli/gators37/lib/python3.7/site-packages/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/Users/cpoli/gators37/lib/python3.7/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 102, in dumps
    cp.dump(obj)
  File "/Users/cpoli/gators37/lib/python3.7/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 789, in save_tuple
    save(element)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504, in save
    f(self, ob

PicklingError: Could not serialize object: ValueError: Cell is empty

In [None]:
from pyspark.ml.classification import RandomForestClassifier as RFCSpark

# Same preprocessing as the other backends, with a Spark ML random forest in
# place of XGBoost. The preprocessing steps are deep-copied: `steps[:-1]`
# holds the very transformer objects inside `pipe`, and without a copy fitting
# `pipe_ks` would silently refit `pipe`'s transformers on Koalas data.
steps_ks = copy.deepcopy(steps[:-1]) + [
    ('Model', Model(
        model=RFCSpark(numTrees=15, maxDepth=3, labelCol='Survived', seed=0)
    ))
]

pipe_ks = Pipeline(steps=steps_ks, verbose=False)

In [None]:
# Fit the Koalas pipeline (shared preprocessing + Spark random forest).
_ = pipe_ks.fit(X_train_ks, y_train_ks)

In [None]:
# split prod pipeline and model
# note that koalas pipelines cannot be pickled
model_ks = pipe_ks[-1]
prod_pipe_ks = pipe_ks
_ = pipe_ks.steps.pop(-1)

In [None]:
# Apply the fitted preprocessing (the pipeline without its 'Model' step) to both Koalas splits.
X_train_prepro_ks = prod_pipe_ks.transform(X_train_ks)
X_test_prepro_ks = prod_pipe_ks.transform(X_test_ks)

In [None]:
# Score the Koalas test split through the gators Model step wrapping the Spark
# estimator. Unlike the pandas/Dask cells, no `[:, 1]` column is selected.
# NOTE(review): the type/shape of the returned probabilities is not shown here — confirm.
y_test_pred_proba_ks = model_ks.predict_proba(X_test_prepro_ks)

## Check that the results match

#### Check that the pandas, Dask, and Koalas results match: production pipeline

In [None]:
# Compare each backend's preprocessed features against pandas. Every check
# carries a label (passed as `obj=`) so an assertion failure names the split
# and backend that diverged. The other backend is collected through a
# zero-argument callable inside the loop, so (as before) a later backend is
# only materialised once the previous comparison has passed. The Koalas test
# frame is compared against pandas with its index reset, as in the original
# checks.
checks = [
    ('train: pandas vs dask', X_train_prepro_pd, X_train_prepro_dd.compute),
    ('train: pandas vs koalas', X_train_prepro_pd, X_train_prepro_ks.to_pandas),
    ('test: pandas vs dask', X_test_prepro_pd, X_test_prepro_dd.compute),
    ('test: pandas vs koalas', X_test_prepro_pd.reset_index(drop=True),
     X_test_prepro_ks.to_pandas),
]
for label, expected, collect in checks:
    assert_frame_equal(expected, collect(), obj=label)

#### Check that the pandas and Dask predictions match (the Koalas run uses a Spark random forest instead of XGBoost, so its predictions are not compared)

In [None]:
# The pandas and Dask runs fit the same pipeline, so their test-set
# probabilities must agree. assert_allclose reports the mismatching elements
# on failure, whereas a bare `assert np.allclose(...)` gives no detail.
# rtol/atol/equal_nan are np.allclose's defaults, and the argument order is
# kept, so the tolerance check is unchanged.
np.testing.assert_allclose(
    y_test_pred_proba_pd, y_test_pred_proba_dd,
    rtol=1e-05, atol=1e-08, equal_nan=False)