# Azure Machine Learning Pipeline (Student)

 In this notebook, we will create an Azure Machine Learning pipeline that covers every stage of the machine learning lifecycle:
 
 1. Data Engineering
 2. Model Training
 3. Model Management
 4. Model Deployment (to the same environment)
 
![Data Engineering](./images/00-Pipeline.jpg)


Fill in the blanks to make the notebook work. The notebook may also contain bugs that need to be fixed. The expected result is a published pipeline that runs on a schedule.

## Data Engineering 

**Input** : Raw Data 

**Output** : Registered Data Set (ProductReview)

In [1]:
import os
os.makedirs('data_engineering',exist_ok=True)
os.makedirs('train', exist_ok=True)
os.makedirs('model_selection', exist_ok=True)
os.makedirs('model_deploy', exist_ok=True)


In [51]:
%%writefile data_engineering/data_engineering.py
import os
import json
import gzip
import pandas as pd
from urllib.request import urlopen
import requests

from azureml.core.run import Run
from azureml.core import Dataset, Datastore, Workspace
from azureml.data.datapath import DataPath

# get run context
run = Run._____________________

# Download data from source (stream in 1 MiB chunks; iter_content() defaults to 1 byte per chunk)
url = "http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Software_5.json.gz"
response = requests.get(url, stream=True)
response.raise_for_status()

with open("Software_5.json.gz", "wb") as handle:
    for data in response.iter_content(chunk_size=1024 * 1024):
        handle.write(data)

# Load the review data (JSON lines: one review object per line)
data = []
with gzip.open('Software_5.json.gz') as f:
    for l in f:
        data.append(json.loads(l.strip()))
    
# Total number of records; each record is one review
print(len(data))

# First record
print(data[0])
df = pd.DataFrame.from_dict(data)

# Replace missing values (e.g. reviews without 'reviewText') with empty strings
df3 = df.fillna('')
print(df3.iloc[2])

# Prepare dataset
workspace = _____________________
default_datastore = Datastore.get_default(workspace)

ds_name = 'ProductReview'
data_path = DataPath(datastore=default_datastore, path_on_datastore='product_review')

# Register dataset 
# Hint: register_xxxxx
ds = Dataset.Tabular.__________________(___, 
                                    ____________, 
                                    ____________, 
                                    description=None, 
                                    tags=None, 
                                    show_progress=True)


Overwriting dataengineer/data_engineer.py
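The data engineering script reads a gzip-compressed JSON-lines file, where each line is one review object, and then replaces missing fields with empty strings. Below is a minimal standard-library sketch of that parsing step. It runs against a small in-memory sample instead of the real download. The helper name `load_json_lines_gz` and the sample records are hypothetical.

```python
import gzip
import io
import json


def load_json_lines_gz(fileobj):
    """Parse a gzip-compressed JSON-lines stream into a list of dicts.

    Blank lines are skipped; every other line must hold one JSON object.
    """
    records = []
    with gzip.open(fileobj, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records


# Hypothetical sample with the two fields the training step uses.
sample = [
    {"overall": 5.0, "reviewText": "Great product"},
    {"overall": 2.0},  # no reviewText, as happens with some real reviews
]
buf = io.BytesIO()
with gzip.open(buf, "wt", encoding="utf-8") as f:  # leaves buf open
    for rec in sample:
        f.write(json.dumps(rec) + "\n")
buf.seek(0)

records = load_json_lines_gz(buf)
print(len(records))  # 2

# Mirror df.fillna('') for the two columns used downstream.
rows = [{k: rec.get(k, "") for k in ("overall", "reviewText")} for rec in records]
print(rows[1])  # {'overall': 2.0, 'reviewText': ''}
```

The real script loads the records into a pandas DataFrame before calling `fillna('')`. The dict comprehension here only illustrates the same "missing becomes empty string" rule without depending on pandas.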


## Model Training

Use the existing `train_2.py` from the previous demo.

In [None]:
%%writefile train/train_2.py
# General libraries.
import numpy as np
from sklearn.naive_bayes import MultinomialNB
from sklearn import metrics
from sklearn.metrics import classification_report,plot_confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from azureml.core.run import Run
from sklearn.model_selection import GridSearchCV
from azureml.core import Workspace, Dataset
import matplotlib.pyplot as plt
from joblib import dump
from sklearn.feature_extraction.text import CountVectorizer

run = ________________

# Get workspace from run context
workspace = __________________________

# Load Data
dataset = Dataset.get_by_name(workspace, name='ProductReview')
data = dataset.to_pandas_dataframe()[['overall', 'reviewText']]

# Prepare X & Y
Y = data.pop('overall').to_numpy()
X = data.pop('reviewText').to_numpy()
train_x, test_x, train_y, test_y = train_test_split(X,Y, test_size = 0.1, random_state=1)


vec = CountVectorizer()
fitted_train_data = vec.fit_transform(train_x)
fitted_test_data = vec.transform(test_x)

model = MultinomialNB()
params = {'alpha': [1.0e-5, 0.0001, 0.001, 0.01, 0.1, 1.0, 10.0]}

clf = GridSearchCV(model, params, scoring = "f1_macro", verbose=0, cv = 5)
clf_result = clf.fit(fitted_train_data, train_y)
run.___("Best alpha ",clf_result.best_estimator_.alpha)
pred = clf.predict(fitted_test_data)
run.___("F1", metrics.f1_score(test_y, pred, average='weighted'))

plot_confusion_matrix(clf, fitted_test_data, test_y)  
plt.savefig('confusion_matrix.png')
# Record image to run
run.__________________(name='Confusion-Matrix', path='./confusion_matrix.png')

# Save trained model
dump(vec, './vec.pkl')
dump(clf, './mnb.pkl')
run.____________(name='vec.pkl', path_or_stream=___________)
run.____________(name='mnb.pkl', path_or_stream=___________)
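The grid search selects `alpha` using `scoring="f1_macro"`. The value logged as `F1` (and later used for model selection) is computed with `average='weighted'` instead. On imbalanced star ratings the two averages can differ noticeably. The pure-Python sketch below implements both averages. It is written to mirror scikit-learn's definitions: labels are taken from the union of true and predicted values, and the weighted average uses true-label support. The toy labels are made up for illustration.

```python
from collections import Counter


def f1_per_class(y_true, y_pred):
    """Return {label: F1} for every label seen in y_true or y_pred."""
    pairs = list(zip(y_true, y_pred))
    scores = {}
    for label in sorted(set(y_true) | set(y_pred)):
        tp = sum(1 for t, p in pairs if t == label and p == label)
        fp = sum(1 for t, p in pairs if t != label and p == label)
        fn = sum(1 for t, p in pairs if t == label and p != label)
        denom = 2 * tp + fp + fn
        scores[label] = 2 * tp / denom if denom else 0.0  # guard against 0/0
    return scores


def f1_macro(y_true, y_pred):
    """Unweighted mean of per-class F1: every rating counts equally."""
    scores = f1_per_class(y_true, y_pred)
    return sum(scores.values()) / len(scores)


def f1_weighted(y_true, y_pred):
    """Per-class F1 weighted by how often each class occurs in y_true."""
    scores = f1_per_class(y_true, y_pred)
    support = Counter(y_true)
    return sum(scores[label] * support[label] for label in scores) / len(y_true)


# Toy, imbalanced ratings: mostly 5-star, one 1-star misclassified as 5.
y_true = [5, 5, 5, 5, 5, 5, 1, 1]
y_pred = [5, 5, 5, 5, 5, 5, 5, 1]
print(round(f1_macro(y_true, y_pred), 3))     # 0.795
print(round(f1_weighted(y_true, y_pred), 3))  # 0.859
```

Because the weighted score is dominated by the majority class, it tends to look better than the macro score that the grid search actually optimized. Keep this in mind when comparing runs by the logged `F1`.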

## Model Selection

In this step, we use a predefined metric, **F1**. We list all of today's runs and select the model with the highest F1 score. That model is registered in the Model Registry in preparation for deployment.

In [109]:
%%writefile model_selection/model_select.py
import sklearn
from datetime import datetime, date
from azureml.core.run import Run
# from azureml.core import Experiment
# from azureml.core.model import Model

# get run context
run = Run.get_context()
workspace = run.experiment.workspace

# Get Experiment and runs for model select
# In this step, we will use F1
exp = run.__________ # Experiment.list(workspace, experiment_name='MLOps-Workshop')

today = date.today()

select_run = None
F1 = 0
for r in exp.get_runs():
    run_starttime = datetime.strptime(r.______________['startTimeUtc'][:10], '%Y-%m-%d').date()
    if run_starttime==today:
        for step in r.get_children():
            current_step_f1 = step.__________(name='F1') # Get F1 metrics
            if 'F1' in current_step_f1.keys() and F1<current_step_f1['F1']:
                F1=current_step_f1['F1']
                select_run = step
    
if select_run is not None:
    # Register the model files uploaded by the best-scoring training step
    mnb_model = select_run.register_model("ProductReview-NaiveBayes",
                            model_path="./mnb.pkl",
                            )

    vector    = select_run.register_model("ProductReview-CountVector", 
                            model_path="./vec.pkl",
                            )

Overwriting model_selection/model_select.py
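The selection loop can be checked offline by separating its logic from the Azure ML objects. In the minimal sketch below, each pipeline run is a plain dict standing in for what `get_details()`, `get_children()` and `get_metrics()` return. That dict shape, the helper name `select_best_run`, and the sample data are all hypothetical. As in the script, only runs whose `startTimeUtc` date equals `today` are considered, and the child step with the highest `F1` wins.

```python
from datetime import date, datetime


def select_best_run(runs, today, metric="F1"):
    """Return (best_child, best_score) among child steps of runs started on `today`.

    `runs` is a hypothetical stand-in for Azure ML run objects:
    [{"startTimeUtc": "YYYY-MM-DDT...", "children": [{"id": ..., "metrics": {...}}]}]
    """
    best_child, best_score = None, 0.0
    for r in runs:
        started = datetime.strptime(r["startTimeUtc"][:10], "%Y-%m-%d").date()
        if started != today:
            continue
        for child in r["children"]:
            score = child["metrics"].get(metric)  # steps without F1 are skipped
            if score is not None and score > best_score:
                best_child, best_score = child, score
    return best_child, best_score


runs = [
    {"startTimeUtc": "2021-06-01T08:00:00.123Z",
     "children": [{"id": "prep", "metrics": {}},
                  {"id": "train-a", "metrics": {"F1": 0.61}}]},
    {"startTimeUtc": "2021-06-01T09:30:00.456Z",
     "children": [{"id": "train-b", "metrics": {"F1": 0.64}}]},
    {"startTimeUtc": "2021-05-31T23:59:59.000Z",  # yesterday: ignored
     "children": [{"id": "train-old", "metrics": {"F1": 0.90}}]},
]
best, score = select_best_run(runs, date(2021, 6, 1))
print(best["id"], score)  # train-b 0.64
```

`startTimeUtc` is a UTC timestamp, while `date.today()` in the script uses the machine's local clock. The two dates agree only if that clock is set to UTC. Passing `today` explicitly, for example `datetime.now(timezone.utc).date()`, removes that ambiguity.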


## Prepare the Model Package

In [112]:
%%writefile model_deploy/score.py
import json, os, joblib
from azureml.core.model import _____

def init(): 
  global vec, clf
  print(Model.get_model_path('ProductReview-NaiveBayes'))
  vec = joblib.load(Model.get_model_path('ProductReview-CountVector'))
  clf = joblib.load(Model.get_model_path('ProductReview-NaiveBayes'))

def run(data): 
  input_data = json.loads(data)['data'] 
  fitted_data = vec.transform(input_data)
  pred = clf.predict(fitted_data)
  return json.dumps(pred.tolist())


Writing model_deploy/score.py


In [117]:
%%writefile model_deploy/deploy.py
from azureml.core.model import InferenceConfig, Model
from azureml.core import Environment
from azureml.core.run import Run
from azureml.core.webservice import AciWebservice

# get run context
run = Run.get_context()
workspace = run.experiment.workspace

service_name = 'product-review-service'
env = Environment.get(workspace=workspace, name="AzureML-sklearn-0.24-ubuntu18.04-py37-cpu")

inference_config = InferenceConfig(entry_script=________, 
                            source_directory=_______,
                            environment=________)

deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

nb_model = Model(workspace, 'ProductReview-NaiveBayes')
vectorizor = Model(workspace, 'ProductReview-CountVector')

service = Model.deploy(
    _________,
    name = ___________,
    models=[nb_model, vectorizor],
    inference_config= _________,
    deployment_config= ____________,
    overwrite=True,
)
service.wait_for_deployment(show_output=True)

Overwriting model_deploy/deploy.py


In [118]:
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline, PipelineData
import azureml.core
from azureml.core import Workspace, Environment, Experiment
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
import os

workspace=Workspace.from_config()
# Get ComputeTarget
aml_compute_target = "cpu-cluster"
try:
    aml_compute = AmlCompute(workspace, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2",
                                                                min_nodes = 0, 
                                                                max_nodes = 4)    
    aml_compute = ComputeTarget.create(workspace, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
print("Azure Machine Learning Compute attached")

env = Environment.___(workspace=workspace, name="AzureML-sklearn-0.24-ubuntu18.04-py37-cpu")
# create a new runconfig object
runconfig = RunConfiguration()
runconfig.environment = env

dataprep_step = PythonScriptStep(name="prep_data",
                                 script_name=________________,
                                 source_directory="data_engineering",
                                 compute_target=_________________,
                                 runconfig=runconfig)

train_step = PythonScriptStep(name="train",
                              script_name=__________,
                              source_directory="train",
                              compute_target=_________________,
                              runconfig=runconfig)
train_step.run_after(dataprep_step)

select_step = PythonScriptStep(name="select_model",
                               script_name=__________,
                               source_directory="model_selection",
                               compute_target=_________________,
                               runconfig=runconfig)
select_step.run_after(train_step)

deploy_step = PythonScriptStep(name="deploy_model",
                               script_name=__________,
                               source_directory="model_deploy",
                               compute_target=____________________,
                               runconfig=runconfig)
deploy_step.run_after(select_step)

experiment_name = 'MLOps-Workshop'
pipeline = Pipeline(workspace=workspace, steps=[_____________])
pipeline_run = Experiment(workspace, experiment_name).submit(_________)
print("Pipeline is submitted for execution")

found existing compute target.
Azure Machine Learning Compute attached
Created step deploy_model [f4398fe2][ef707c3f-6ebe-49bd-875b-abd38b13068b], (This step is eligible to reuse a previous run's output)
Created step select_model [e3cf3603][ba6b0607-caaf-42f4-800d-f11a41cd11fb], (This step is eligible to reuse a previous run's output)
Created step train [fca8188c][9a901d98-bd6d-432f-b4de-1dfae832f1be], (This step is eligible to reuse a previous run's output)
Created step prep_data [215c71e8][353e7ebf-5a4e-4138-8093-a54b5c71bf41], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun 33581c8a-011b-4ba9-8a64-2beb3c97f9fe
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/33581c8a-011b-4ba9-8a64-2beb3c97f9fe?wsid=/subscriptions/f3f672c1-6cfc-4f72-92ae-2b1ab1c0cf69/resourcegroups/mlrg/workspaces/mymlspace&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
Pipeline is submitted for execution
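The submission log reports that every step is "eligible to reuse a previous run's output". `PythonScriptStep` defaults to `allow_reuse=True`. With that default, Azure ML may skip a step whose script, source directory and parameters are unchanged, and reuse the earlier output instead. For a scheduled pipeline that is meant to download fresh data and retrain each month, that is probably not the desired behavior. The sketch below is a hypothetical helper, `make_script_step`, that creates a step with `allow_reuse=False`. The step class is injectable only so the helper can be exercised without the Azure ML SDK installed.

```python
def make_script_step(name, script_name, source_directory, compute_target,
                     runconfig, allow_reuse=False, step_cls=None):
    """Create a pipeline step that, by default, re-executes on every submission.

    step_cls defaults to azureml.pipeline.steps.PythonScriptStep; it is a
    parameter only so the helper can be tested with a stand-in class.
    """
    if step_cls is None:
        # Imported lazily so this module loads even without the SDK.
        from azureml.pipeline.steps import PythonScriptStep
        step_cls = PythonScriptStep
    return step_cls(
        name=name,
        script_name=script_name,
        source_directory=source_directory,
        compute_target=compute_target,
        runconfig=runconfig,
        allow_reuse=allow_reuse,  # False: never reuse a previous run's output
    )
```

To use it, pass the same arguments as the `PythonScriptStep(...)` calls above and keep the `run_after` chain unchanged. An alternative is to keep reuse enabled for steps that are expensive but deterministic and disable it only for the data preparation step.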


## Test Endpoint

In [119]:
import requests, json

# replace uri below with your service endpoint
uri = 'http://71ed97f8-aa0f-4c14-871f-b7c9687e8443.westus2.azurecontainer.io/score'

headers = {"Content-Type": "application/json"}
comments = """
"I've been using Dreamweaver (and it's predecessor Macromedia's UltraDev) for many years.  
For someone who is an experienced web designer, this course is a high-level review of the CS5 version of Dreamweaver,
 but it doesn't go into a great enough level of detail to find it very useful.\n\nOn the other hand, 
 this is a great tool for someone who is a relative novice at web design.  
 It starts off with a basic overview of HTML and continues through the concepts necessary to build a modern web site.  
 Someone who goes through this course should exit with enough knowledge to create something that does what 
 you want it do do...within reason.  Don't expect to go off and build an entire e-commerce system with only this class 
 under your belt.\n\nIt's important to note that there's a long gap from site design to actual implementation.  
 This course teaches you how to implement a design.  The user interface and overall user experience is a different 
 subject that isn't covered here...it's possible to do a great implementation of an absolutely abysmal design.  
 I speak from experience.  :)\n\nAs I said above, if you're a novice, a relative newcomer or just an experienced web 
 designer who wants a refresher course, this is a good way to do it."
"""
sample_input = json.dumps({
    'data': [comments]
})
response = requests.post(uri, data=sample_input, headers=headers)
print(response.json())

[4.0]
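The request body must match what `score.py` expects, which is `{"data": [text, ...]}`. On the response side, `run()` returns `json.dumps(pred.tolist())`, which is already a JSON string. Depending on how the serving layer wraps that return value, the HTTP body may be the list itself or the list encoded once more as a JSON string. The minimal sketch below (both helper names are hypothetical) builds the payload and accepts either response shape.

```python
import json


def make_payload(texts):
    """Build the request body score.py reads: {"data": [text, ...]}."""
    return json.dumps({"data": list(texts)})


def parse_predictions(body):
    """Decode a response body into a list of predicted ratings.

    Handles both a plain JSON list ('[4.0]') and a JSON string that
    itself contains a JSON list ('"[4.0]"').
    """
    value = json.loads(body)
    if isinstance(value, str):  # double-encoded: decode once more
        value = json.loads(value)
    return value


print(make_payload(["Works as advertised."]))  # {"data": ["Works as advertised."]}
print(parse_predictions('[4.0]'))              # [4.0]
print(parse_predictions('"[4.0]"'))            # [4.0]
```

With the cell above, you would send `data=make_payload([comments])` and read `parse_predictions(response.text)`. That always yields a Python list, whereas `print(response.json())` prints a double-encoded string and a real list identically.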


## Publish Pipeline

In [138]:
published_pipeline = pipeline_run._____________(
     name="MLOps-Workshop-Pipeline",
     description="Published Pipeline for MLOps Workshop",
     version="1.0")

In [139]:
published_pipeline.id

'7cd1356f-dc16-4df6-be0d-622e3da5ba63'

## Schedule Pipeline runs

https://docs.microsoft.com/en-us/azure/machine-learning/how-to-trigger-published-pipeline


In [140]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Create a time-based schedule for the published pipeline
# Frequency can be Minute / Hour / Day / Week / Month
recurrence = ScheduleRecurrence(frequency="Month", interval=1)
recurring_schedule = Schedule.create(workspace, name="MonthlySchedule", 
                            description="Based on time",
                            pipeline_id=published_pipeline.id, 
                            experiment_name=experiment_name, 
                            recurrence=recurrence)

In [141]:
recurring_schedule


Name,Id,Status,Pipeline Id,Pipeline Endpoint Id,Recurrence Details
MonthlySchedule,af9d2e20-1cf5-441b-ac90-ca7ea750f95a,Active,7cd1356f-dc16-4df6-be0d-622e3da5ba63,,Runs every Month


## Pipeline Schedule Management

- Enable (set the schedule to 'Active' so it is available to run) / Disable (set the schedule to 'Disabled' so it no longer runs)

  `enable(wait_for_provisioning=False, wait_timeout=3600)`

  `disable(wait_for_provisioning=False, wait_timeout=3600)`

- Get (get the schedule with the given ID)

  `Schedule.get(workspace, id, _workflow_provider=None, _service_endpoint=None)`

- List (get all schedules in the current workspace)

  `Schedule.list(workspace, active_only=True, pipeline_id=None, pipeline_endpoint_id=None, _workflow_provider=None, _service_endpoint=None)`


https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-schedule-for-a-published-pipeline.ipynb
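These calls can be combined. For example, you can stop every active schedule attached to one published pipeline before republishing it, so that the old version does not keep running monthly. The minimal sketch below uses a hypothetical helper, `disable_pipeline_schedules`, built on `Schedule.list(..., pipeline_id=...)` and `disable(...)` as listed above. The `Schedule` class is injectable only so the logic can be tested without the SDK.

```python
def disable_pipeline_schedules(workspace, pipeline_id, schedule_cls=None):
    """Disable every active schedule of one published pipeline.

    Returns the ids of the schedules that were disabled. schedule_cls defaults
    to azureml.pipeline.core.schedule.Schedule and is injectable for testing.
    """
    if schedule_cls is None:
        # Imported lazily so this module loads even without the SDK.
        from azureml.pipeline.core.schedule import Schedule
        schedule_cls = Schedule
    disabled = []
    for schedule in schedule_cls.list(workspace, active_only=True,
                                      pipeline_id=pipeline_id):
        # Block until the service has applied the change.
        schedule.disable(wait_for_provisioning=True)
        disabled.append(schedule.id)
    return disabled
```

In this notebook, `disable_pipeline_schedules(workspace, published_pipeline.id)` would target the `MonthlySchedule` created above. It can be re-enabled later with `Schedule.get(workspace, id).enable()`.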

