# Introduction 

In this notebook we investigate the `Pipeline` object of the `azureml-sdk`. One of the great things about pipelines is that we can schedule them to run periodically, for example to run batch inference with a trained model.

In [1]:
from azureml.core import Workspace, Environment, Experiment
from azureml.core.environment import CondaDependencies
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineData, Pipeline

Failure while loading azureml_run_type_providers. Failed to load entrypoint automl = azureml.train.automl.run:AutoMLRun._from_run_dto with exception (numpy 1.20.2 (c:\users\rkoirala\anaconda3\lib\site-packages), Requirement.parse('numpy<=1.19.3; sys_platform == "win32"'), {'azureml-dataset-runtime'}).


In [2]:
ws = Workspace.from_config()

# Create an Environment 

In [3]:
myenv = Environment(name='env_azure_pipeline')
myenv_dep = CondaDependencies.create(conda_packages=['pandas', 'scikit-learn', 'pip'], 
                                    pip_packages=['azureml-defaults'])
myenv.python.conda_dependencies = myenv_dep
myenv.register(ws)

{
    "databricks": {
        "eggLibraries": [],
        "jarLibraries": [],
        "mavenLibraries": [],
        "pypiLibraries": [],
        "rcranLibraries": []
    },
    "docker": {
        "arguments": [],
        "baseDockerfile": null,
        "baseImage": "mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:20210513.v1",
        "baseImageRegistry": {
            "address": null,
            "password": null,
            "registryIdentity": null,
            "username": null
        },
        "enabled": false,
        "platform": {
            "architecture": "amd64",
            "os": "Linux"
        },
        "sharedVolumes": true,
        "shmSize": null
    },
    "environmentVariables": {
        "EXAMPLE_ENV_VAR": "EXAMPLE_VALUE"
    },
    "inferencingStackVersion": null,
    "name": "env_azure_pipeline",
    "python": {
        "baseCondaEnvironment": null,
        "condaDependencies": {
            "channels": [
                "anaconda",
                "conda-fo

# Compute target in the cloud 

In [4]:
compute_name = 'rk-test-compute'

compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', 
                                                       max_nodes=2)

rk_cluster = ComputeTarget.create(ws, compute_name, compute_config)
rk_cluster.wait_for_completion(show_output=True)

SucceededProvisioning operation finished, operation "Succeeded"
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


In [5]:
run_config = RunConfiguration()
run_config.target = rk_cluster
run_config.environment = myenv

# Pipeline and steps 

This part is new. It is similar to the previous notebook, where we created an `Experiment` object and submitted the model training via a Python script. This time, however, we submit a `Pipeline` object, which may consist of one or more steps.
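A step does not run simply because of its position in the `steps` list. Azure ML works out the run order from data dependencies: in the next cell, `datafolder` is written by the preprocessing step and read by the training step, so training has to wait for preprocessing. The standard-library sketch below (Python 3.9+ for `graphlib`) only illustrates that idea; the `steps` dictionary and `execution_order` helper are hypothetical stand-ins, not how the SDK represents steps internally.

```python
from graphlib import TopologicalSorter

# Hypothetical description of the two steps: the data each one
# consumes and the data each one produces (list order is irrelevant)
steps = {
    '02 model_training': {'inputs': ['datafolder'], 'outputs': []},
    '01 data_preprocessing': {'inputs': ['raw_data'], 'outputs': ['datafolder']},
}


def execution_order(steps):
    # Map every produced data item to the step that produces it
    producer = {out: name for name, s in steps.items() for out in s['outputs']}
    # A step depends on each step that produces one of its inputs;
    # inputs with no producer (e.g. a registered dataset) add no edge
    graph = {
        name: {producer[i] for i in s['inputs'] if i in producer}
        for name, s in steps.items()
    }
    return list(TopologicalSorter(graph).static_order())


print(execution_order(steps))
# ['01 data_preprocessing', '02 model_training']
```

Steps that share no data have no edge between them, which is why independent steps in a pipeline can run in parallel.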

##### Check the `OutputFileDatasetConfig` option as an alternative to `PipelineData` for passing data between steps!

In [6]:
input_ds = ws.datasets.get('Iris Data')
datafolder = PipelineData('datafolder', datastore=ws.get_default_datastore())

# Step 01 - Data Preprocessing 
preprocess_step = PythonScriptStep(name='01 data_preprocessing', 
                                   source_directory='.', 
                                   script_name='pipeline_data_prepare.py', 
                                   arguments=['--datafolder', datafolder],
                                   inputs=[input_ds.as_named_input('raw_data')], 
                                   outputs=[datafolder],
                                   runconfig=run_config)

# Step 02 - Train Model 
model_train_step = PythonScriptStep(name='02 model_training', 
                                   source_directory='.', 
                                   script_name='pipeline_model_training.py', 
                                   arguments=['--datafolder', datafolder],
                                   inputs=[datafolder],
                                   runconfig=run_config)
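The scripts `pipeline_data_prepare.py` and `pipeline_model_training.py` are not shown in this notebook. Both receive the `datafolder` location as a `--datafolder` command-line argument, so a minimal sketch of that hand-off could look like this. Everything beyond the `--datafolder` argument is an assumption: the file name `prepared.csv` and the helper functions are hypothetical, and the real preprocessing script would read the raw data from the `raw_data` named input (for example via `Run.get_context().input_datasets['raw_data']`) instead of receiving rows directly.

```python
import argparse
import csv
import os
import tempfile


def parse_args(argv=None):
    # The pipeline passes the PipelineData location as --datafolder
    parser = argparse.ArgumentParser()
    parser.add_argument('--datafolder', type=str, required=True)
    return parser.parse_args(argv)


def write_prepared(rows, datafolder, filename='prepared.csv'):
    # Preprocessing side (hypothetical file name): create the folder if needed
    os.makedirs(datafolder, exist_ok=True)
    path = os.path.join(datafolder, filename)
    with open(path, 'w', newline='') as f:
        csv.writer(f).writerows(rows)
    return path


def read_prepared(datafolder, filename='prepared.csv'):
    # Training side: read the file the previous step left in the same folder
    with open(os.path.join(datafolder, filename), newline='') as f:
        return list(csv.reader(f))


# Local round trip; a temporary directory stands in for the PipelineData path
with tempfile.TemporaryDirectory() as tmp:
    args = parse_args(['--datafolder', os.path.join(tmp, 'datafolder')])
    write_prepared([['feature', 'label'], ['5.1', 'setosa']], args.datafolder)
    round_trip = read_prepared(args.datafolder)

print(round_trip)
```

Because both steps receive the same `datafolder` object, they agree on the location without hard-coding any path.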

In [7]:
# Construct the pipeline 
train_pipeline = Pipeline(workspace=ws,
                          steps=[preprocess_step, model_train_step])

# Create the experiment 
experiment = Experiment(workspace=ws, name='azureml-demo-exp')

# Run the experiment with pipeline
pipeline_run = experiment.submit(train_pipeline, 
                                 regenerate_outputs=True)
pipeline_run.wait_for_completion(show_output=True)

Created step 01 data_preprocessing [94bae01d][27e76576-5f33-4ddd-921a-c2dd1006dcd2], (This step will run and generate new outputs)
Created step 02 model_training [4e408fd0][a3b02f09-94e5-499b-8285-41268f060607], (This step will run and generate new outputs)
Submitted PipelineRun a561e068-8103-4117-9de9-b446ed437b19
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/a561e068-8103-4117-9de9-b446ed437b19?wsid=/subscriptions/54245888-2ffe-41fa-b080-67a29997b41c/resourcegroups/rg-dataservices-sandbox-01/workspaces/ds_dev_01&tid=4ef6e02a-f252-4618-a1dc-03bd2f93157d
PipelineRunId: a561e068-8103-4117-9de9-b446ed437b19
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/a561e068-8103-4117-9de9-b446ed437b19?wsid=/subscriptions/54245888-2ffe-41fa-b080-67a29997b41c/resourcegroups/rg-dataservices-sandbox-01/workspaces/ds_dev_01&tid=4ef6e02a-f252-4618-a1dc-03bd2f93157d
PipelineRun Status: Running


StepRunId: 9ce9ea22-e38e-4f97-a0b6-f13db77644bb
Link to Azure Machine Learn


Streaming azureml-logs/70_driver_log.txt
2021/10/14 18:48:07 Got JobInfoJson from env
2021/10/14 18:48:07 Starting App Insight Logger for task:  runTaskLet
2021/10/14 18:48:07 Version: 3.0.01744.0001 Branch: .SourceBranch Commit: f3a69a2
2021/10/14 18:48:07 Attempt 1 of http call to http://10.0.0.5:16384/sendlogstoartifacts/info
2021/10/14 18:48:07 Send process info logs to master server succeeded
2021/10/14 18:48:07 Attempt 1 of http call to http://10.0.0.5:16384/sendlogstoartifacts/status
2021/10/14 18:48:07 Send process info logs to master server succeeded
[2021-10-14T18:48:07.786115] Entering context manager injector.
[2021-10-14T18:48:08.265113] context_manager_injector.py Command line Options: Namespace(inject=['ProjectPythonPath:context_managers.ProjectPythonPath', 'RunHistory:context_managers.RunHistory', 'TrackUserError:context_managers.TrackUserError'], invocation=['pipeline_data_prepare.py', '--datafolder', '/mnt/batch/tasks/shared/LS_root/jobs/ds_dev_01/azureml/9ce9ea22-e3




StepRunId: c3b19f0d-b2a8-482c-b83d-823e3d657032
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/c3b19f0d-b2a8-482c-b83d-823e3d657032?wsid=/subscriptions/54245888-2ffe-41fa-b080-67a29997b41c/resourcegroups/rg-dataservices-sandbox-01/workspaces/ds_dev_01&tid=4ef6e02a-f252-4618-a1dc-03bd2f93157d
StepRun( 02 model_training ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_db31589b9d9c1ad7de24beacc0ad45383e39fc8484a000df02d5d14e478949ec_d.txt
2021-10-14T18:48:54Z Successfully mounted a/an Blobfuse File System at /mnt/batch/tasks/shared/LS_root/jobs/ds_dev_01/azureml/c3b19f0d-b2a8-482c-b83d-823e3d657032/mounts/workspaceblobstore
2021-10-14T18:48:54Z The vmsize standard_ds11_v2 is not a GPU VM, skipping get GPU count by running nvidia-smi command.
2021-10-14T18:48:54Z Starting output-watcher...
2021-10-14T18:48:54Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
2021-10-14T18:48:55Z Executing 'Copy ACR Details file' on 10.0.0.5
2021-10-14T1


Streaming azureml-logs/75_job_post-tvmps_db31589b9d9c1ad7de24beacc0ad45383e39fc8484a000df02d5d14e478949ec_d.txt
[2021-10-14T18:49:48.306952] Entering job release
[2021-10-14T18:49:49.176131] Starting job release
[2021-10-14T18:49:49.177144] Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 137
[2021-10-14T18:49:49.177691] job release stage : upload_datastore starting...
[2021-10-14T18:49:49.178013] job release stage : start importing azureml.history._tracking in run_history_release.
[2021-10-14T18:49:49.185623] job release stage : execute_job_release starting...
[2021-10-14T18:49:49.188821] Entering context manager injector.
[2021-10-14T18:49:49.189953] job release stage : copy_batchai_cached_logs starting...
[2021-10-14T18:49:49.190415] job release stage : copy_batchai_cached_logs completed...
[2021-10-14T18:49:49.207758] job release stage : upload_datastore completed...
[2021-10-14T18:49:49.2459



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'a561e068-8103-4117-9de9-b446ed437b19', 'status': 'Completed', 'startTimeUtc': '2021-10-14T18:43:26.786798Z', 'endTimeUtc': '2021-10-14T18:49:59.013248Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}', 'azureml.pipelineComponent': 'pipelinerun'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://dsdev011073180542.blob.core.windows.net/azureml/ExperimentRun/dcid.a561e068-8103-4117-9de9-b446ed437b19/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=eUCeqMQbwaT01U%2BIAl7L4idkHw4GtTkKbq9%2BhsaAeuA%3D&skoid=b14fb336-66cf-43ab-a99a-2949dab01787&sktid=4ef6e02a-f252-4618-a1dc-03bd2f93157d&skt=2021-10-14T14%3A18%3A31Z&ske=2021-10-15T22%3A28%3A31Z&sks=b&skv=2019-07-07&st=2021-10-14T18%3A40%3A00Z&se=2021-10-15T02%3A50%3A00Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://dsdev011073180542.blob.core.

'Finished'
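The submission above passes `regenerate_outputs=True`. By default a `PythonScriptStep` has `allow_reuse=True`, so Azure ML can skip a step and reuse its earlier outputs when the step's source, arguments and inputs are unchanged; `regenerate_outputs=True` forces every step to run again. The sketch below only illustrates fingerprint-based reuse in general; the hashing scheme and the `reuse_key`/`should_run` helpers are hypothetical, not the service's actual algorithm.

```python
import hashlib
import json


def reuse_key(script_source, arguments, input_ids):
    # Hypothetical fingerprint: identical script text, arguments and
    # inputs always produce the same key
    payload = json.dumps(
        {'script': script_source, 'args': arguments, 'inputs': sorted(input_ids)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


def should_run(key, previous_keys, regenerate_outputs=False):
    # Run when forced, or when no earlier run had this exact fingerprint
    return regenerate_outputs or key not in previous_keys


previous = {reuse_key('print("prep")', ['--datafolder', 'out'], ['raw_data'])}
same = reuse_key('print("prep")', ['--datafolder', 'out'], ['raw_data'])
edited = reuse_key('print("prep v2")', ['--datafolder', 'out'], ['raw_data'])

print(should_run(same, previous))                           # False: reuse
print(should_run(same, previous, regenerate_outputs=True))  # True: forced
print(should_run(edited, previous))                         # True: script changed
```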

# Publish pipeline 

Publishing the pipeline gives us a REST endpoint that can be used to trigger new runs of it.

In [8]:
# The run retrieved here looks the same as the pipeline_run object created in this notebook,
# but using it in the following steps gives an error.
pipeline_exp = ws.experiments.get('azureml-demo-exp')
list(pipeline_exp.get_runs())[0]

| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| azureml-demo-exp | 79793b8d-4092-4c19-a326-3111c812dfb5 | azureml.PipelineRun | Completed | Link to Azure Machine Learning studio | Link to Documentation |


In [9]:
pipeline_run

| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| azureml-demo-exp | 79793b8d-4092-4c19-a326-3111c812dfb5 | azureml.PipelineRun | Completed | Link to Azure Machine Learning studio | Link to Documentation |


In [10]:
published_pipeline = pipeline_run.publish_pipeline(name='training_pipeline', 
                                                   description='Pipeline demo', 
                                                   version='1.0')
published_pipeline

| Name | Id | Status | Endpoint |
|---|---|---|---|
| training_pipeline | c3f9e3f4-cd08-449d-bf67-602b1b8ceb1e | Active | REST Endpoint |


In [11]:
rest_endpoint = published_pipeline.endpoint
rest_endpoint

'https://eastus2.api.azureml.ms/pipelines/v1.0/subscriptions/54245888-2ffe-41fa-b080-67a29997b41c/resourceGroups/rg-dataservices-sandbox-01/providers/Microsoft.MachineLearningServices/workspaces/ds_dev_01/PipelineRuns/PipelineSubmit/c3f9e3f4-cd08-449d-bf67-602b1b8ceb1e'

In [12]:
published_pipeline.id

'c3f9e3f4-cd08-449d-bf67-602b1b8ceb1e'

# Using the published pipeline 

In [23]:
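import requests
from azureml.core.authentication import InteractiveLoginAuthentication

# Azure AD bearer token used to authorize the REST call
auth_header = InteractiveLoginAuthentication().get_authentication_header()

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={'ExperimentName': 'azureml-demo-exp'})
response.raise_for_status()
response.json().get('Id')  # id of the newly submitted pipeline run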
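The call to the published endpoint is a plain HTTPS POST with a JSON body and a bearer-token header, so it can also be assembled with the standard library. The sketch below only builds the request; `build_submit_request` is a hypothetical helper, `auth_header` is assumed to be a dict such as `{'Authorization': 'Bearer <token>'}`, and actually sending it would be `urllib.request.urlopen(req)`.

```python
import json
import urllib.request


def build_submit_request(endpoint, auth_header, experiment_name):
    # JSON body naming the experiment the new run should be recorded under
    body = json.dumps({'ExperimentName': experiment_name}).encode('utf-8')
    # Merge the bearer-token header with the JSON content type
    headers = {'Content-Type': 'application/json', **auth_header}
    return urllib.request.Request(endpoint, data=body, headers=headers, method='POST')


req = build_submit_request('https://example.com/PipelineSubmit/some-id',
                           {'Authorization': 'Bearer <token>'},
                           'azureml-demo-exp')
print(req.get_method(), req.data)
```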

# Schedule a pipeline 

We can schedule a pipeline to run at a regular interval. This is one of the key steps when deploying a model for batch inference.
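`ScheduleRecurrence` combines a `frequency` unit with an `interval` multiplier, so `frequency='Hour', interval=1` means one run per hour and `interval=2` would mean one run every two hours. The sketch below shows only that arithmetic; `next_runs` and the `STEP` table are hypothetical, and the real service also accepts a start time, a time zone and other options that are ignored here.

```python
from datetime import datetime, timedelta

# Hypothetical fixed step sizes for some frequency names
# (months are left out because they do not have a fixed length)
STEP = {'Minute': timedelta(minutes=1), 'Hour': timedelta(hours=1),
        'Day': timedelta(days=1), 'Week': timedelta(weeks=1)}


def next_runs(start, frequency, interval, count):
    # Fire times: start, start + interval*unit, start + 2*interval*unit, ...
    step = STEP[frequency] * interval
    return [start + i * step for i in range(count)]


for t in next_runs(datetime(2021, 10, 14, 19, 0), 'Hour', 1, 3):
    print(t.isoformat())
# 2021-10-14T19:00:00
# 2021-10-14T20:00:00
# 2021-10-14T21:00:00
```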

In [13]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

In [14]:
hourly = ScheduleRecurrence(frequency='Hour', interval=1)
pipeline_schedule = Schedule.create(workspace=ws, 
                                   name='trains model hourly', 
                                   pipeline_id=published_pipeline.id, 
                                   experiment_name='azureml-demo-exp', 
                                   recurrence=hourly)

In [15]:
pipeline_schedule

| Name | Id | Status | Pipeline Id | Pipeline Endpoint Id | Recurrence Details |
|---|---|---|---|---|---|
| trains model hourly | adeafe6a-f075-483f-a5f3-af7e5c420354 | Active | c3f9e3f4-cd08-449d-bf67-602b1b8ceb1e | | Runs every Hour |


# Disable/Enable schedule 

In [16]:
pipeline_schedule.disable()

```python
# Pause the schedule without deleting it
pipeline_schedule.disable()
# Resume the hourly runs
pipeline_schedule.enable()
```

# Delete compute cluster 

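How can we delete the cluster between scheduled runs? A scheduled pipeline still needs its compute target when it fires, so deleting the cluster would likely make the next scheduled run fail. An `AmlCompute` cluster scales down to `min_nodes` (0 by default) once it has been idle for `idle_seconds_before_scaledown`, so keeping the cluster and letting it scale to zero avoids paying for idle nodes between runs. Once the cluster is no longer needed, delete it with:

```python 
rk_cluster.delete()
```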

In [33]:
rk_cluster.delete()