diff --git a/.env.example b/.env.example
index ea1338ae..f6b2fe58 100644
--- a/.env.example
+++ b/.env.example
@@ -7,13 +7,13 @@ SP_APP_ID = ''
 SP_APP_SECRET = ''
 RESOUCE_GROUP = 'mlops-rg'
 
-# Mock build/release ID for local testing - update ReleaseID each "release"
+# Mock build/release ID for local testing
 BUILD_BUILDID = '001'
-RELEASE_RELEASEID = '001'
 
 # Azure ML Workspace Variables
-WORKSPACE_NAME = ''
+WORKSPACE_NAME = 'aml-workspace'
 EXPERIMENT_NAME = ''
+SCRIPT_FOLDER = './'
 
 # AML Compute Cluster Config
 AML_COMPUTE_CLUSTER_NAME = 'train-cluster'
@@ -36,4 +36,4 @@ SOURCES_DIR_TRAIN = 'code'
 DB_CLUSTER_ID = ''
 
 # Optional. Container Image name for image creation
-IMAGE_NAME = 'ml-trained'
\ No newline at end of file
+IMAGE_NAME = 'mltrained'
\ No newline at end of file
diff --git a/.pipelines/azdo-ci-build-train.yml b/.pipelines/azdo-ci-build-train.yml
index 09c52d95..1e729357 100644
--- a/.pipelines/azdo-ci-build-train.yml
+++ b/.pipelines/azdo-ci-build-train.yml
@@ -21,7 +21,7 @@ stages:
   jobs:
   - job: "Model_CI_Pipeline"
    displayName: "Model CI Pipeline"
-    pool: 
+    pool:
      vmImage: 'ubuntu-latest'
    container: mcr.microsoft.com/mlops/python:latest
    timeoutInMinutes: 0
@@ -37,17 +37,47 @@ stages:
 - stage: 'Trigger_AML_Pipeline'
   displayName: 'Train, evaluate, register model via previously published AML pipeline'
   jobs:
-  - job: "Invoke_Model_Pipeline"
+  - job: "Get_Pipeline_ID"
    condition: and(succeeded(), eq(coalesce(variables['auto-trigger-training'], 'true'), 'true'))
-    displayName: "Invoke Model Pipeline and evaluate results to register"
-    pool:
+    displayName: "Get Pipeline ID for execution"
+    pool:
      vmImage: 'ubuntu-latest'
    container: mcr.microsoft.com/mlops/python:latest
    timeoutInMinutes: 0
    steps:
    - script: |
        python $(Build.SourcesDirectory)/ml_service/pipelines/run_train_pipeline.py
-      displayName: 'Trigger Training Pipeline'
+        source $(Build.SourcesDirectory)/tmp.sh
+        echo "##vso[task.setvariable variable=AMLPIPELINEID;isOutput=true]$AMLPIPELINE_ID"
+      name: 'getpipelineid'
+      displayName: 'Get Pipeline ID'
+      env:
+        SP_APP_SECRET: '$(SP_APP_SECRET)'
+  - job: "Run_ML_Pipeline"
+    dependsOn: "Get_Pipeline_ID"
+    displayName: "Trigger ML Training Pipeline"
+    pool: server
+    variables:
+      AMLPIPELINE_ID: $[ dependencies.Get_Pipeline_ID.outputs['getpipelineid.AMLPIPELINEID'] ]
+    steps:
+    - task: ms-air-aiagility.vss-services-azureml.azureml-restApi-task.MLPublishedPipelineRestAPITask@0
+      displayName: 'Invoke ML pipeline'
+      inputs:
+        azureSubscription: '$(WORKSPACE_SVC_CONNECTION)'
+        PipelineId: '$(AMLPIPELINE_ID)'
+        ExperimentName: '$(EXPERIMENT_NAME)'
+        PipelineParameters: '"model_name": "sklearn_regression_model.pkl"'
+  - job: "Training_Run_Report"
+    dependsOn: "Run_ML_Pipeline"
+    displayName: "Determine if evaluation succeeded and new model is registered"
+    pool:
+      vmImage: 'ubuntu-latest'
+    container: mcr.microsoft.com/mlops/python:latest
+    timeoutInMinutes: 0
+    steps:
+    - script: |
+        python $(Build.SourcesDirectory)/code/register/register_model.py --build_id $(Build.BuildId) --validate True
+      displayName: 'Check if new model registered'
      env:
        SP_APP_SECRET: '$(SP_APP_SECRET)'
    - task: CopyFiles@2
diff --git a/.pipelines/azdo-variables.yml b/.pipelines/azdo-variables.yml
index 64a42d5b..5d7da750 100644
--- a/.pipelines/azdo-variables.yml
+++ b/.pipelines/azdo-variables.yml
@@ -24,7 +24,7 @@ variables:
   value: '1'
 # AML Pipeline Config
 - name: TRAINING_PIPELINE_NAME
-  value: 'Training Pipeline'
+  value: 'Training-Pipeline'
 - name: MODEL_PATH
   value: ''
 - name: EVALUATE_SCRIPT_PATH
@@ -34,7 +34,7 @@ variables:
 - name: SOURCES_DIR_TRAIN
   value: code
 - name: IMAGE_NAME
-  value: ''
+  value: 'mltrained'
 # Optional. Used by a training pipeline with R on Databricks
 - name: DB_CLUSTER_ID
   value: ''
\ No newline at end of file
diff --git a/README.md b/README.md
index 264643ea..e622ba75 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ description: "Code which demonstrates how to set up and operationalize an MLOps
 
 # MLOps with Azure ML
 
-[![Build Status](https://dev.azure.com/customai/DevopsForAI-AML/_apis/build/status/Build%20%26%20Train?branchName=master)](https://dev.azure.com/customai/DevopsForAI-AML/_build/latest?definitionId=34&branchName=master)
+[![Build Status](https://aidemos.visualstudio.com/MLOps/_apis/build/status/microsoft.MLOpsPython-CI?branchName=master)](https://aidemos.visualstudio.com/MLOps/_build/latest?definitionId=127&branchName=master)
 
 MLOps will help you to understand how to build the Continuous Integration and Continuous Delivery pipeline for a ML/AI project. We will be using the Azure DevOps Project for build and release/deployment pipelines along with Azure ML services for model retraining pipeline, model management and operationalization.
 
diff --git a/code/evaluate/evaluate_model.py b/code/evaluate/evaluate_model.py
index ec5dc5e0..0959d36b 100644
--- a/code/evaluate/evaluate_model.py
+++ b/code/evaluate/evaluate_model.py
@@ -24,21 +24,54 @@ POSSIBILITY OF SUCH DAMAGE.
 """
 import os
-from azureml.core import Model, Run
+from azureml.core import Model, Run, Workspace, Experiment
 import argparse
+from azureml.core.authentication import ServicePrincipalAuthentication
+import traceback
 
-
-# Get workspace
 run = Run.get_context()
-exp = run.experiment
-ws = run.experiment.workspace
+if (run.id.startswith('OfflineRun')):
+    from dotenv import load_dotenv
+    # For local development, set values in this section
+    load_dotenv()
+    workspace_name = os.environ.get("WORKSPACE_NAME")
+    experiment_name = os.environ.get("EXPERIMENT_NAME")
+    resource_group = os.environ.get("RESOURCE_GROUP")
+    subscription_id = os.environ.get("SUBSCRIPTION_ID")
+    tenant_id = os.environ.get("TENANT_ID")
+    model_name = os.environ.get("MODEL_NAME")
+    app_id = os.environ.get('SP_APP_ID')
+    app_secret = os.environ.get('SP_APP_SECRET')
+    build_id = os.environ.get('BUILD_BUILDID')
+    service_principal = ServicePrincipalAuthentication(
+        tenant_id=tenant_id,
+        service_principal_id=app_id,
+        service_principal_password=app_secret)
+    aml_workspace = Workspace.get(
+        name=workspace_name,
+        subscription_id=subscription_id,
+        resource_group=resource_group,
+        auth=service_principal
+    )
+    ws = aml_workspace
+    exp = Experiment(ws, experiment_name)
+    run_id = "e78b2c27-5ceb-49d9-8e84-abe7aecf37d5"
+else:
+    exp = run.experiment
+    ws = run.experiment.workspace
+    run_id = 'amlcompute'
 
 parser = argparse.ArgumentParser("evaluate")
 parser.add_argument(
-    "--release_id",
+    "--build_id",
+    type=str,
+    help="The Build ID of the build triggering this pipeline run",
+)
+parser.add_argument(
+    "--run_id",
     type=str,
-    help="The ID of the release triggering this pipeline run",
+    help="Training run ID",
 )
 parser.add_argument(
     "--model_name",
@@ -46,68 +79,64 @@
     help="Name of the Model",
     default="sklearn_regression_model.pkl",
 )
-args = parser.parse_args()
 
-print("Argument 1: %s" % args.release_id)
-print("Argument 2: %s" % args.model_name)
+args = parser.parse_args()
+if (args.build_id is not None):
+    build_id = args.build_id
+if (args.run_id is not None):
+    run_id = args.run_id
+if (run_id == 'amlcompute'):
+    run_id = run.parent.id
 model_name = args.model_name
-release_id = args.release_id
+metric_eval = "mse"
+
+run.tag("BuildId", value=build_id)
 
-# Paramaterize the matrics on which the models should be compared
+# Parameterize the metrics on which the models should be compared
 # Add golden data set on which all the model performance can be evaluated
-
-all_runs = exp.get_runs(
-    properties={"release_id": release_id, "run_type": "train"},
-    include_children=True
-    )
-new_model_run = next(all_runs)
-new_model_run_id = new_model_run.id
-print(f'New Run found with Run ID of: {new_model_run_id}')
 
 try:
-    # Get most recently registered model, we assume that
-    # is the model in production.
-    # Download this model and compare it with the recently
-    # trained model by running test with same data set.
     model_list = Model.list(ws)
-    production_model = next(
-        filter(
-            lambda x: x.created_time == max(
-                model.created_time for model in model_list),
-            model_list,
+    if (len(model_list) > 0):
+        production_model = next(
+            filter(
+                lambda x: x.created_time == max(
+                    model.created_time for model in model_list),
+                model_list,
+            )
         )
-    )
-    production_model_run_id = production_model.tags.get("run_id")
-    run_list = exp.get_runs()
+        production_model_run_id = production_model.run_id
 
-    # Get the run history for both production model and
-    # newly trained model and compare mse
-    production_model_run = Run(exp, run_id=production_model_run_id)
-    new_model_run = Run(exp, run_id=new_model_run_id)
+        # Get the run history for both production model and
+        # newly trained model and compare mse
+        production_model_run = Run(exp, run_id=production_model_run_id)
+        new_model_run = run.parent
+        print("Production model run is", production_model_run)
 
-    production_model_mse = production_model_run.get_metrics().get("mse")
-    new_model_mse = new_model_run.get_metrics().get("mse")
-    print(
-        "Current Production model mse: {}, New trained model mse: {}".format(
-            production_model_mse, new_model_mse
-        )
-    )
+        production_model_mse = \
+            production_model_run.get_metrics().get(metric_eval)
+        new_model_mse = new_model_run.get_metrics().get(metric_eval)
+        if (production_model_mse is None or new_model_mse is None):
+            print("Unable to find", metric_eval, "metrics, "
+                  "exiting evaluation")
+            run.parent.cancel()
+        else:
+            print(
+                "Current Production model mse: {}, "
+                "New trained model mse: {}".format(
+                    production_model_mse, new_model_mse
+                )
+            )
 
-    promote_new_model = False
-    if new_model_mse < production_model_mse:
-        promote_new_model = True
-        print("New trained model performs better, thus it will be registered")
+        if (new_model_mse < production_model_mse):
+            print("New trained model performs better, "
+                  "thus it should be registered")
+        else:
+            print("New trained model did not perform better than the "
+                  "production model, so skipping model registration.")
+            run.parent.cancel()
+    else:
+        print("This is the first model, "
+              "thus it should be registered")
 except Exception:
-    promote_new_model = True
-    print("This is the first model to be trained, \
-          thus nothing to evaluate for now")
-
-
-# Writing the run id to /aml_config/run_id.json
-if promote_new_model:
-    model_path = os.path.join('outputs', model_name)
-    new_model_run.register_model(
-        model_name=model_name,
-        model_path=model_path,
-        properties={"release_id": release_id})
-    print("Registered new model!")
+    traceback.print_exc(limit=None, file=None, chain=True)
+    print("Something went wrong trying to evaluate. Exiting.")
Exiting.") + raise diff --git a/code/register/register_model.py b/code/register/register_model.py index ae2b8216..17388d62 100644 --- a/code/register/register_model.py +++ b/code/register/register_model.py @@ -24,88 +24,148 @@ POSSIBILITY OF SUCH DAMAGE. """ import os -import json import sys -from azureml.core import Run import argparse +import traceback +from azureml.core import Run, Experiment, Workspace +from azureml.core.model import Model as AMLModel +from azureml.core.authentication import ServicePrincipalAuthentication -from azureml.core.authentication import AzureCliAuthentication -cli_auth = AzureCliAuthentication() +def main(): -# Get workspace -# ws = Workspace.from_config(auth=cli_auth, path='./') + run = Run.get_context() + if (run.id.startswith('OfflineRun')): + from dotenv import load_dotenv + sys.path.append(os.path.abspath("./code/util")) # NOQA: E402 + from model_helper import get_model_by_build_id + # For local development, set values in this section + load_dotenv() + workspace_name = os.environ.get("WORKSPACE_NAME") + experiment_name = os.environ.get("EXPERIMENT_NAME") + resource_group = os.environ.get("RESOURCE_GROUP") + subscription_id = os.environ.get("SUBSCRIPTION_ID") + tenant_id = os.environ.get("TENANT_ID") + model_name = os.environ.get("MODEL_NAME") + app_id = os.environ.get('SP_APP_ID') + app_secret = os.environ.get('SP_APP_SECRET') + build_id = os.environ.get('BUILD_BUILDID') + service_principal = ServicePrincipalAuthentication( + tenant_id=tenant_id, + service_principal_id=app_id, + service_principal_password=app_secret) + aml_workspace = Workspace.get( + name=workspace_name, + subscription_id=subscription_id, + resource_group=resource_group, + auth=service_principal + ) + ws = aml_workspace + exp = Experiment(ws, experiment_name) + run_id = "bd184a18-2ac8-4951-8e78-e290bef3b012" + else: + sys.path.append(os.path.abspath("./util")) # NOQA: E402 + from model_helper import get_model_by_build_id + ws = run.experiment.workspace + exp = run.experiment + run_id = 'amlcompute' -run = Run.get_context() -exp = run.experiment -ws = run.experiment.workspace - -parser = argparse.ArgumentParser("register") -parser.add_argument( - "--config_suffix", type=str, help="Datetime suffix for json config files" -) -parser.add_argument( - "--json_config", - type=str, - help="Directory to write all the intermediate json configs", -) -parser.add_argument( - "--model_name", - type=str, - help="Name of the Model", - default="sklearn_regression_model.pkl", -) + parser = argparse.ArgumentParser("register") + parser.add_argument( + "--build_id", + type=str, + help="The Build ID of the build triggering this pipeline run", + ) + parser.add_argument( + "--run_id", + type=str, + help="Training run ID", + ) + parser.add_argument( + "--model_name", + type=str, + help="Name of the Model", + default="sklearn_regression_model.pkl", + ) + parser.add_argument( + "--validate", + type=str, + help="Set to true to only validate if model is registered for run", + default=False, + ) -args = parser.parse_args() + args = parser.parse_args() + if (args.build_id is not None): + build_id = args.build_id + if (args.run_id is not None): + run_id = args.run_id + if (run_id == 'amlcompute'): + run_id = run.parent.id + if (args.validate is not None): + validate = args.validate + model_name = args.model_name -print("Argument 1: %s" % args.config_suffix) -print("Argument 2: %s" % args.json_config) + if (validate): + try: + get_model_by_build_id(model_name, build_id, exp.workspace) + print("Model was registered for 
this build.") + except Exception as e: + print(e) + print("Model was not registered for this run.") + sys.exit(1) + else: + if (build_id is None): + register_aml_model(model_name, exp, run_id) + else: + run.tag("BuildId", value=build_id) + register_aml_model(model_name, exp, run_id, build_id) -if not (args.json_config is None): - os.makedirs(args.json_config, exist_ok=True) - print("%s created" % args.json_config) -evaluate_run_id_json = "run_id_{}.json".format(args.config_suffix) -evaluate_output_path = os.path.join(args.json_config, evaluate_run_id_json) -model_name = args.model_name +def model_already_registered(model_name, exp, run_id): + model_list = AMLModel.list(exp.workspace, name=model_name, run_id=run_id) + if len(model_list) >= 1: + e = ("Model name:", model_name, "in workspace", + exp.workspace, "with run_id ", run_id, "is already registered.") + print(e) + raise Exception(e) + else: + print("Model is not registered for this run.") -# Get the latest evaluation result -try: - with open(evaluate_output_path) as f: - config = json.load(f) - if not config["run_id"]: - raise Exception( - "No new model to register as production model perform better") -except Exception: - print("No new model to register as production model perform better") - sys.exit(0) -run_id = config["run_id"] -experiment_name = config["experiment_name"] -# exp = Experiment(workspace=ws, name=experiment_name) +def register_aml_model(model_name, exp, run_id, build_id: str = 'none'): + try: + if (build_id != 'none'): + model_already_registered(model_name, exp, run_id) + run = Run(experiment=exp, run_id=run_id) + tagsValue = {"area": "diabetes", "type": "regression", + "BuildId": build_id, "run_id": run_id} + else: + run = Run(experiment=exp, run_id=run_id) + if (run is not None): + tagsValue = {"area": "diabetes", + "type": "regression", "run_id": run_id} + else: + print("A model run for experiment", exp.name, + "matching properties run_id =", run_id, + "was not found. 
Skipping model registration.") + sys.exit(0) -run = Run(experiment=exp, run_id=run_id) -names = run.get_file_names -names() -print("Run ID for last run: {}".format(run_id)) + model = run.register_model(model_name=model_name, + model_path="./outputs/" + model_name, + tags=tagsValue) + os.chdir("..") + print( + "Model registered: {} \nModel Description: {} " + "\nModel Version: {}".format( + model.name, model.description, model.version + ) + ) + except Exception: + traceback.print_exc(limit=None, file=None, chain=True) + print("Model registration failed") + raise -model = run.register_model(model_name=model_name, - model_path="./outputs/" + model_name, - tags={"area": "diabetes", "type": "regression"}) -os.chdir("..") -print( - "Model registered: {} \nModel Description: {} \nModel Version: {}".format( - model.name, model.description, model.version - ) -) -# Writing the registered model details to /aml_config/model.json -model_json = {} -model_json["model_name"] = model.name -model_json["model_version"] = model.version -model_json["run_id"] = run_id -filename = "model_{}.json".format(args.config_suffix) -output_path = os.path.join(args.json_config, filename) -with open(output_path, "w") as outfile: - json.dump(model_json, outfile) +if __name__ == '__main__': + main() diff --git a/code/training/train.py b/code/training/train.py index d703964f..5f8c19ef 100644 --- a/code/training/train.py +++ b/code/training/train.py @@ -36,9 +36,9 @@ parser = argparse.ArgumentParser("train") parser.add_argument( - "--release_id", + "--build_id", type=str, - help="The ID of the release triggering this pipeline run", + help="The build ID of the build triggering this pipeline run", ) parser.add_argument( "--model_name", @@ -49,11 +49,11 @@ args = parser.parse_args() -print("Argument 1: %s" % args.release_id) +print("Argument 1: %s" % args.build_id) print("Argument 2: %s" % args.model_name) model_name = args.model_name -release_id = args.release_id +build_id = args.build_id run = Run.get_context() exp = run.experiment @@ -73,30 +73,31 @@ alpha = alphas[np.random.choice(alphas.shape[0], 1, replace=False)][0] print(alpha) run.log("alpha", alpha) +run.parent.log("alpha", alpha) reg = Ridge(alpha=alpha) reg.fit(data["train"]["X"], data["train"]["y"]) preds = reg.predict(data["test"]["X"]) -run.log("mse", mean_squared_error(preds, data["test"]["y"])) - - -# Save model as part of the run history - -# model_name = "." 
+run.log("mse", mean_squared_error( + preds, data["test"]["y"]), description="Mean squared error metric") +run.parent.log("mse", mean_squared_error( + preds, data["test"]["y"]), description="Mean squared error metric") with open(model_name, "wb") as file: joblib.dump(value=reg, filename=model_name) -# upload the model file explicitly into artifacts -run.upload_file(name="./outputs/" + model_name, path_or_stream=model_name) +# upload model file explicitly into artifacts for parent run +run.parent.upload_file(name="./outputs/" + model_name, + path_or_stream=model_name) print("Uploaded the model {} to experiment {}".format( model_name, run.experiment.name)) dirpath = os.getcwd() print(dirpath) print("Following files are uploaded ") -print(run.get_file_names()) +print(run.parent.get_file_names()) # Add properties to identify this specific training run -run.add_properties({"release_id": release_id, "run_type": "train"}) -print(f"added properties: {run.properties}") +run.tag("BuildId", value=build_id) +run.tag("run_type", value="train") +print(f"tags now present for run: {run.tags}") run.complete() diff --git a/code/util/model_helper.py b/code/util/model_helper.py new file mode 100644 index 00000000..1609642a --- /dev/null +++ b/code/util/model_helper.py @@ -0,0 +1,85 @@ +""" +model_helper.py +""" +from azureml.core import Run +from azureml.core import Workspace +from azureml.core.model import Model as AMLModel + + +def get_current_workspace() -> Workspace: + """ + Retrieves and returns the latest model from the workspace + by its name and tag. Will not work when ran locally. + + Parameters: + None + + Return: + The current workspace. + """ + run = Run.get_context(allow_offline=False) + experiment = run.experiment + return experiment.workspace + + +def _get_model_by_build_id( + model_name: str, + build_id: str, + aml_workspace: Workspace = None +) -> AMLModel: + """ + Retrieves and returns the latest model from the workspace + by its name and tag. + + Parameters: + aml_workspace (Workspace): aml.core Workspace that the model lives. + model_name (str): name of the model we are looking for + build_id (str): the build id the model was registered under. + + Return: + A single aml model from the workspace that matches the name and tag. + """ + # Validate params. cannot be None. + if model_name is None: + raise ValueError("model_name[:str] is required") + if build_id is None: + raise ValueError("build_id[:str] is required") + if aml_workspace is None: + aml_workspace = get_current_workspace() + + # get model by tag. + model_list = AMLModel.list( + aml_workspace, name=model_name, + tags=[["BuildId", build_id]], latest=True + ) + + # latest should only return 1 model, but if it does, then maybe + # internal code was accidentally changed or the source code has changed. + should_not_happen = ("THIS SHOULD NOT HAPPEN: found more than one model " + "for the latest with {{model_name: {model_name}," + "BuildId: {build_id}. 
+        .format(model_name=model_name, build_id=build_id,
+                model_list=model_list)
+    if len(model_list) > 1:
+        raise ValueError(should_not_happen)
+
+    return model_list
+
+
+def get_model_by_build_id(
+    model_name: str,
+    build_id: str,
+    aml_workspace: Workspace = None
+) -> AMLModel:
+    """
+    Wrapper for _get_model_by_build_id that raises an exception
+    if no model is found.
+    """
+    model_list = _get_model_by_build_id(model_name, build_id, aml_workspace)
+
+    if model_list:
+        return model_list[0]
+
+    no_model_found = ("Model not found with model_name: {model_name} "
+                      "BuildId: {build_id}.")\
+        .format(model_name=model_name, build_id=build_id)
+    raise Exception(no_model_found)
diff --git a/docs/getting_started.md b/docs/getting_started.md
index cc56c6c4..a3806b22 100644
--- a/docs/getting_started.md
+++ b/docs/getting_started.md
@@ -31,6 +31,16 @@ subscription. Contact your subscription administrator if you don't
 have the permissions. Normally a subscription admin can create a Service
 principal and can provide you the details.
 
+## Create an Azure DevOps Azure ML Workspace Service Connection
+You need to create a service connection to your ML workspace so the Azure DevOps Azure ML task can execute the Azure ML training pipeline. To get there, go to your Azure DevOps project settings page (by clicking the cog wheel at the bottom left of the screen), and then click **Service connections** under the **Pipelines** section:
+
+**Note:** Creating a service connection with the Azure Machine Learning extension requires 'Owner' or 'User Access Administrator' permissions on the workspace.
+
+This is what your service connection should look like. Make sure to select your resource group and AML workspace. The connection name specified here needs to be used for the value of the `WORKSPACE_SVC_CONNECTION` variable set in the variable group below.
+
+![](./images/svc-connection.png)
+
+
 ## Create a Variable Group for your Pipelines
 
 We make use of variable group inside Azure DevOps to store variables and their
@@ -59,6 +69,7 @@ The variable group should contain the following required variables:
 | TENANT_ID                | |
 | RESOURCE_GROUP           | |
 | WORKSPACE_NAME           | mlops-AML-WS |
+| WORKSPACE_SVC_CONNECTION | aml-workspace-connection |
 
 Mark **SP_APP_SECRET** variable as a secret one.
diff --git a/docs/images/svc-connection.png b/docs/images/svc-connection.png
new file mode 100644
index 00000000..dc17e575
Binary files /dev/null and b/docs/images/svc-connection.png differ
diff --git a/environment_setup/iac-remove-environment.yml b/environment_setup/iac-remove-environment.yml
index 67626223..2892bf5c 100644
--- a/environment_setup/iac-remove-environment.yml
+++ b/environment_setup/iac-remove-environment.yml
@@ -1,10 +1,5 @@
-trigger:
-  branches:
-    include:
-    - master
-  paths:
-    include:
-    - environment_setup/arm-templates/*
+pr: none
+trigger: none
 
 pool:
   vmImage: 'ubuntu-latest'
diff --git a/ml_service/pipelines/build_train_pipeline.py b/ml_service/pipelines/build_train_pipeline.py
index b866201d..fa21b515 100644
--- a/ml_service/pipelines/build_train_pipeline.py
+++ b/ml_service/pipelines/build_train_pipeline.py
@@ -42,11 +42,10 @@ def main():
     )
     run_config.environment.docker.enabled = True
 
-    model_name = PipelineParameter(
+    model_name_param = PipelineParameter(
         name="model_name", default_value=e.model_name)
-    release_id = PipelineParameter(
-        name="release_id", default_value="0"
-    )
+    build_id_param = PipelineParameter(
+        name="build_id", default_value=e.build_id)
 
     train_step = PythonScriptStep(
         name="Train Model",
@@ -54,8 +53,8 @@ def main():
         compute_target=aml_compute,
         source_directory=e.sources_directory_train,
         arguments=[
-            "--release_id", release_id,
-            "--model_name", model_name,
+            "--build_id", build_id_param,
+            "--model_name", model_name_param,
         ],
         runconfig=run_config,
         allow_reuse=False,
@@ -68,18 +67,33 @@ def main():
         compute_target=aml_compute,
         source_directory=e.sources_directory_train,
         arguments=[
-            "--release_id", release_id,
-            "--model_name", model_name,
+            "--build_id", build_id_param,
+            "--model_name", model_name_param,
         ],
         runconfig=run_config,
         allow_reuse=False,
     )
     print("Step Evaluate created")
 
+    register_step = PythonScriptStep(
+        name="Register Model ",
+        script_name=e.register_script_path,
+        compute_target=aml_compute,
+        source_directory=e.sources_directory_train,
+        arguments=[
+            "--build_id", build_id_param,
+            "--model_name", model_name_param,
+        ],
+        runconfig=run_config,
+        allow_reuse=False,
+    )
+    print("Step Register created")
+
     evaluate_step.run_after(train_step)
-    steps = [evaluate_step]
+    register_step.run_after(evaluate_step)
+    steps = [train_step, evaluate_step, register_step]
 
     train_pipeline = Pipeline(workspace=aml_workspace, steps=steps)
     train_pipeline.validate()
     published_pipeline = train_pipeline.publish(
         name=e.pipeline_name,
diff --git a/ml_service/pipelines/run_train_pipeline.py b/ml_service/pipelines/run_train_pipeline.py
index fdc8f5a5..65316007 100644
--- a/ml_service/pipelines/run_train_pipeline.py
+++ b/ml_service/pipelines/run_train_pipeline.py
@@ -19,15 +19,16 @@ def main():
         subscription_id=e.subscription_id,
         resource_group=e.resource_group,
         auth=service_principal
-        )
+    )
 
     # Find the pipeline that was published by the specified build ID
     pipelines = PublishedPipeline.list(aml_workspace)
     matched_pipes = []
 
     for p in pipelines:
-        if p.version == e.build_id:
-            matched_pipes.append(p)
+        if p.name == e.pipeline_name:
+            if p.version == e.build_id:
+                matched_pipes.append(p)
 
     if(len(matched_pipes) > 1):
         published_pipeline = None
@@ -37,16 +38,25 @@ def main():
         raise KeyError(f"Unable to find a published pipeline for this build {e.build_id}")  # NOQA: E501
     else:
         published_pipeline = matched_pipes[0]
-
-    pipeline_parameters = {"model_name": e.model_name}
-
-    response = published_pipeline.submit(
-        aml_workspace,
-        e.experiment_name,
-        pipeline_parameters)
-
-    run_id = response.id
-    print("Pipeline run initiated ", run_id)
+        print("published pipeline id is", published_pipeline.id)
+
+    # Save the Pipeline ID for other AzDO jobs after script is complete
+    os.environ['amlpipeline_id'] = published_pipeline.id
+    savePIDcmd = 'echo "export AMLPIPELINE_ID=$amlpipeline_id" >tmp.sh'
+    os.system(savePIDcmd)
+
+    # Set this to True for local development or
+    # if not using Azure DevOps pipeline execution task
+    skip_train_execution = True
+    if(skip_train_execution is False):
+        pipeline_parameters = {"model_name": e.model_name}
+        response = published_pipeline.submit(
+            aml_workspace,
+            e.experiment_name,
+            pipeline_parameters)
+
+        run_id = response.id
+        print("Pipeline run initiated ", run_id)
 
 
 if __name__ == "__main__":
diff --git a/ml_service/util/env_variables.py b/ml_service/util/env_variables.py
index 9fe6d061..f83a9fbd 100644
--- a/ml_service/util/env_variables.py
+++ b/ml_service/util/env_variables.py
@@ -31,6 +31,7 @@ def __init__(self):
         self._sources_directory_train = os.environ.get("SOURCES_DIR_TRAIN")
         self._train_script_path = os.environ.get("TRAIN_SCRIPT_PATH")
         self._evaluate_script_path = os.environ.get("EVALUATE_SCRIPT_PATH")
+        self._register_script_path = os.environ.get("REGISTER_SCRIPT_PATH")
         self._model_name = os.environ.get("MODEL_NAME")
         self._experiment_name = os.environ.get("EXPERIMENT_NAME")
         self._model_version = os.environ.get('MODEL_VERSION')
@@ -94,6 +95,10 @@ def train_script_path(self):
     def evaluate_script_path(self):
         return self._evaluate_script_path
 
+    @property
+    def register_script_path(self):
+        return self._register_script_path
+
     @property
     def model_name(self):
         return self._model_name
diff --git a/ml_service/util/register_model.py b/ml_service/util/register_model.py
deleted file mode 100644
index 7c99aaac..00000000
--- a/ml_service/util/register_model.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import sys
-import os
-import os.path
-from azureml.core import Workspace
-from azureml.core.model import Model
-from azureml.core.authentication import ServicePrincipalAuthentication
-from env_variables import Env
-
-e = Env()
-
-if os.path.isfile(e.model_path) is False:
-    print("The given model path %s is invalid" % (e.model_path))
-    sys.exit(1)
-
-SP_AUTH = ServicePrincipalAuthentication(
-    tenant_id=e.tenant_id,
-    service_principal_id=e.app_id,
-    service_principal_password=e.app_secret)
-
-WORKSPACE = Workspace.get(
-    e.workspace_name,
-    SP_AUTH,
-    e.subscription_id,
-    e.resource_group
-)
-
-try:
-    MODEL = Model.register(
-        model_path=e.model_path,
-        model_name=e.model_name,
-        description="Forecasting Model",
-        workspace=e.workspace)
-
-    print("Model registered successfully. ID: " + MODEL.id)
-except Exception as caught_error:
-    print("Error while registering the model: " + str(caught_error))
-    sys.exit(1)
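
A note on the new `--validate` flag in `code/register/register_model.py` above: it is declared with `type=str` but defaulted to `False`, so any non-empty string passed on the command line — including `--validate False` — is truthy and switches the script into validation mode. Below is a minimal sketch of a stricter boolean parser for that argument; `str2bool` is a hypothetical helper and not part of this changeset, shown only under the assumption that the argparse setup otherwise matches the diff:

```python
import argparse


def str2bool(value: str) -> bool:
    # Map common spellings of true/false to a real boolean so that
    # "--validate False" does not accidentally enable validation.
    if isinstance(value, bool):
        return value
    if value.lower() in ("yes", "true", "t", "1"):
        return True
    if value.lower() in ("no", "false", "f", "0"):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected.")


parser = argparse.ArgumentParser("register")
parser.add_argument(
    "--validate",
    type=str2bool,
    nargs="?",
    const=True,   # a bare "--validate" means True
    default=False,
    help="Set to true to only validate if model is registered for run",
)

# Usage example: the string "False" now parses to a real False.
args = parser.parse_args(["--validate", "False"])
assert args.validate is False
```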