### Set things up and generate some data

In [1]:
import sys

import numpy as np
import sklearn
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import pandas as pd
import pyarrow
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, ArrayType

# Report the interpreter and library versions used for this run.
# Background on the pandas / pyarrow entries:
#   * pyspark versions after 2.4.4 should better support pandas and pyarrow
#     versions (https://github.com/apache/spark/pull/24867)
#   * presumably the pyarrow requirement declared by pyspark 2.4.3 (verify):
#     https://github.com/apache/spark/blob/v2.4.3/python/setup.py#L106
print(
    f"python: {sys.version.split()[0]}",
    f"pyspark: {pyspark.__version__}",
    f"scikit-learn: {sklearn.__version__}",
    f"pandas: {pd.__version__}",
    f"pyarrow: {pyarrow.__version__}",
    sep="\n",
)


python: 3.6.9
pyspark: 2.4.3
scikit-learn: 0.21.3
pandas: 0.21.1
pyarrow: 0.8.0


In [2]:
# Make some fake data: a small labeled training set plus a much larger set
# that stands in for the unlabeled data to be scored with Spark.
n_samples_train = 1000
n_samples_test = 100000
n_samples_all = n_samples_test + n_samples_train
n_features = 50

X, y = make_classification(
    n_samples=n_samples_all,
    n_features=n_features,
    random_state=123,
)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=n_samples_test,
    random_state=45,
)

# Write the test features out as parquet (via pandas) so they can be loaded
# from disk later on.  In real usage, the data might be on S3, Azure Blob
# Storage, HDFS, etc.
column_names = [f'feature{i}' for i in range(n_features)]
(
    pd.DataFrame(X_test, columns=column_names)
    # Turn the positional index into a regular column ...
    .reset_index()
    # ... and call that column 'id' so each row has an identifier.
    .rename(columns={'index': 'id'})
    .to_parquet('unlabeled_data')
)

### Train a model with scikit-learn

In [3]:
# Grid-search the tree depth of a 100-tree random forest, picking the best
# setting by 3-fold cross-validated ROC AUC on the training data.
param_grid = dict(n_estimators=[100], max_depth=[2, 4, None])
gs_rf = GridSearchCV(
    estimator=RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    scoring='roc_auc',
    cv=3,
)
gs_rf.fit(X_train, y_train)
print('ROC AUC: %.3f' % gs_rf.best_score_)

ROC AUC: 0.959


### Set up a spark environment

In [4]:
# Reuse the running SparkContext if there is one.  Constructing
# pyspark.SparkContext(...) directly raises "Cannot run multiple SparkContexts
# at once" when this cell is executed a second time in the same kernel;
# getOrCreate makes the cell safe to re-run.  The app name only takes effect
# when a new context is actually created.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("foo"))
sqlContext = pyspark.SQLContext(sc)

### Now load the data and make predictions.

In real usage, we might be doing a bunch of ETL after reading raw data, but here, we'll just load it up.

In [5]:
# Read the parquet data written earlier back in as a Spark DataFrame.
df_unlabeled = sqlContext.read.format('parquet').load('unlabeled_data')
df_unlabeled

DataFrame[id: bigint, feature0: double, feature1: double, feature2: double, feature3: double, feature4: double, feature5: double, feature6: double, feature7: double, feature8: double, feature9: double, feature10: double, feature11: double, feature12: double, feature13: double, feature14: double, feature15: double, feature16: double, feature17: double, feature18: double, feature19: double, feature20: double, feature21: double, feature22: double, feature23: double, feature24: double, feature25: double, feature26: double, feature27: double, feature28: double, feature29: double, feature30: double, feature31: double, feature32: double, feature33: double, feature34: double, feature35: double, feature36: double, feature37: double, feature38: double, feature39: double, feature40: double, feature41: double, feature42: double, feature43: double, feature44: double, feature45: double, feature46: double, feature47: double, feature48: double, feature49: double, __index_level_0__: bigint]

### Make predictions with a regular UDF

First, we'll try a regular UDF.  This will deserialize one row (i.e., instance, sample, record) at a time, make a prediction with the model, and return a prediction, which will be serialized and sent back to Spark to combine with all the other predictions.

In [6]:
@F.udf(returnType=DoubleType())
def predict_udf(*cols):
    """Return the positive-class probability for a single row.

    ``cols`` is a tuple of floats: the feature values of one row, in the
    order the columns were passed to the UDF.
    """
    # predict_proba expects a 2-D input, so wrap the row as a batch of one.
    single_row_batch = [cols]
    positive_proba = gs_rf.predict_proba(single_row_batch)[0][1]
    # Hand Spark a built-in float rather than a numpy scalar.
    return float(positive_proba)

# Score every row with the row-at-a-time UDF, keeping the id next to the score.
prediction_col_a = predict_udf(*column_names).alias('prediction')
df_pred_a = df_unlabeled.select(F.col('id'), prediction_col_a)
df_pred_a.take(5)

[Row(id=0, prediction=0.96),
 Row(id=1, prediction=0.13),
 Row(id=2, prediction=0.95),
 Row(id=3, prediction=0.43),
 Row(id=4, prediction=0.95)]

### Make predictions with a Pandas UDF

Now we'll use a Pandas UDF (i.e., vectorized UDF).  In this case, Spark will send a tuple of pandas Series objects, each covering multiple rows at a time.  The tuple will have one Series per column/feature, in the order they are passed to the UDF.  Note that any one of these Series objects won't contain the values for all rows at once, because Spark partitions datasets across workers and hands each partition to the UDF in batches.  The partition and batch sizes can be tuned (e.g., `spark.sql.execution.arrow.maxRecordsPerBatch` controls the batch size), but we'll just use defaults here.

In [7]:
@F.pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*cols):
    """Return positive-class probabilities for a batch of rows.

    ``cols`` is a tuple of pandas.Series, one per feature column, in the
    order the columns were passed to the UDF.
    """
    # Put the per-feature Series side by side to form the feature matrix.
    features = pd.concat(cols, axis=1)
    class_probas = gs_rf.predict_proba(features)
    # Column 1 holds the probability of the positive class.
    return pd.Series(class_probas[:, 1])

# Same selection as with the regular UDF, but scored a batch at a time.
prediction_col_b = predict_pandas_udf(*column_names).alias('prediction')
df_pred_b = df_unlabeled.select(F.col('id'), prediction_col_b)
df_pred_b.take(5)

[Row(id=0, prediction=0.96),
 Row(id=1, prediction=0.13),
 Row(id=2, prediction=0.95),
 Row(id=3, prediction=0.43),
 Row(id=4, prediction=0.95)]

### Making multiclass predictions

Above, we're just returning a single series of predictions for the positive class, which works for a single binary or continuous dependent variable.  One can also put multiclass or multilabel models in Pandas UDFs.  In that case, the UDF returns a series of lists of numbers instead of a series of numbers.

In [8]:
@F.pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols):
    """Return the full list of class probabilities for each row of a batch.

    ``cols`` is a tuple of pandas.Series, one per feature column.  The result
    is a Series whose elements are lists of floats, one entry per class.

    NOTE: this reuses the name of the scalar pandas UDF defined earlier in
    the notebook and replaces it in the notebook namespace.
    """
    features = pd.concat(cols, axis=1)
    class_probas = gs_rf.predict_proba(features)
    # Each row of probabilities becomes a plain Python list for ArrayType.
    return pd.Series([proba_row.tolist() for proba_row in class_probas])

# One output column per class, each pulling its entry out of the array of
# probabilities returned by the UDF.
class_prediction_cols = [
    F.col('predictions').getItem(i).alias(f'prediction_{c}')
    for i, c in enumerate(gs_rf.classes_)
]

df_pred_multi = (
    df_unlabeled
    # First compute the array of class probabilities for every row ...
    .select(F.col('id'), predict_pandas_udf(*column_names).alias('predictions'))
    # ... then spread the array items out into their own columns.
    .select(F.col('id'), *class_prediction_cols)
)
df_pred_multi.take(5)

[Row(id=0, prediction_0=0.04, prediction_1=0.96),
 Row(id=1, prediction_0=0.87, prediction_1=0.13),
 Row(id=2, prediction_0=0.05, prediction_1=0.95),
 Row(id=3, prediction_0=0.57, prediction_1=0.43),
 Row(id=4, prediction_0=0.05, prediction_1=0.95)]