In [1]:
# %load snippet/default_notebook_setup.py
## Enable IPython's autoreload extension so edits to project modules are
## picked up without restarting the kernel.
## Not all libraries support reloads. This might break things.
## https://ipython.org/ipython-doc/3/config/extensions/autoreload.html
## Mode 1 is selective: use %autoreload 1 together with %aimport project
## so that only the modules registered via %aimport are reloaded.

%reload_ext autoreload
%autoreload 1

## Load environment variables from our .env file
#%load_ext dotenv
#%dotenv

# Add the jupyter home directory to the python path
# so that our project code base can be found for imports.
import sys

# Guard the append: re-running this cell (or re-loading the snippet) must not
# pile up duplicate entries on sys.path.
if '/home/jovyan' not in sys.path:
    sys.path.append('/home/jovyan')


In [2]:
# %load snippet/default_spark.py
import pyspark
from pyspark.sql import SparkSession, Window, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Maven coordinates handed to Spark through `spark.jars.packages`.
_mleap_packages = 'ml.combust.mleap:mleap-spark-base_2.11:0.14.0,ml.combust.mleap:mleap-spark_2.11:0.14.0'

# Configure the builder one option at a time, then obtain the session
# (getOrCreate returns an already running session if there is one).
_builder = SparkSession.builder
_builder = _builder.config('spark.jars.packages', _mleap_packages)
_builder = _builder.config('spark.sql.execution.arrow.enabled', 'true')
spark = _builder.getOrCreate()

In [5]:
%aimport project
from project.data import raw

In [6]:
# Load the iris data set through the project's raw-data loader.
# '../' is presumably the project root relative to this notebook — TODO confirm.
iris_df = raw.load_iris('../')
# Print a single row as a quick schema check.
iris_df.show(1)

+---------------+--------------+---------------+--------------+-----------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|      class|
+---------------+--------------+---------------+--------------+-----------+
|            5.1|           3.5|            1.4|           0.2|Iris-setosa|
+---------------+--------------+---------------+--------------+-----------+
only showing top 1 row



# Step 1: Development

As a first step, we use Jupyter to experiment with the data and Spark to build a functioning feature pipeline. We take advantage of the full flexibility of notebooks here to print out results and test the code we write.

In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion, StandardScaler

In [7]:
# Combine the four numeric measurement columns into one vector column
# called "features".
assembler = VectorAssembler(
    inputCols=["sepal_length_cm", "sepal_width_cm", "petal_length_cm", "petal_width_cm"],
    outputCol="features"
)
# NOTE(review): the name `features` is rebound further down by
# `from project.data import features`; after that import this DataFrame is
# no longer reachable, so the Step 1 cells cannot be re-run out of order.
features = assembler.transform(iris_df)
features.show(1)

+---------------+--------------+---------------+--------------+-----------+-----------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|      class|         features|
+---------------+--------------+---------------+--------------+-----------+-----------------+
|            5.1|           3.5|            1.4|           0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
+---------------+--------------+---------------+--------------+-----------+-----------------+
only showing top 1 row



In [8]:
# Standardise the assembled vector: withMean=True centres each feature and
# withStd=True scales it to unit standard deviation.
scaler = StandardScaler(
    inputCol="features", outputCol="scaledFeatures",
    withStd=True, withMean=True
)

# Compute summary statistics by fitting the StandardScaler
# (the scaler is stateful: it has to see the data before it can transform).
scalerModel = scaler.fit(features)

# Apply the fitted scaler; the result gains a "scaledFeatures" column.
scaledFeatures = scalerModel.transform(features)
# Second argument False disables truncation so the whole vector is printed.
scaledFeatures.show(1, False)

+---------------+--------------+---------------+--------------+-----------+-----------------+-------------------------------------------------------------------------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|class      |features         |scaledFeatures                                                                 |
+---------------+--------------+---------------+--------------+-----------+-----------------+-------------------------------------------------------------------------------+
|5.1            |3.5           |1.4            |0.2           |Iris-setosa|[5.1,3.5,1.4,0.2]|[-0.8976738791967643,1.0286112808972372,-1.3367940202882502,-1.308592819437957]|
+---------------+--------------+---------------+--------------+-----------+-----------------+-------------------------------------------------------------------------------+
only showing top 1 row



In [9]:
# Expand the scaled feature vector into polynomial terms up to degree 3.
polyExpansion = PolynomialExpansion(degree=3, inputCol="scaledFeatures", outputCol="polyFeatures")
polyFeatures = polyExpansion.transform(scaledFeatures)
# Fetch only the first row's expanded vector for inspection.
polyFeatures.select('polyFeatures').take(1)

[Row(polyFeatures=DenseVector([-0.8977, 0.8058, -0.7234, 1.0286, -0.9234, 0.8289, 1.058, -0.9498, 1.0883, -1.3368, 1.2, -1.0772, -1.375, 1.2343, -1.4144, 1.787, -1.6042, 1.8381, -2.3889, -1.3086, 1.1747, -1.0545, -1.346, 1.2083, -1.3845, 1.7493, -1.5703, 1.7994, -2.3385, 1.7124, -1.5372, 1.7614, -2.2891, -2.2409]))]

# Step 2: Incorporate the Feature Pipeline into the Project Code Base

Once we have functioning code for our feature pipeline, we move that code into our project code base.
We can now use those functions and even test that both versions behave identically. As a next step we can:

1. Delete the previous code cells for Step 1
1. Restart the kernel
1. Rerun all the code cells in order to assure a clean state
1. Continue working on the next tasks

In [10]:
from project.data import features

In [6]:
# Build the feature pipeline via the project module.
# NOTE(review): `features` is the imported `project.data.features` module here,
# not the DataFrame of the same name created in Step 1.
feature_pipeline = features.train_new_feature_pipeline(iris_df)

In [7]:
# Apply the pipeline to the iris data and print one row to inspect the added columns.
feature_pipeline.transform(iris_df).show(1)

+---------------+--------------+---------------+--------------+-----------+-----------+-----------------+--------------------+--------------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|      class|class_index|         features|      scaledFeatures|        polyFeatures|
+---------------+--------------+---------------+--------------+-----------+-----------+-----------------+--------------------+--------------------+
|            5.1|           3.5|            1.4|           0.2|Iris-setosa|        0.0|[5.1,3.5,1.4,0.2]|[-0.8976738791967...|[-0.8976738791967...|
+---------------+--------------+---------------+--------------+-----------+-----------+-----------------+--------------------+--------------------+
only showing top 1 row



In [8]:
# Fetch the first row's polyFeatures vector so it can be compared with the Step 1 result.
feature_pipeline.transform(iris_df).select('polyFeatures').take(1)

[Row(polyFeatures=DenseVector([-0.8977, 0.8058, -0.7234, 1.0286, -0.9234, 0.8289, 1.058, -0.9498, 1.0883, -1.3368, 1.2, -1.0772, -1.375, 1.2343, -1.4144, 1.787, -1.6042, 1.8381, -2.3889, -1.3086, 1.1747, -1.0545, -1.346, 1.2083, -1.3845, 1.7493, -1.5703, 1.7994, -2.3385, 1.7124, -1.5372, 1.7614, -2.2891, -2.2409]))]

# Step 3: Model Experimentation with Mlflow

It is important to track parameters and metrics when experimenting with models and data. __MLflow__ is a great tool for tracking experiments and saving models. Let's use MLflow to save our feature pipeline: it contains the Spark StandardScaler, which has to be fitted on data and is therefore not stateless, so the fitted pipeline must be persisted.

## Saving a spark model with Mlflow

Unfortunately, there are some bugs that prevent boto3 from uploading empty files to minio (our S3 stand-in for local development). To work around this problem, we save the model locally with MLflow and upload it afterwards with our __log_artifacts_minio()__ function.

In [11]:
import os 
import mlflow
import mlflow.spark
from project.utility.mlflow import log_artifacts_minio

In [12]:
# Name of the MLflow experiment that the runs below are recorded under.
experiment = 'iris_features'
# The tracking server address comes from the environment; this raises
# KeyError if MLFLOW_TRACKING_URI is not set.
mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
mlflow.set_experiment(experiment)

In [14]:
degree = 3
# Raw measurement columns passed to MLflow as the pipeline's sample input.
sample_columns = (
    'sepal_length_cm',
    'sepal_width_cm',
    'petal_length_cm',
    'petal_width_cm',
)

with mlflow.start_run() as run:
    # Train the pipeline and record the parameter it was built with.
    feature_pipeline = features.train_new_feature_pipeline(iris_df, degree)
    mlflow.log_param("degree", degree)

    # Save the pipeline to the local 'feature_pipeline' directory first ...
    sample_df = iris_df.select(*sample_columns)
    mlflow.spark.save_model(feature_pipeline, 'feature_pipeline', sample_input=sample_df)

    # ... then push that directory to the run's artifact store with the project helper.
    # NOTE(review): the meaning of the trailing True is defined by
    # log_artifacts_minio — confirm against the helper.
    log_artifacts_minio(run, 'feature_pipeline', 'feature_pipeline', True)

    # Keep the run id for the loading cells further down.
    run_id = run.info.run_id
    print(run.info)

<RunInfo: artifact_uri='s3://artifacts/0/f3a735a824264043a7978ee1aa0230d6/artifacts', end_time=None, experiment_id='0', lifecycle_stage='active', run_id='f3a735a824264043a7978ee1aa0230d6', run_uuid='f3a735a824264043a7978ee1aa0230d6', start_time=1572870582878, status='RUNNING', user_id='jovyan'>


## Loading a spark model using Mlflow

We load the serialised model with MLflow using the run_id of the previous run.

In [15]:
# Address the saved pipeline through the `runs:/<run_id>/<artifact path>` URI scheme.
artifact_uri = 'runs:/{}/feature_pipeline'.format(run_id)
# Load the Spark model that was logged under that run.
model = mlflow.spark.load_model(artifact_uri)
print(artifact_uri)

runs:/f3a735a824264043a7978ee1aa0230d6/feature_pipeline


In [16]:
# Check that the reloaded pipeline transforms the data; print one row.
model.transform(iris_df).show(1)

+---------------+--------------+---------------+--------------+-----------+-----------------+--------------------+--------------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|      class|         features|      scaledFeatures|        polyFeatures|
+---------------+--------------+---------------+--------------+-----------+-----------------+--------------------+--------------------+
|            5.1|           3.5|            1.4|           0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|[-0.8976738791967...|[-0.8976738791967...|
+---------------+--------------+---------------+--------------+-----------+-----------------+--------------------+--------------------+
only showing top 1 row



We can also get logged metrics and parameters from the tracked run:

In [17]:
# Look up the tracked run by its id through the MLflow client API.
# `r` is reused by the next cell to read individual parameters.
c = mlflow.tracking.MlflowClient()
r = c.get_run(run_id)
print(r)

<Run: data=<RunData: metrics={}, params={'degree': '3'}, tags={'mlflow.source.name': '/opt/conda/lib/python3.7/site-packages/ipykernel_launcher.py',
 'mlflow.source.type': 'LOCAL',
 'mlflow.user': 'jovyan'}>, info=<RunInfo: artifact_uri='s3://artifacts/0/f3a735a824264043a7978ee1aa0230d6/artifacts', end_time=1572870584338, experiment_id='0', lifecycle_stage='active', run_id='f3a735a824264043a7978ee1aa0230d6', run_uuid='f3a735a824264043a7978ee1aa0230d6', start_time=1572870582878, status='FINISHED', user_id='jovyan'>>


In [18]:
# Logged parameters come back as strings (note the quoted '3' in the output),
# so cast before using the value numerically.
r.data.params['degree']

'3'