# Embarrassingly Parallel Model Training on Spark — Pandas UDF

In [2]:
# Create the DBFS staging directory that later cells move the gzipped CSV files into.
# `dbutils` is not imported in this notebook; it is expected to be supplied by the Databricks runtime.
dbutils.fs.mkdirs("dbfs:/FileStore/temporary")

In [3]:
# Import required packages
import joblib # To pickle (serialize) the trained model to a file
import numpy as np # To create random data
import pandas as pd # To operate on data in the Python process
from sklearn.linear_model import LinearRegression # To train Linear Regression models

from pyspark.sql.functions import pandas_udf, PandasUDFType # Pandas UDF helpers to run Python/pandas code from Spark
from pyspark.sql.types import DoubleType, StringType, ArrayType # Spark data types used to declare what a UDF returns

In [4]:
# Create a reproducible synthetic linear dataset for training: y = 2.5 * x + noise, 100 datapoints.
# A dedicated, seeded RandomState is used so that re-running the notebook regenerates exactly
# the same rows (and therefore the same fitted coefficient) without touching NumPy's global
# random state.
rng_df1 = np.random.RandomState(1)
df1 = pd.DataFrame({'x': rng_df1.normal(size=100)})
df1['y'] = df1['x']*2.5 + rng_df1.normal(scale=0.5, size=100) # Gaussian noise with standard deviation 0.5
df1['name'] = 'df1' # Dataset identifier; the training UDF uses it to name the saved model file
# Save the dataframe as .csv in the current working directory. By default to_csv also writes
# the pandas index as an unnamed first column, so the file holds four columns.
df1.to_csv('df1.csv')

In [5]:
%sh
# Compress df1.csv, replacing it with df1.csv.gz; -f overwrites an existing archive so the cell can be re-run
gzip -f "df1.csv"

In [6]:
# Relocate the compressed dataset from the driver's local disk into the DBFS staging directory
dbutils.fs.mv(
    "file:/databricks/driver/df1.csv.gz",
    "dbfs:/FileStore/temporary/df1.csv.gz",
)

In [7]:
# Create a second reproducible synthetic linear dataset: y = 3.0 * x + noise, 100 datapoints.
# A dedicated RandomState with its own seed keeps this dataset repeatable across notebook
# re-runs without touching NumPy's global random state.
rng_df2 = np.random.RandomState(2)
df2 = pd.DataFrame({'x': rng_df2.normal(size=100)})
df2['y'] = df2['x']*3.0 + rng_df2.normal(scale=0.3, size=100) # Gaussian noise with standard deviation 0.3
df2['name'] = 'df2' # Dataset identifier; the training UDF uses it to name the saved model file
# Save the dataframe as .csv in the current working directory. By default to_csv also writes
# the pandas index as an unnamed first column, so the file holds four columns.
df2.to_csv('df2.csv')

In [8]:
%sh
# Compress df2.csv, replacing it with df2.csv.gz; -f overwrites an existing archive so the cell can be re-run
gzip -f "df2.csv"

In [9]:
# Relocate the compressed dataset from the driver's local disk into the DBFS staging directory
dbutils.fs.mv(
    "file:/databricks/driver/df2.csv.gz",
    "dbfs:/FileStore/temporary/df2.csv.gz",
)

In [10]:
%fs ls dbfs:/FileStore/temporary

path,name,size
dbfs:/FileStore/temporary/df1.csv.gz,df1.csv.gz,2272
dbfs:/FileStore/temporary/df2.csv.gz,df2.csv.gz,2264


In [11]:
# Load both gzipped CSV files into a single Spark DataFrame.
# The wildcard matches df1.csv.gz and df2.csv.gz; the first line of a file is treated as the
# header and column types are inferred from the data.
sparkDF = (
    spark.read
    .options(header="true", delimiter=",", inferSchema="true")
    .csv('dbfs:/FileStore/temporary/df*.csv.gz')
)

# Number of partitions backing the DataFrame (shown as the cell output)
sparkDF.rdd.getNumPartitions()

In [12]:
# Check the schema for the spark dataframe.
# Besides x, y and name, expect one extra leading column: the CSV files were written together
# with the pandas index, which has no header name (presumably shown by Spark as `_c0` -- confirm in the output).
sparkDF.printSchema()

In [13]:
# Pandas UDF to train the models
@pandas_udf(returnType=DoubleType())
def train_lm_pandas_udf(*cols):
    """Fit one scikit-learn LinearRegression per dataset name and return in-sample predictions.

    Args:
        *cols: three pandas Series, in this order: the feature ``x``, the target ``y``
            and the dataset identifier ``name``.

    Returns:
        A float64 pandas Series with one prediction per input row, in input row order.
        Rows whose ``name`` is null belong to no group and get NaN.

    Side effects:
        For every distinct ``name`` in the batch, the fitted model is pickled to
        ``new_modelUDF<name>.joblib`` in the current working directory of the Python
        process that runs the UDF.

    Notes:
        * Per the PySpark documentation, a scalar pandas UDF is invoked once per Arrow record
          batch rather than once per dataset. The rows are therefore grouped by ``name`` here,
          so a batch that mixes datasets still yields one model per dataset instead of a single
          model fitted on mixed data, and an empty batch returns an empty Series.
        * If one dataset is split over several batches, each batch refits the model on its own
          rows and overwrites the file written by the previous batch.
        * NOTE(review): the relative path presumably resolves to executor-local disk on a
          multi-node cluster, where the driver cannot read the files back -- confirm before
          using this pattern outside a single-node setup.
    """
    # Create a pandas dataframe from the input Spark DataFrame columns, with a clean 0..n-1 index
    df = pd.concat(cols, axis=1).reset_index(drop=True)
    df.columns = ['x', 'y', 'name']

    # Pre-filled with NaN so that rows skipped by the grouping still get an output slot
    predictions = pd.Series(np.nan, index=df.index, dtype='float64')

    # sig is the unique identifier for the model file, taken from the `name` column
    for sig, group in df.groupby('name', sort=False):
        features = group[['x']] # scikit-learn expects a 2-D feature matrix
        modelUDF = LinearRegression() # Scikit-Learn Linear Regression
        modelUDF.fit(features, group['y']) # Fit the model on this dataset's rows only
        joblib.dump(modelUDF, 'new_modelUDF{signature}.joblib'.format(signature=sig)) # Pickle the trained model file
        predictions.loc[group.index] = modelUDF.predict(features) # Predicted values on the training data

    return predictions

In [14]:
# Build the DataFrame whose evaluation runs the training: the three input columns are handed
# to the UDF in the order it expects, and its output column is named "TrainPrediction".
column_names = ['x', 'y', 'name']
train_prediction = train_lm_pandas_udf(*column_names).alias("TrainPrediction")
sparkDF2 = sparkDF.select(train_prediction)

In [15]:
# Initiate parallel training jobs.
# Counting the rows is an action, so it forces Spark to evaluate sparkDF2 and thereby run the training UDF.
sparkDF2.rdd.count()

In [16]:
%fs ls file:/databricks/driver

path,name,size
file:/databricks/driver/conf/,conf/,4096
file:/databricks/driver/new_modelUDFdf2.joblib,new_modelUDFdf2.joblib,574
file:/databricks/driver/derby.log,derby.log,726
file:/databricks/driver/new_modelUDFdf1.joblib,new_modelUDFdf1.joblib,574
file:/databricks/driver/logs/,logs/,4096
file:/databricks/driver/ganglia/,ganglia/,4096
file:/databricks/driver/eventlogs/,eventlogs/,4096


In [17]:
# Load the trained models back to verify them.
# joblib.load unpickles the files, so it should only be used on trusted files; these two are
# the ones written by the training UDF.
modeldf1, modeldf2 = map(
    joblib.load,
    ('new_modelUDFdf1.joblib', 'new_modelUDFdf2.joblib'),
)

In [18]:
# Fitted slope of the model trained on dataset df1; the data was generated with a true slope of 2.5
modeldf1.coef_

In [19]:
# Fitted slope of the model trained on dataset df2; the data was generated with a true slope of 3.0
modeldf2.coef_