## Scikit-Learn pipeline

In [9]:
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline as skPipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error
import numpy as np
import pandas as pd

In [10]:
# configuration: number of simulated samples and fraction kept for training
NUM_SAMPLES = 10_000
TRAIN_RATIO = 0.8

# seed NumPy's global random state so draws made through np.random are repeatable
np.random.seed(87)

# create simulated data
def data_simulation(num_samples, rng=None):
    """Generate uniformly distributed random features and targets.

    Parameters
    ----------
    num_samples : int
        Number of rows to generate.
    rng : numpy.random.Generator or numpy.random.RandomState, optional
        Random source to draw from. When omitted, NumPy's global random
        state is used, so the result then depends on any earlier
        ``np.random.seed`` call.

    Returns
    -------
    X : numpy.ndarray of shape (num_samples, 2)
        Feature values drawn uniformly from [0, 10).
    y : numpy.ndarray of shape (num_samples,)
        Target values drawn uniformly from [-10, 10); drawn independently
        of ``X``.
    """
    # The np.random module and both generator classes expose ``random(size)``,
    # so a single code path serves the global state and an explicit generator.
    source = np.random if rng is None else rng
    X = source.random((num_samples, 2)) * 10
    y = source.random(num_samples) * 20 - 10
    return X, y

# split data in train and test
def data_splitter(X, y, train_ratio, random_state=87):
    """Split features and targets into train and test subsets.

    Parameters
    ----------
    X, y : array-like
        Features and targets, passed straight to ``train_test_split``.
    train_ratio : float
        Fraction of the samples intended for the training subset; the
        remainder, ``1 - train_ratio``, is requested as the test fraction.
    random_state : int, optional
        Seed forwarded to ``train_test_split``. Defaults to 87, the value
        that was previously hard-coded.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)``.
    """
    # Subtracting in binary floating point leaves noise behind
    # (1 - 0.7 == 0.30000000000000004). Rounding removes it so the requested
    # test fraction is the intended one and cannot tip the resulting test
    # sample count over by one.
    test_ratio = round(1 - train_ratio, 10)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_ratio, random_state=random_state
    )
    return X_train, X_test, y_train, y_test


# simulate the data set, then hold part of it out for evaluation
X_data, y_data = data_simulation(NUM_SAMPLES)
X_train, X_test, y_train, y_test = data_splitter(X_data, y_data, TRAIN_RATIO)

# pipeline stages: standardise the features, then fit a linear regression
scaler = StandardScaler()
regression = LinearRegression()
steps = [("scaler", scaler), ("regression", regression)]

# fit() returns the pipeline itself, so construction and fitting are chained
pipeline = skPipeline(steps=steps).fit(X_train, y_train)

# report the parameters of the fitted regression step
coefficients = pipeline.named_steps["regression"].coef_
intercept = pipeline.named_steps["regression"].intercept_
print(f"Coefficients: {coefficients}")
print(f"Intercept: {intercept}")

# predict on the held-out data and show the first rows next to the true values
y_pred = pipeline.predict(X_test)
data = {"y_true": y_test, "y_pred": y_pred}
df = pd.DataFrame(data)
print(df.head())

# score the predictions with the root mean squared error
rmse = root_mean_squared_error(y_test, y_pred)
print(f"RMSE: {rmse}")

Coefficients: [-0.02440983 -0.07185289]
Intercept: -0.07350052500081823
     y_true    y_pred
0 -9.615213 -0.154982
1  4.593737 -0.136383
2  1.952026 -0.170274
3 -1.342535 -0.189420
4 -4.790354 -0.078791
RMSE: 5.687104139114933


## SparkML pipeline

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline as sparkPipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [12]:
# Configuration: number of simulated rows and fraction kept for training
NUM_SAMPLES = 10_000
TRAIN_RATIO = 0.8

# Get the Spark session used by the rest of this section
spark = SparkSession.builder.getOrCreate()

# Create simulated data
def data_simulation(num_samples):
    """Build a DataFrame of ``num_samples`` rows with random features and a label.

    Starting from ``spark.range(num_samples)``, the result keeps the ``id``
    column and adds three columns computed from SQL ``RAND()``:
    ``feature1`` (scaled by 10), ``feature2`` (scaled by 5) and ``label``
    (scaled by 20, then shifted by -10). No seed is passed to ``RAND()``.
    """
    column_exprs = [
        "id as id",
        "(RAND() * 10) as feature1",
        "(RAND() * 5) as feature2",
        "(RAND() * 20 - 10) as label",
    ]
    return spark.range(num_samples).selectExpr(*column_exprs)
 

# split data in train and test
def data_splitter(data, train_ratio, seed=87):
    """Randomly split ``data`` into a train part and a test part.

    Parameters
    ----------
    data : DataFrame
        Object exposing ``randomSplit(weights, seed=...)``.
    train_ratio : float
        Weight given to the train part; the test part gets
        ``1 - train_ratio``.
    seed : int, optional
        Seed forwarded to ``randomSplit``. Defaults to 87, the value that
        was previously hard-coded.

    Returns
    -------
    tuple
        ``(train_data, test_data)``.
    """
    test_ratio = 1 - train_ratio
    train_data, test_data = data.randomSplit(
        [train_ratio, test_ratio],
        seed=seed
    )
    return train_data, test_data
    

# generate the simulated DataFrame and split it into train and test parts
simulated_data = data_simulation(NUM_SAMPLES)
training_data, testing_data = data_splitter(simulated_data, TRAIN_RATIO)

# stage 1: pack the two feature columns into a single vector column
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")

# stage 2: scale the assembled feature vector
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# stage 3: linear regression model on the scaled features
regression = LinearRegression(featuresCol="scaledFeatures", labelCol="label")

# chain the stages into one ML pipeline and fit it on the training split
stages = [assembler, scaler, regression]
pipeline = sparkPipeline(stages=stages)
pipeline_model = pipeline.fit(training_data)

# the fitted regression is the last stage of the fitted pipeline
coefficients = pipeline_model.stages[-1].coefficients
intercept = pipeline_model.stages[-1].intercept
print("Coefficients: ", coefficients)
print("Intercept: ", intercept)

# apply the fitted pipeline to the test split and preview five predictions
preds = pipeline_model.transform(testing_data)
preds.select("features", "label", "prediction").show(5)

# evaluate the predictions: RMSE between the label and prediction columns
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(preds)
print("RMSE: ", rmse)

25/01/21 09:53:07 WARN Instrumentation: [98abb65c] regParam is zero, which might cause numerical instability and overfitting.


Coefficients:  [0.13348965971885035,0.04080595714807422]
Intercept:  -0.35641291108111783
+--------------------+-------------------+--------------------+
|            features|              label|          prediction|
+--------------------+-------------------+--------------------+
|[8.42284044860756...|  8.087115265911105| 0.12555900812734877|
|[7.46666388690664...|  5.086996075074966|  0.1254555951746139|
|[9.34503865374475...| -9.772565396159969|  0.1315229010280709|
|[2.25556430373861...|-4.3993419391160105|-0.11777576949341934|
|[5.69924586717390...|   6.68215801521853|-0.07701101637688762|
+--------------------+-------------------+--------------------+
only showing top 5 rows

RMSE:  5.844045987881074
