In [1]:
# MLFlow Notebook
import mlflow
from mlflow.tracking import MlflowClient
import azureml
from azureml.core import Workspace, Run, Experiment

import os, shutil

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import datetime as dt
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 

In [2]:
# Azure ML workspace coordinates -- fill these in before running the notebook.
subscription_id = ""
resource_group = ""
workspace_name = ""
workspace_region = ""  # defined for reference; not passed to Workspace() below

# Handle to the Azure ML workspace identified by the values above.
ws = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
)

In [3]:
# Point MLflow at the tracking URI reported by the Azure ML workspace.
aml_tracking_uri = ws.get_mlflow_tracking_uri()
mlflow.set_tracking_uri(aml_tracking_uri)

In [4]:
# One name is used for both the Azure ML Experiment object built here and
# the MLflow experiment selected later via mlflow.set_experiment().
experiment_name = 'bikeSharingDemandMLFlowAML'
exp = Experiment(ws, experiment_name)

In [5]:
# Set the Databricks Spark conf key "spark.databricks.mlflow.trackMLlib.enabled" to "true".
# NOTE(review): presumably this turns on automatic MLflow logging of MLlib
# tuning runs (e.g. the CrossValidator fit below) -- confirm for the cluster's runtime.
spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "true")

In [6]:
# Select the MLflow experiment by name, then open a run. The run is not
# wrapped in a `with` block; it is closed by the mlflow.end_run() call in
# the last cell.
mlflow.set_experiment(experiment_name)
run = mlflow.start_run()

# Pull the identifiers of the new run out of its info record for later cells.
run_info = run.info
run_id = run_info.run_uuid
exp_id = run_info.experiment_id
artifact_location = run_info.artifact_uri

In [7]:
# Azure ML Run object for the run id taken from the MLflow run; used later
# to upload and register the model.
azRun = Run(experiment=exp, run_id=run_id)

In [8]:
# Load the daily bike-sharing CSV, taking column names from the header row
# and letting Spark infer the column types.
df = (spark
      .read
      .format("csv")
      .option("inferSchema", "True")
      .option("header", "True")
      .load("/databricks-datasets/bikeSharing/data-001/day.csv")
     )

# 70/30 train/test split. The seed is fixed so that re-running the notebook
# draws the same split and the logged metrics stay comparable between runs.
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

# One-hot encode the month and weekday columns.
mnth_encoder = OneHotEncoder(inputCol="mnth", outputCol="encoded_mnth")
weekday_encoder = OneHotEncoder(inputCol="weekday", outputCol="encoded_weekday")

# Columns fed to the model: the two encoded columns plus temperature and humidity.
train_cols = ['encoded_mnth', 'encoded_weekday', 'temp', 'hum']

# Combine the input columns into the single "features" vector column.
assembler = VectorAssembler(inputCols=train_cols, outputCol="features")

# Decision tree regressor predicting the "cnt" column from "features".
# NOTE: this rebinds `dt`, which the imports cell bound to the datetime
# module (`import datetime as dt`); after this cell the module is no longer
# reachable under that name. The name is kept because the parameter-grid
# cell refers to `dt.maxDepth` / `dt.maxBins`.
dt = DecisionTreeRegressor(featuresCol="features", labelCol="cnt")

# Pipeline: encode -> assemble -> regress.
pipeline = Pipeline(stages=[
    mnth_encoder,
    weekday_encoder,
    assembler,
    dt
])

In [9]:
# Candidate values for the decision tree's maxDepth and maxBins parameters
# (7 depths x 3 bin counts).
depth_candidates = [2, 3, 4, 5, 6, 7, 8]
bin_candidates = [2, 4, 8]

grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(dt.maxDepth, depth_candidates)
grid_builder = grid_builder.addGrid(dt.maxBins, bin_candidates)
grid = grid_builder.build()

In [10]:
# RMSE evaluator comparing the "prediction" column against the "cnt" label.
valid_eval = RegressionEvaluator(
    labelCol="cnt",
    predictionCol="prediction",
    metricName="rmse",
)

In [11]:
# 3-fold cross-validation of the whole pipeline over the parameter grid,
# scored with valid_eval.
cv = CrossValidator(
    estimator=pipeline,
    evaluator=valid_eval,
    estimatorParamMaps=grid,
    numFolds=3,
)

In [12]:
# Fit the cross-validator on the training split.
cvModel = cv.fit(train_df)

# Logs user-defined tags
mlflow.set_tag('owner_team', "Ryan")

# Score the fitted model on the test split and log the result under
# "test_<metric name>" (an additional metric on the active run).
test_metric = valid_eval.evaluate(cvModel.transform(test_df))
test_metric_key = 'test_' + valid_eval.getMetricName()
mlflow.log_metric(test_metric_key, test_metric)

In [13]:
# Predictions of the fitted cross-validated model on the test split.
predictions = cvModel.transform(test_df)

# Evaluator reused for several metrics; the metric is chosen per call by
# overriding the metricName param.
evaluator = RegressionEvaluator(labelCol = "cnt", predictionCol = "prediction")

# Log MAE, RMSE and R^2 of the test predictions to the active MLflow run.
for metric_key in ("mae", "rmse", "r2"):
    metric_value = evaluator.evaluate(predictions, {evaluator.metricName: metric_key})
    mlflow.log_metric(metric_key, metric_value)

In [14]:
# Keep the `bestModel` attribute of the fitted cross-validator result; this
# is the object that gets saved in the next cell.
bestModel = cvModel.bestModel

In [15]:
# Saved-model name and the two path spellings derived from it.
model_nm = "bikeshare.mml"
model_output = "".join(['/mnt/azml/outputs/', model_nm])
# Same location prefixed with "/dbfs"; the shutil/os calls later use this form.
# NOTE(review): presumably the local-filesystem view of the DBFS path -- confirm.
model_dbfs = "".join(["/dbfs", model_output])

# Save the best model, replacing anything already stored at that path.
model_writer = bestModel.write().overwrite()
model_writer.save(model_output)

In [16]:
# Split the model path into stem and extension. os.path.splitext splits only
# at the final extension, so a dot anywhere else in the path (or no dot at
# all) no longer breaks the two-name unpacking the way str.split(".") did.
model_name, model_ext = os.path.splitext(model_dbfs)
# Keep the extension without its leading dot (e.g. "mml"), matching the
# value the previous split-based code produced.
model_ext = model_ext.lstrip(".")

In [17]:
# Zip the saved model directory. make_archive appends ".zip" to the base
# name, which is the path spelled out in model_zip.
model_zip = model_name + ".zip"
shutil.make_archive(base_name=model_name, format='zip', root_dir=model_dbfs)

# Upload the archive to the Azure ML run under "outputs/<model_nm>".
aml_artifact_name = "outputs/" + model_nm
azRun.upload_file(aml_artifact_name, model_zip)

In [18]:
# Register the artifact uploaded to "outputs/<model_nm>" as a model.
# The registered name is the value of `model_nm`; the earlier code passed the
# quoted string 'model_nm', which registered the model under that literal
# text instead of the actual model file name.
azRun.register_model(model_name=model_nm, model_path="outputs/" + model_nm)

In [19]:
# The model has already been uploaded to the run, so remove the local
# copies: first the saved model directory, then the zip archive built from it.
shutil.rmtree(model_dbfs, ignore_errors=False)
os.remove(model_zip)

In [20]:
# Mark every child run of the Azure ML run as completed.
for child_run in azRun.get_children():
    child_handle = Run(exp, child_run.id)
    child_handle.complete()

In [21]:
# Close the MLflow run that was opened with mlflow.start_run() earlier in the notebook.
mlflow.end_run()