# MLOps with AKS

<b>Objective</b>


1. Create a simple model that predicts the category for the Iris dataset.
2. Use Azure Machine Learning's capabilities to do the following:
 - Use a dataset drift monitor to identify whether there is drift in the dataset.
 - If there is drift, send an automatic email alert.
 - Profile the model to approximate the resources it might require.
 - Deploy the model on AKS using the profiling values above.
 - Serve multiple versions of the best model to enable A/B-testing scenarios.
 - Monitor the service using Log Analytics and Application Insights.


## Part - I

1. Download the IRIS dataset from the web.
2. Convert the labels to numeric representations.
3. Push the cleaned dataset to a new folder.
4. Upload it to the default datastore of ML workspace.
5. Register the dataset.
6. Access the dataset to check everything is working as expected.
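
Step 2 above (converting labels to numeric representations) can be sketched without pandas as follows. This is only an illustration: the label strings are assumed stand-ins for the `variety` column, and the variable names are hypothetical.

```python
# Toy labels standing in for the `variety` column (assumed values).
labels = ["Setosa", "Setosa", "Versicolor", "Virginica", "Versicolor"]

# Unique labels in order of first appearance.
unique_labels = list(dict.fromkeys(labels))

# Forward and inverse lookup tables.
index_to_label = dict(enumerate(unique_labels))
label_to_index = {label: i for i, label in index_to_label.items()}

# Encode the labels as integers, then decode them back.
encoded = [label_to_index[label] for label in labels]
decoded = [index_to_label[i] for i in encoded]

print(encoded)  # [0, 0, 1, 2, 1]
```

Keeping the inverse table around makes it possible to turn numeric predictions back into readable labels later.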

In [1]:
!mkdir ./dataset
!mkdir ./dataset/inputs
!mkdir ./dataset/processed_data
!mkdir ./dataset/profile-data
!mkdir ./model
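
The `!mkdir` calls above fail if the cell is run a second time, because the folders already exist. A minimal sketch of a rerunnable alternative, assuming the same folder layout, is shown below; `make_project_dirs` is a hypothetical helper name.

```python
from pathlib import Path


def make_project_dirs(root="."):
    """Create the folders used in this walkthrough; safe to call repeatedly."""
    relative_dirs = [
        "dataset/inputs",
        "dataset/processed_data",
        "dataset/profile-data",
        "model",
    ]
    created = []
    for rel in relative_dirs:
        path = Path(root) / rel
        # parents=True creates ./dataset as needed; exist_ok=True ignores existing folders.
        path.mkdir(parents=True, exist_ok=True)
        created.append(path)
    return created
```

Calling `make_project_dirs()` from the notebook's working directory would create the same folders as the shell commands.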

In [2]:
!wget -O ./dataset/inputs/iris_raw.csv https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv

--2022-09-08 15:32:16--  https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv
Resolving gist.githubusercontent.com (gist.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.110.133, ...
Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3975 (3.9K) [text/plain]
Saving to: ‘./dataset/inputs/iris_raw.csv’


2022-09-08 15:32:17 (57.5 MB/s) - ‘./dataset/inputs/iris_raw.csv’ saved [3975/3975]



In [3]:
import pandas as pd
from sklearn.model_selection import train_test_split

raw_data = pd.read_csv('./dataset/inputs/iris_raw.csv') #The shape of the data is (150,5) 4 features + 1 label
i2l = dict(enumerate(raw_data.variety.unique().tolist()))
l2i = {k:i for i, k in i2l.items()}
raw_data.variety = raw_data.variety.map(lambda x : l2i[x])

display(raw_data.head(10))

Unnamed: 0,sepal.length,sepal.width,petal.length,petal.width,variety
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0
5,5.4,3.9,1.7,0.4,0
6,4.6,3.4,1.4,0.3,0
7,5.0,3.4,1.5,0.2,0
8,4.4,2.9,1.4,0.2,0
9,4.9,3.1,1.5,0.1,0


In [4]:
raw_data.to_csv('./dataset/processed_data/iris_data_base.csv', index = False)

#(or) build the workspace explicitly from a details file
"""
from azureml.core import Workspace

with open('./auth/azure_details.txt', 'r') as f:
    vals = f.readlines()

subscription_id = vals[0].strip()
resource_group = vals[1].strip()
workspace_name = vals[2].strip()
workspace = Workspace(subscription_id, resource_group, workspace_name)
"""

In [5]:
from azureml.core import Workspace

ws = Workspace.from_config() #we are in the same workspace

In [6]:
from azureml.core import Dataset

datastore = ws.get_default_datastore() # points to the native Azure ML Storage

datastore.upload(src_dir = './dataset/processed_data', target_path = 'iris_data_base')

#from azureml.core.datapath import DataPath
#Datastore.get(workspace, 'workspaceblobstore')
#iris_dataset = Dataset.File.upload_directory('./dataset/inputs/', DataPath(datastore, 'iris-base-data'), pattern = '*.csv') # for files dataset 

"Datastore.upload" is deprecated after version 1.0.69. Please use "Dataset.File.upload_directory" to upload your files             from a local directory and create FileDataset in single method call. See Dataset API change notice at https://aka.ms/dataset-deprecation.


Uploading an estimated of 1 files
Uploading ./dataset/processed_data/iris_data_base.csv
Uploaded ./dataset/processed_data/iris_data_base.csv, 1 files out of an estimated total of 1
Uploaded 1 files


$AZUREML_DATAREFERENCE_fcd5f9cf77364c2ba6f90fd1745f58f6

In [8]:
#Registering the dataset
dataset =  Dataset.Tabular.from_delimited_files(datastore.path('iris_data_base/iris_data_base.csv'))

dataset.register(workspace=ws, name='iris_data_base', description='iris-base-dataset')

{
  "source": [
    "('workspaceblobstore', 'iris_data_base/iris_data_base.csv')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ParseDelimited",
    "DropColumns",
    "SetColumnTypes"
  ],
  "registration": {
    "id": "da8a1654-158b-4b04-9536-af4da1f71fa0",
    "name": "iris_data_base",
    "version": 1,
    "description": "iris-base-dataset",
    "workspace": "Workspace.create(name='mlops-sanjeev', subscription_id='0d1442c1-d386-4505-9abe-0bedfd63701e', resource_group='mlops')"
  }
}

In [9]:
#retrieving the dataset from registered datasets
dataset = Dataset.get_by_name(ws, name='iris_data_base')
df = dataset.to_pandas_dataframe()
print(dataset.name, dataset.version)
display(df.head(10))

iris_data_base 1


Unnamed: 0,sepal.length,sepal.width,petal.length,petal.width,variety
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0
5,5.4,3.9,1.7,0.4,0
6,4.6,3.4,1.4,0.3,0
7,5.0,3.4,1.5,0.2,0
8,4.4,2.9,1.4,0.2,0
9,4.9,3.1,1.5,0.1,0


## Part - II

1. Train a random forest model.
2. Train a decision tree model.
3. Register both models.
4. Create a scoring file.
5. Profile the model.

In [10]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier

x, y = df.iloc[:,:-1], df.iloc[:,-1]
X_train, X_test, y_train, y_test = train_test_split(x, y , random_state = 1, test_size = 0.3, stratify = y)

In [11]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score


def printMetrics(model, test_data, y_labels):
    predicted = model.predict(test_data)
    acc = accuracy_score(y_labels, predicted)
    f1 = f1_score(y_labels, predicted, average = 'macro')
    precision = precision_score(y_labels, predicted, average = 'macro')
    recall = recall_score(y_labels, predicted, average = 'macro')
    print('Accuracy', acc)
    print('F1', f1)
    print('Precision', precision)
    print('Recall', recall)
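
To make `average = 'macro'` concrete, the sketch below computes a macro-averaged F1 score by hand: F1 is calculated per class and the per-class scores are averaged with equal weight. The example data is an assumption: 45 test samples (15 per class, as a stratified 30% split of 150 rows gives) with a single class-1 sample predicted as class 2. Which sample the models actually misclassified is not shown in this notebook.

```python
def macro_f1(y_true, y_pred):
    """Unweighted mean of the per-class F1 scores."""
    classes = sorted(set(y_true) | set(y_pred))
    scores = []
    for c in classes:
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == c and p == c)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != c and p == c)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        denom = precision + recall
        scores.append(2 * precision * recall / denom if denom else 0.0)
    return sum(scores) / len(scores)


# Assumed test set: 15 samples per class, one class-1 sample predicted as class 2.
y_true = [0] * 15 + [1] * 15 + [2] * 15
y_pred = [0] * 15 + [1] * 14 + [2] + [2] * 15

print(round(macro_f1(y_true, y_pred), 4))  # 0.9778
```

A single error of this kind gives per-class F1 scores of 1, 28/29 and 30/31, whose mean matches the F1 values reported for both classifiers below.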

In [12]:
## Searching the best parameters using the GridSearchCV for RFC and DTC

# Decision Tree Classifier
from sklearn.model_selection import GridSearchCV

params = {'criterion' : ('gini', 'entropy'), 'max_depth' : [i for i in range(2, 9)]}
dt = DecisionTreeClassifier(random_state = 198)
grid_ = GridSearchCV(dt, params, n_jobs = -1)
grid_.fit(X_train, y_train)
dt_best = DecisionTreeClassifier(criterion = grid_.best_params_['criterion'],
                                 max_depth = grid_.best_params_['max_depth'],
                                 random_state = 198)
dt_best.fit(X_train, y_train)

print('Decision Tree classifier')
print('-' * 20)
print('Params:')
print("Criterion", grid_.best_params_['criterion'])
print("Max Depth", grid_.best_params_['max_depth'])
print('-' * 20)
printMetrics(dt_best, X_test, y_test)

Decision Tree classifier
--------------------
Params:
Criterion gini
Max Depth 4
--------------------
Accuracy 0.9777777777777777
F1 0.9777530589543938
Precision 0.9791666666666666
Recall 0.9777777777777779


In [13]:
# Random forest classifier

params = {'n_estimators' : [i for i in range(100, 200, 10)], 'max_depth' : [i for i in range(2, 9)]}

rfc = RandomForestClassifier(random_state = 198)
grid_ = GridSearchCV(rfc, params, n_jobs = -1)
grid_.fit(X_train, y_train)

rfc_best = RandomForestClassifier(n_estimators = grid_.best_params_['n_estimators'],
                                  max_depth = grid_.best_params_['max_depth'],
                                  random_state = 198)
rfc_best.fit(X_train, y_train)

print('Random Forest classifier')
print('-' * 20)
print('Params:')
print("N estimators", grid_.best_params_['n_estimators'])
print("Max Depth", grid_.best_params_['max_depth'])
print('-' * 20)
printMetrics(rfc_best, X_test, y_test)

Random Forest classifier
--------------------
Params:
N estimators 100
Max Depth 3
--------------------
Accuracy 0.9777777777777777
F1 0.9777530589543938
Precision 0.9791666666666666
Recall 0.9777777777777779


In [14]:
# Converting the trained models to ONNX format

from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

initial_type = [('float_input', FloatTensorType([None, 4]))] #4 represents the number of features

onx = convert_sklearn(dt_best, initial_types=initial_type)

with open("./model/iris_dt.onnx", "wb") as f:
    f.write(onx.SerializeToString())

onx = convert_sklearn(rfc_best, initial_types=initial_type)  # convert the random forest, not the decision tree

with open("./model/iris_rfc.onnx", "wb") as f:
    f.write(onx.SerializeToString())

The maximum opset needed by this model is only 1.
The maximum opset needed by this model is only 9.
The maximum opset needed by this model is only 1.
The maximum opset needed by this model is only 9.


In [15]:
from azureml.core.model import Model

model_rfc = Model.register(model_path = './model/iris_rfc.onnx', model_name = 'iris-predictor-rfc', tags = {'model_version' : 1},
                     description = 'RFC for classifying IRIS', workspace = ws)
model_dt = Model.register(model_path = './model/iris_dt.onnx', model_name = 'iris-predictor-dt', tags = {'model_version' : 1},
                      description = 'DT for classifying IRIS', workspace = ws)

Registering model iris-predictor-rfc
Registering model iris-predictor-dt


### Testing model - local inferencing

In [16]:
import onnxruntime as rt
import numpy as np

#iris_predictor = Model(ws, 'iris-predictor', version = 1).download(exist_ok=True) #defaulted under ./models/model_name.onnx

sess = rt.InferenceSession("./model/iris_rfc.onnx")

input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name

test_data = np.array([[5.1, 3.5, 1.4, 0.2], [1.1, 2.8, 4.4, 1.2]])

preds = sess.run([label_name], {input_name: test_data.astype(np.float32)})

print(preds)

[array([0, 1], dtype=int64)]


2022-09-08 15:36:28.643534800 [W:onnxruntime:, execution_frame.cc:806 VerifyOutputSizes] Expected shape from model of {1} does not match actual shape of {2} for output output_label


## Deploying the model as a local web service

In [17]:
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core import Environment

env = CondaDependencies.create(pip_packages=["numpy", "pandas", "onnxruntime", "joblib", "azureml-core", "azureml-monitoring", "azureml-defaults", 'Jinja2<3.1', "scikit-learn==0.22.2.post1", "inference-schema", "inference-schema[numpy-support]"])

with open('./model/env.yml', 'w') as f:
    f.write(env.serialize_to_string())

iris_env = Environment.from_conda_specification(name = 'iris-env', file_path = "./model/env.yml")
    
iris_env.register(workspace=ws)

{
    "assetId": "azureml://locations/centralindia/workspaces/2cdb4132-60a6-427c-8b1a-10c400bf94cb/environments/iris-env/versions/1",
    "databricks": {
        "eggLibraries": [],
        "jarLibraries": [],
        "mavenLibraries": [],
        "pypiLibraries": [],
        "rcranLibraries": []
    },
    "docker": {
        "arguments": [],
        "baseDockerfile": null,
        "baseImage": "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:20220708.v1",
        "baseImageRegistry": {
            "address": null,
            "password": null,
            "registryIdentity": null,
            "username": null
        },
        "buildContext": null,
        "enabled": false,
        "platform": {
            "architecture": "amd64",
            "os": "Linux"
        },
        "sharedVolumes": true,
        "shmSize": null
    },
    "environmentVariables": {
        "EXAMPLE_ENV_VAR": "EXAMPLE_VALUE"
    },
    "inferencingStackVersion": null,
    "name": "iris-env",
    "python"

In [18]:
from azureml.core.model import InferenceConfig

inference_config = InferenceConfig(entry_script="./score_v1.py",
                                   environment=iris_env)
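
The entry script `score_v1.py` is not shown in this notebook. Azure ML entry scripts define an `init()` function, called once at startup, and a `run()` function, called per request. The sketch below is one possible shape for such a script and is not the author's file. The request layout (`{"Inputs": {"data": [...]}}`) and the model file name `iris_dt.onnx` are taken from other cells here; the helper `extract_rows`, the fallback folder `./model`, and the lazy imports are assumptions made for illustration.

```python
import json
import os

N_FEATURES = 4  # sepal length/width, petal length/width

session = None      # ONNX Runtime session, created once in init()
input_name = None
label_name = None


def extract_rows(raw_data):
    """Return the feature rows from a body shaped like {"Inputs": {"data": [[...], ...]}}."""
    payload = json.loads(raw_data) if isinstance(raw_data, (str, bytes)) else raw_data
    rows = payload["Inputs"]["data"]
    for row in rows:
        if len(row) != N_FEATURES:
            raise ValueError(f"each row needs {N_FEATURES} features, got {len(row)}")
    return [[float(value) for value in row] for row in rows]


def init():
    """Load the ONNX model once when the service starts."""
    global session, input_name, label_name
    import onnxruntime as rt  # imported here so extract_rows works without it

    # AZUREML_MODEL_DIR is set by Azure ML inside the container;
    # "./model" is an assumed fallback for local experiments.
    model_dir = os.getenv("AZUREML_MODEL_DIR", "./model")
    session = rt.InferenceSession(os.path.join(model_dir, "iris_dt.onnx"))
    input_name = session.get_inputs()[0].name
    label_name = session.get_outputs()[0].name


def run(raw_data):
    """Score one request and return the predicted class indices as a list."""
    import numpy as np

    rows = np.array(extract_rows(raw_data), dtype=np.float32)
    predictions = session.run([label_name], {input_name: rows})[0]
    return predictions.tolist()
```

Separating request parsing from inference keeps the validation logic testable without a model or a running container.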

In [19]:
!sudo service docker start

In [27]:
from azureml.core.webservice import LocalWebservice


model_dt_iris = Model(ws, 'iris-predictor-dt')
# The port is optional; if it is not provided, Docker chooses a random unused port.
deployment_config = LocalWebservice.deploy_configuration(port=6601)

local_service = Model.deploy(ws, "test", [model_dt_iris], inference_config, deployment_config)

local_service.wait_for_deployment()

Downloading model iris-predictor-dt:1 to /tmp/azureml_ufdr5ogh/iris-predictor-dt/1
Generating Docker build context.
Package creation Succeeded
Logging into Docker registry 2cdb413260a6427c8b1a10c400bf94cb.azurecr.io
Logging into Docker registry 2cdb413260a6427c8b1a10c400bf94cb.azurecr.io
Building Docker image from Dockerfile...
Step 1/5 : FROM 2cdb413260a6427c8b1a10c400bf94cb.azurecr.io/azureml/azureml_44403a36f69d445806d69e506c1210a0
 ---> b649ffae7f00
Step 2/5 : COPY azureml-app /var/azureml-app
 ---> 713bc8f27aec
Step 3/5 : RUN mkdir -p '/var/azureml-app' && echo eyJhY2NvdW50Q29udGV4dCI6eyJzdWJzY3JpcHRpb25JZCI6IjBkMTQ0MmMxLWQzODYtNDUwNS05YWJlLTBiZWRmZDYzNzAxZSIsInJlc291cmNlR3JvdXBOYW1lIjoibWxvcHMiLCJhY2NvdW50TmFtZSI6Im1sb3BzLXNhbmplZXYiLCJ3b3Jrc3BhY2VJZCI6IjJjZGI0MTMyLTYwYTYtNDI3Yy04YjFhLTEwYzQwMGJmOTRjYiJ9LCJtb2RlbHMiOnt9LCJtb2RlbHNJbmZvIjp7fX0= | base64 --decode > /var/azureml-app/model_config_map.json
 ---> Running in 2e8614281fbe
 ---> 1e5243863941
Step 4/5 : RUN mv '/var/azurem

In [29]:
import requests
import json

# Column-oriented view of the same two samples (kept for reference; the request below sends row-oriented data)
input_data = {'sepal.length': [5.1, 1.1], 'sepal.width': [3.5, 2.8], 'petal.length' : [1.4, 4.4], 'petal.width' : [0.2, 1.2] }

request = {"Inputs": {"data" : [[5.1, 3.5, 1.4, 0.2], [1.1, 2.8, 4.4, 1.2] ] }}

headers = {'Content-Type': 'application/json', 'Accept': 'text/plain'}

scoring_uri = "http://localhost:6601/score"
resp = requests.post(scoring_uri, json.dumps(request), headers=headers)

print("prediction:", resp.text)

prediction: [0, 1]
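
The request above sends one row per sample, while `input_data` holds one list per feature. A minimal sketch of converting between the two layouts follows; `build_request` and `FEATURE_ORDER` are hypothetical names, and the column order is assumed to match the order of the training columns.

```python
import json

# Assumed to match the column order used for training.
FEATURE_ORDER = ["sepal.length", "sepal.width", "petal.length", "petal.width"]


def build_request(columns):
    """Turn a dict of feature columns into the row-oriented JSON body used above."""
    lengths = {len(columns[name]) for name in FEATURE_ORDER}
    if len(lengths) != 1:
        raise ValueError("all feature columns must have the same length")
    # zip(*columns) transposes columns into rows.
    rows = [list(row) for row in zip(*(columns[name] for name in FEATURE_ORDER))]
    return json.dumps({"Inputs": {"data": rows}})


input_data = {
    "sepal.length": [5.1, 1.1],
    "sepal.width": [3.5, 2.8],
    "petal.length": [1.4, 4.4],
    "petal.width": [0.2, 1.2],
}
body = build_request(input_data)
print(body)
```

The resulting `body` string can be passed to `requests.post` in place of `json.dumps(request)`.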


References:

 - https://stackoverflow.com/questions/72376401/making-predictions-with-azure-machine-learning-with-new-data-that-contains-heade
 - https://stackoverflow.com/questions/64257530/import-data-and-python-scripts-in-azure-ml-entry-script-when-deploying-models
 - https://docs.microsoft.com/en-us/answers/questions/746784/azure-ml-studio-error-while-testing-real-time-endp.html

## Model profiling

In [31]:
import json
from azureml.core import Datastore
from azureml.core.dataset import Dataset
from azureml.data import dataset_type_definitions
from azureml.core import Workspace

ws = Workspace.from_config()

input_json = {"Inputs": {"data" : [[5.1, 3.5, 1.4, 0.2], [1.1, 2.8, 4.4, 1.2] ] }}

serialized_input_json = json.dumps(input_json)
dataset_content = []

for i in range(100):
     dataset_content.append(serialized_input_json)


dataset_content = '\n'.join(dataset_content)

with open('./dataset/profile-data/model-profiling-data-v1.txt', 'w') as f:
    f.write(dataset_content)


# upload the txt file created above to the Datastore and create a dataset from it
datastore = ws.get_default_datastore() # points to the native Azure ML Storage
datastore.upload(src_dir = 'dataset/profile-data/', target_path = 'iris_model_profiling')

Uploading an estimated of 1 files
Uploading dataset/profile-data/model-profiling-data-v1.txt
Uploaded dataset/profile-data/model-profiling-data-v1.txt, 1 files out of an estimated total of 1
Uploaded 1 files


$AZUREML_DATAREFERENCE_2952426701534d109f781c0c5face180

In [32]:
sample_request_data = Dataset.Tabular.from_delimited_files(datastore.path('iris_model_profiling/model-profiling-data-v1.txt'), separator='\n',
                                                           infer_column_types=True,
                                                          header=dataset_type_definitions.PromoteHeadersBehavior.NO_HEADERS)

sample_request_data = sample_request_data.register(workspace=ws, name='iris-profiling-data', create_new_version=True)

In [34]:
from azureml.core.model import InferenceConfig, Model
from azureml.core.dataset import Dataset
from azureml.core import Workspace, Environment

ws = Workspace.from_config()

iris_env = Environment.get(ws, name = 'iris-env')

model = Model(ws, name = 'iris-predictor-dt')
inference_config = InferenceConfig(entry_script='score_v1.py',
                                   environment = iris_env)

input_dataset = Dataset.get_by_name(workspace=ws, name='iris-profiling-data') # profiling expects a dataset of serialized request strings, hence the text file created above
profile = Model.profile(ws,
            'iris-model-profile-2',
            [model],
            inference_config,
            input_dataset=input_dataset)

profile.wait_for_completion(True)

# see the result
details = profile.get_details()

Running..................................................
Succeeded


In [35]:
details

{'name': 'iris-model-profile-2',
 'createdTime': '2022-09-08T16:17:09.527822+00:00',
 'state': 'Succeeded',
 'requestedCpu': 3.5,
 'requestedMemoryInGB': 15.0,
 'requestedQueriesPerSecond': 0,
 'maxUtilizedMemoryInGB': 0.193830912,
 'totalQueries': 100.0,
 'successQueries': 100.0,
 'successRate': 100.0,
 'averageLatencyInMs': 4.7544,
 'latencyPercentile50InMs': 3.14,
 'latencyPercentile90InMs': 6.4,
 'latencyPercentile95InMs': 9.52,
 'latencyPercentile99InMs': 41.2,
 'latencyPercentile999InMs': 41.2,
 'maxUtilizedCpu': 0.022,
 'measuredQueriesPerSecond': 210.33148241628805,
 'recommendedMemoryInGB': 0.5,
 'recommendedCpu': 0.5}
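
The profiling result can be turned into deployment settings programmatically rather than copied by hand. The sketch below is one way to do that; `resources_from_profile` and its `headroom` multiplier are hypothetical, and the input dictionary repeats only the relevant fields from the output above.

```python
def resources_from_profile(details, headroom=1.0):
    """Derive CPU and memory requests from a profiling result.

    `headroom` is an assumed safety multiplier (1.0 means use the
    recommendation as-is). The result never drops below the peak usage
    observed during the profiling run.
    """
    cpu = max(details["recommendedCpu"] * headroom, details["maxUtilizedCpu"])
    memory = max(details["recommendedMemoryInGB"] * headroom,
                 details["maxUtilizedMemoryInGB"])
    return {"cpu_cores": round(cpu, 2), "memory_gb": round(memory, 2)}


# Relevant fields copied from the profiling output above.
profile_details = {
    "maxUtilizedCpu": 0.022,
    "maxUtilizedMemoryInGB": 0.193830912,
    "recommendedCpu": 0.5,
    "recommendedMemoryInGB": 0.5,
}

print(resources_from_profile(profile_details))                # {'cpu_cores': 0.5, 'memory_gb': 0.5}
print(resources_from_profile(profile_details, headroom=2.0))  # {'cpu_cores': 1.0, 'memory_gb': 1.0}
```

The keys are named after the `cpu_cores` and `memory_gb` arguments of the deployment configuration used in the AKS section, so the dictionary can be unpacked into that call.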

## AKS With Data Drift Monitor

In [36]:
from azureml.core.compute import ComputeTarget, AksCompute
compute_config = AksCompute.provisioning_configuration(location='centralindia', cluster_purpose='DevTest')
cluster = ComputeTarget.create(ws, 'iris-aks', compute_config)
cluster.wait_for_completion(show_output=True)

InProgress..........................................................................
SucceededProvisioning operation finished, operation "Succeeded"


In [37]:
from azureml.core.webservice import AksWebservice, Webservice, AksEndpoint
from azureml.core.model import Model
from azureml.core.compute import AksCompute

aks_target = AksCompute(ws, 'iris-aks')
model_v1 = Model(ws, 'iris-predictor-dt')

# If deploying to a cluster configured for dev/test, ensure that it was created with enough
# cores and memory to handle this deployment configuration. Note that memory is also used by
# things such as dependencies and AML components.
deployment_config = AksEndpoint.deploy_configuration(cpu_cores = 1, memory_gb = 0.5)

service = Model.deploy(ws, 'iris-endpoint', [model_v1], inference_config, deployment_config, aks_target)
service.wait_for_deployment(show_output = True)

print(service.state)
print(service.get_logs())

Tips: You can try get_logs(): https://aka.ms/debugimage#dockerlog or local deployment: https://aka.ms/debugimage#debug-locally to debug if deployment takes longer than 10 minutes.
Running......
2022-09-08 16:32:24+00:00 Registering the environment.
2022-09-08 16:32:26+00:00 Use the existing image for iris-endpoint..
2022-09-08 16:35:37+00:00 Creating resources in AKS..
2022-09-08 16:35:38+00:00 Submitting deployment to compute.
2022-09-08 16:35:38+00:00 Checking the status of deployment iris-endpoint..
2022-09-08 16:45:20+00:00 Checking the status of inference endpoint iris-endpoint.
Succeeded
AKSENDPOINT service creation operation finished, operation "Succeeded"
Healthy
{"iris-endpoint":"/bin/bash: /azureml-envs/azureml_1867cd816376033961c66f81bff348cf/lib/libtinfo.so.6: no version information available (required by /bin/bash)\n/bin/bash: /azureml-envs/azureml_1867cd816376033961c66f81bff348cf/lib/libtinfo.so.6: no version information available (required by /bin/bash)\n/bin/bash: /azur

In [40]:
model_v2 = Model(ws, 'iris-predictor-rfc')  # version 2 serves the random forest, matching the "With RF" description
service.create_version( version_name= 'version-2', inference_config=inference_config, models=[model_v2], description="With RF", traffic_percentile=50)
service.wait_for_deployment(show_output=True)
service.update_version(version_name =  'Version-1', is_default = True, traffic_percentile = 50, is_control_version_type = True)
service.wait_for_deployment(show_output=True)

WebserviceException: WebserviceException:
	Message: There is a deployment operation in flight for the Service: iris-endpoint
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "There is a deployment operation in flight for the Service: iris-endpoint"
    }
}
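
The exception above indicates that a previous deployment operation on the endpoint had not finished yet. One way to tolerate this, sketched here under assumptions, is to retry the call after a pause. `retry_while_busy` is a hypothetical generic helper, not part of the Azure ML SDK, and matching on the error message text is an assumption.

```python
import time


def retry_while_busy(operation, is_busy, attempts=5, delay_seconds=30.0, sleep=time.sleep):
    """Call `operation` until it succeeds or fails for a reason other than "busy".

    `is_busy(exc)` decides whether an exception means "try again later".
    The last exception is re-raised once the attempts are exhausted.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if not is_busy(exc) or attempt == attempts:
                raise
            sleep(delay_seconds)


# Stand-in for a service call that is busy twice and then succeeds.
calls = {"count": 0}


def fake_update():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("There is a deployment operation in flight")
    return "updated"


result = retry_while_busy(
    fake_update,
    is_busy=lambda exc: "operation in flight" in str(exc),
    sleep=lambda seconds: None,  # skip real waiting in this demonstration
)
print(result, calls["count"])  # updated 3
```

In the notebook, `operation` could wrap the `service.update_version(...)` call in a `lambda`, with a real delay between attempts.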

In [42]:
import json
test_sample = json.dumps({"Inputs": {"data" : [[5.1, 3.5, 1.4, 0.2], [1.1, 2.8, 4.4, 1.2] ] }})

test_sample_encoded = bytes(test_sample, encoding='utf8')
prediction = service.run(input_data=test_sample_encoded)
print(prediction)

[0, 1]
