In [1]:
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import numpy as np
import pickle

# Step 1: Train a Model using scikit-learn

#### Generate synthetic data and create train/test splits

In [2]:
# Size of the synthetic dataset.
n_samples = 10000
n_features = 20

# Generate a reproducible synthetic classification problem.
X, y = make_classification(
    n_samples=n_samples,
    n_features=n_features,
    random_state=123,
)

# Wrap the feature matrix in a DataFrame with one named column per feature.
column_names = [f'feature{i}' for i in range(n_features)]
X = pd.DataFrame(data=X, columns=column_names)

# Hold out 30% of the rows for testing.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=45
)


In [3]:
# Persist the held-out features as parquet so Spark can load them later.
X_test.to_parquet(path='X_test_DataFrame')

#### Train a model on the data and write model to disk

In [4]:
# Shallow random forest with a fixed seed for reproducibility.
model_rf = RandomForestClassifier(
    n_estimators=100,
    max_depth=2,
    random_state=0,
)
# fit() returns the fitted estimator; keep it under `model` as well.
model = model_rf.fit(X_train, y_train)
# scikit-learn reference predictions on the held-out set.
predictions = model.predict(X_test)

In [5]:
# Display the scikit-learn predictions (NumPy abbreviates long arrays).
predictions

array([1, 0, 1, ..., 0, 0, 0])

In [6]:
# Serialize the fitted model to disk. The context manager guarantees the
# file is flushed and closed even if pickling raises.
with open('RFdummyModel.p', 'wb') as RFdummyModel:
    pickle.dump(model, RFdummyModel)

# Step 2: Load the model and run predictions with a Spark pandas UDF

### Initiate Spark Session

In [7]:
# findspark.init() makes the pyspark package importable, so it has to run
# before the pyspark import below.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using 4 worker threads.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('ML Pred using Pandas UDF')
    .getOrCreate()
)

In [8]:
# Display the session summary to confirm Spark started.
spark

#### Read the model and the X_test data from disk

In [9]:
# Load the pickled model back from disk; the context manager closes the file
# even if unpickling raises. Only unpickle files you trust: pickle.load can
# execute arbitrary code from a malicious file.
with open('RFdummyModel.p', 'rb') as RFdummy:
    model_readFromDisk = pickle.load(RFdummy)  # model read from disk

# Load the held-out features as a Spark DataFrame.
X_test_Parquet = spark.read.parquet('X_test_DataFrame')

In [10]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*features):
    """Predict with the scikit-learn model on one batch of rows.

    Parameters
    ----------
    *features : pd.Series
        The model features, one pandas Series per feature, passed in the
        same order as the columns the model was trained on.

    Returns
    -------
    pd.Series
        The predictions, one per input row.
    """
    # sklearn models need a 2-D input: one column per feature.
    X = pd.concat(features, axis=1)
    # Restore the training column names when the model recorded them and the
    # column count matches, so predict() does not warn about missing feature
    # names. Otherwise fall back to a plain NumPy array.
    trained_names = getattr(model_readFromDisk, 'feature_names_in_', None)
    if trained_names is not None and len(trained_names) == X.shape[1]:
        X.columns = list(trained_names)
    else:
        X = X.values
    # model_readFromDisk is a global defined on the driver; Spark serializes
    # it together with this function when the UDF is sent to the workers.
    y = model_readFromDisk.predict(X)  # Vectorized over the whole batch.
    return pd.Series(y)

In [11]:
# Apply the UDF to every feature column and keep only the prediction.
predict_Spark = X_test_Parquet.select(
    predict_pandas_udf(*column_names).alias('prediction')
)

# Step 3: Check that the PySpark UDF predictions and the scikit-learn predictions are the same

In [12]:
# Collect the Spark predictions to the driver and cast the 'prediction'
# column to an integer NumPy array in one vectorized step. This replaces a
# row-wise apply(int, axis=1), which called int() on a one-element Series
# per row (deprecated in recent pandas, and slow).
predict_Spark_array = predict_Spark.toPandas()['prediction'].astype('int64').to_numpy()

In [13]:
# np.alltrue was removed in NumPy 2.0. np.array_equal performs the same
# element-wise check and returns False (rather than broadcasting) when the
# two arrays differ in shape.
# NOTE(review): this relies on Spark returning rows in the order they were
# written to the parquet file — confirm for multi-partition data.
np.array_equal(predictions, predict_Spark_array)

True