# Develop our pipeline

In this notebook, we will develop our Azure Machine Learning pipeline. The pipeline strings together the steps of preprocessing the video, applying style transfer, and postprocessing the video into a single execution graph.

To set up the pipeline, we need the necessary compute and storage. We'll create our compute with `AmlCompute` and use the workspace's default datastore for storage.

The last step of this notebook is to publish the pipeline. Once it's published as a REST endpoint, we'll test it to make sure that it runs as expected.

---

### Import packages and load .env

In [1]:
from dotenv import set_key, get_key, find_dotenv, load_dotenv
from pathlib import Path
from azureml.core import Workspace, Run, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep, MpiStep
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.core.runconfig import DEFAULT_CPU_IMAGE #, DEFAULT_GPU_IMAGE
from IPython.display import display, HTML
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core.graph import PipelineParameter
from azureml.core.authentication import AzureCliAuthentication
import subprocess
import requests
import json
import os

In [2]:
env_path = find_dotenv(raise_error_if_not_found=True)
load_dotenv(env_path)

True

### Set up the workspace in AML

Get our workspace from the config file.

In [3]:
ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

# Create a local folder for the pipeline step scripts (used as each step's source_directory)
project_folder = "scripts"
run_history_name = project_folder

if not os.path.isdir(project_folder):
    os.mkdir(project_folder)

### Set up the compute

Create our compute using `AmlCompute`. We'll use two clusters: a single CPU node for the video pre- and postprocessing, and several GPU nodes for the style transfer. Because the MPI step waits until all of its nodes are active before it executes, there is no benefit to autoscaling, so we set `max_nodes` equal to `min_nodes` for each cluster.

Set the number of nodes we want for each cluster.

In [4]:
style_transfer_node_count = 4
ffmpeg_node_count = 1

Verify that the subscription in use has enough cores. We need to check two VM families: the GPU family used for style transfer (the GPU cluster below uses NC, `STANDARD_NC6`) and DSv2 for the ffmpeg processes. If you do not have quota for the NC family, you can use another GPU family instead, such as the NCSv3 entry below.

In [5]:
vm_dict = {
    "NC": {
        "size": "STANDARD_NC6",
        "cores": 6
    },
    "NCSv3": {
        "size": "STANDARD_NC6s_v3",
        "cores": 6
    },
    "DSv2": {
        "size": "STANDARD_DS3_V2",
        "cores": 4
    }
}
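The cell above only defines the VM sizes; the quota comparison itself is left to you. Because both clusters are fixed-size (`min_nodes == max_nodes`), the cores each one needs are simply nodes × cores per VM. The sketch below does that arithmetic with two hypothetical helpers, `required_cores` and `check_quota`. The `available` numbers are placeholders: you would fill them in with the free cores per family (limit minus current usage) from the Azure portal or `az vm list-usage --location <region>`.

```python
# Mirrors the notebook's vm_dict; cores per VM for each size.
vm_dict = {
    "NC": {"size": "STANDARD_NC6", "cores": 6},
    "NCSv3": {"size": "STANDARD_NC6s_v3", "cores": 6},
    "DSv2": {"size": "STANDARD_DS3_V2", "cores": 4},
}

style_transfer_node_count = 4
ffmpeg_node_count = 1


def required_cores(vm_family: str, node_count: int) -> int:
    """Cores held by a fixed-size cluster (min_nodes == max_nodes)."""
    return vm_dict[vm_family]["cores"] * node_count


def check_quota(requirements: dict, available: dict) -> dict:
    """Return {family: missing_cores} for each family without enough free cores."""
    shortfalls = {}
    for family, needed in requirements.items():
        free = available.get(family, 0)
        if free < needed:
            shortfalls[family] = needed - free
    return shortfalls


requirements = {
    "NC": required_cores("NC", style_transfer_node_count),  # 4 nodes * 6 cores
    "DSv2": required_cores("DSv2", ffmpeg_node_count),      # 1 node * 4 cores
}

# Placeholder free-core counts; replace with your subscription's real numbers.
available = {"NC": 12, "DSv2": 100}

print(requirements)
print(check_quota(requirements, available))
```

An empty dictionary from `check_quota` means both clusters fit in the current quota; any entry tells you how many cores to request (or to free up) for that family.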

Create our CPU (DSv2) cluster for the ffmpeg steps.

In [7]:
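# CPU compute
from azureml.core.compute_target import ComputeTargetException

cpu_cluster_name = "ffmpeg-cluster"
try:
    # Reuse the cluster if it already exists in this workspace
    cpu_cluster = AmlCompute(ws, cpu_cluster_name)
    print("Found existing cluster.")
except ComputeTargetException: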
    print("Creating {}".format(cpu_cluster_name))
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_dict["DSv2"]["size"], 
        min_nodes=ffmpeg_node_count, 
        max_nodes=ffmpeg_node_count
    )

    # create the cluster
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)
    cpu_cluster.wait_for_completion(show_output=True)


Creating ffmpeg-cluster
Creating
Succeeded..............
AmlCompute wait for completion finished
Minimum number of nodes requested have been provisioned


Create our GPU (NC) cluster for the style transfer step.

In [9]:
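# GPU compute
from azureml.core.compute_target import ComputeTargetException

gpu_cluster_name = "style-cluster"
try:
    # Reuse the cluster if it already exists in this workspace
    gpu_cluster = AmlCompute(ws, gpu_cluster_name)
    print("Found existing cluster.")
except ComputeTargetException: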
    print("Creating {}".format(gpu_cluster_name))
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_dict["NC"]["size"], 
        min_nodes=style_transfer_node_count, 
        max_nodes=style_transfer_node_count
    )

    # create the cluster
    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)
    gpu_cluster.wait_for_completion(show_output=True)

Creating style-cluster
Creating
Succeeded.......................
AmlCompute wait for completion finished
Minimum number of nodes requested have been provisioned


### Set up data references

Use the default data store provided by the Azure Machine Learning workspace.

In [11]:
my_datastore = ws.get_default_datastore()
set_key(env_path, "AML_DATASTORE_NAME", "workspaceblobstore")
set_key(env_path, "STORAGE_ACCOUNT_NAME", my_datastore.account_name)
set_key(env_path, "STORAGE_ACCOUNT_KEY", my_datastore.account_key)
set_key(env_path, "STORAGE_CONTAINER_NAME", my_datastore.container_name)

Download the sample video as `orangutan.mp4`, then upload the model file in the `models` folder (from our local directory) and the video to the datastore.

In [None]:
!wget -O orangutan.mp4 https://happypathspublic.blob.core.windows.net/assets/batch_scoring_for_dl/input_video.mp4

In [12]:
# Upload files in models folder to a directory called models
my_datastore.upload_files(
    ["./models/model.pth"],
    target_path="models", 
    overwrite=True
)

# Upload orangutan.mp4 video
my_datastore.upload_files(
    ["./orangutan.mp4"],
    overwrite=True
)

$AZUREML_DATAREFERENCE_datastore

Create a data reference to the `models` directory we uploaded so that the pipeline steps can download it later on.

In [13]:
model_dir = DataReference(
    data_reference_name="model_dir", 
    datastore=my_datastore, 
    path_on_datastore="models", 
    mode="download"
)

Set the output video to be saved in the same datastore.

In [14]:
output_video = PipelineData(name="output_video", datastore=my_datastore)

Get a reference to the workspace's default datastore, which was created along with the AML workspace (it is the same datastore as `my_datastore` above). We'll use it to hold temporary pipeline data.

In [15]:
default_datastore = ws.get_default_datastore()     

Save all temporary data files (PipelineData) to the default datastore.

In [16]:
ffmpeg_audio = PipelineData(name="ffmpeg_audio", datastore=default_datastore)
ffmpeg_images = PipelineData(name="ffmpeg_images", datastore=default_datastore)
processed_images = PipelineData(name="processed_images", datastore=default_datastore)

### Set up cluster environments

Configure the run environment for the ffmpeg (CPU) cluster.

In [17]:
ffmpeg_cd = CondaDependencies()
ffmpeg_cd.add_channel("conda-forge")
ffmpeg_cd.add_conda_package("ffmpeg")

ffmpeg_run_config = RunConfiguration(conda_dependencies=ffmpeg_cd)
ffmpeg_run_config.environment.docker.enabled = True
ffmpeg_run_config.environment.docker.gpu_support = False
ffmpeg_run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
ffmpeg_run_config.environment.spark.precache_packages = False
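The ffmpeg environment exists so that the pre- and postprocessing scripts can split a video into frames plus audio and later stitch them back together. The scripts themselves aren't shown here, so the following is a hypothetical sketch of the ffmpeg invocations such a pair could run; the real `preprocess_video.py` and `postprocess_video.py` in `scripts` may differ. The `%05d.jpg` frame pattern and the default `fps=30` are assumptions; in practice the frame rate should match the source video.

```python
from pathlib import Path


def split_video_commands(video: str, audio_out: str, frames_dir: str) -> list:
    """Hypothetical: ffmpeg commands that separate audio and frames."""
    frames_pattern = str(Path(frames_dir) / "%05d.jpg")
    return [
        # -vn drops the video stream, so only the audio track is written.
        ["ffmpeg", "-i", video, "-vn", audio_out],
        # Write every frame as a numbered JPEG: 00001.jpg, 00002.jpg, ...
        ["ffmpeg", "-i", video, frames_pattern],
    ]


def join_video_command(frames_dir: str, audio: str, video_out: str, fps: int = 30) -> list:
    """Hypothetical: ffmpeg command that re-assembles styled frames with the audio."""
    frames_pattern = str(Path(frames_dir) / "%05d.jpg")
    return [
        "ffmpeg",
        "-framerate", str(fps), "-i", frames_pattern,  # image sequence input
        "-i", audio,                                   # original audio track
        "-pix_fmt", "yuv420p",                         # widely playable pixel format
        "-shortest",                                   # stop at the shorter stream
        video_out,
    ]
```

Each command list can be executed with `subprocess.run(cmd, check=True)` on a node where ffmpeg is installed, which is what the `ffmpeg` conda package in this run configuration provides.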

Configure the run environment for the style transfer (GPU) cluster.

In [18]:
style_transfer_cd = CondaDependencies()
style_transfer_cd.add_channel("pytorch")
style_transfer_cd.add_conda_package("pytorch")

style_transfer_run_config = RunConfiguration(conda_dependencies=style_transfer_cd)
style_transfer_run_config.environment.docker.enabled = True
style_transfer_run_config.environment.docker.gpu_support = True
style_transfer_run_config.environment.docker.base_image = "pytorch/pytorch"
style_transfer_run_config.environment.spark.precache_packages = False

### Set up pipeline steps

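When setting up the pipeline steps, we'll create a `video_path_param` pipeline parameter so that the input video can be changed each time the published pipeline is invoked. Its default value points to `orangutan.mp4` on our datastore.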

In [19]:
video_path_default = DataPath(datastore=my_datastore, path_on_datastore="orangutan.mp4")
video_path_param = (PipelineParameter(name="video_path", default_value=video_path_default), DataPathComputeBinding())

Create the three-step pipeline from two `PythonScriptStep`s and one `MpiStep`. In the MPI step, you'll notice that we use the `style_transfer_mpi.py` script instead of the `style_transfer.py` script. `MpiStep` launches the script as a set of MPI processes, so the script itself has to use MPI (here via `mpi4py`) to divide the work among those processes.

Both scripts do the exact same thing, except that `style_transfer_mpi.py` uses MPI to process the frames in a distributed way.

Feel free to inspect the differences under the `scripts` folder.
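To make "use MPI to process the frames in a distributed way" concrete, here is a minimal sketch of one common pattern, assuming each MPI process (rank) styles its own share of the extracted frames and writes its results independently, so no inter-process communication is needed. It is not the contents of `style_transfer_mpi.py`; `frames_for_rank` and `main` are hypothetical names, and the single-process fallback exists only so the sketch runs without an MPI launcher.

```python
import os


def frames_for_rank(frames, rank, size):
    """Round-robin split: rank r gets items r, r + size, r + 2*size, ... of the sorted list."""
    if not 0 <= rank < size:
        raise ValueError("rank must be in [0, size)")
    return sorted(frames)[rank::size]


def main(content_dir):
    try:
        from mpi4py import MPI  # installed on the GPU nodes via pip_packages
        comm = MPI.COMM_WORLD
        rank, size = comm.Get_rank(), comm.Get_size()
    except ImportError:
        # Fall back to a single process so the sketch also runs without MPI.
        rank, size = 0, 1
    frames = [f for f in os.listdir(content_dir) if f.endswith(".jpg")]
    my_frames = frames_for_rank(frames, rank, size)
    print(f"rank {rank} of {size}: {len(my_frames)} frames")
    # Hypothetical: apply the style-transfer model to each frame in my_frames here.
    return my_frames
```

With `node_count=4` and `process_count_per_node=1`, there would be four ranks, and round-robin assignment keeps their workloads within one frame of each other.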

In [20]:
preprocess_video_step = PythonScriptStep(
    name="preprocess video",
    script_name="preprocess_video.py",
    arguments=["--input-video", video_path_param,
               "--output-audio", ffmpeg_audio,
               "--output-images", ffmpeg_images,
              ],
    compute_target=cpu_cluster,
    inputs=[video_path_param],
    outputs=[ffmpeg_images, ffmpeg_audio],
    runconfig=ffmpeg_run_config,
    source_directory=project_folder,
    allow_reuse=False
)

distributed_style_transfer_step = MpiStep(
    name="mpi style transfer",
    script_name="style_transfer_mpi.py",
    arguments=["--content-dir", ffmpeg_images,
               "--output-dir", processed_images,
               "--model-dir", model_dir,
               "--cuda", 1
              ],
    compute_target=gpu_cluster,
    node_count=style_transfer_node_count,
    process_count_per_node=1,
    inputs=[model_dir, ffmpeg_images],
    outputs=[processed_images],
    pip_packages=["image", "mpi4py", "torch", "torchvision"],
    runconfig=style_transfer_run_config,
    use_gpu=True,
    source_directory=project_folder,
    allow_reuse=False
)

postprocess_video_step = PythonScriptStep(
    name="postprocess video",
    script_name="postprocess_video.py",
    arguments=["--images-dir", processed_images, 
               "--input-audio", ffmpeg_audio, 
               "--output-dir", output_video],
    compute_target=cpu_cluster,
    inputs=[processed_images, ffmpeg_audio],
    outputs=[output_video],
    runconfig=ffmpeg_run_config,
    source_directory=project_folder,
    allow_reuse=False
)
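In the `arguments` lists above, the `DataPath` parameter and the `PipelineData` objects are not passed as Python objects: at run time AML replaces each one with a concrete path on the compute node, so the receiving script only sees strings. A hypothetical argument parser for `preprocess_video.py`, assuming it reads exactly the flags passed above, could look like this:

```python
import argparse


def parse_preprocess_args(argv=None):
    """Hypothetical parser for scripts/preprocess_video.py."""
    parser = argparse.ArgumentParser(description="Split a video into frames and an audio track.")
    parser.add_argument("--input-video", required=True,
                        help="Path to the input video (resolved from video_path_param)")
    parser.add_argument("--output-audio", required=True,
                        help="Output directory for audio (the ffmpeg_audio PipelineData)")
    parser.add_argument("--output-images", required=True,
                        help="Output directory for frames (the ffmpeg_images PipelineData)")
    # argparse exposes "--output-images" as the attribute args.output_images
    return parser.parse_args(argv)
```

Because the paths only exist on the compute node, a script like this would typically call `os.makedirs(path, exist_ok=True)` on its output directories before writing to them.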

### Run the pipeline

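Run the pipeline, passing in the video path parameter. We only pass the final step (`postprocess_video_step`) to `Pipeline`; AML finds the upstream steps through their data dependencies (`processed_images`, `ffmpeg_audio`, and so on), which is why all three steps appear in the output.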

In [21]:
steps = [postprocess_video_step]
pipeline = Pipeline(workspace=ws, steps=steps)
pipeline_run = Experiment(ws, 'style_transfer_mpi').submit(
    pipeline, 
    pipeline_params={'video_path': DataPath(datastore=my_datastore, path_on_datastore="orangutan.mp4")}
)

Created step postprocess video [7cc17697][f9be7548-48d6-4df0-99f6-99a0c0377c20], (This step will run and generate new outputs)
Created step mpi style transfer [e7ba080e][3017c382-f3cf-4055-8f20-409ac9305ffc], (This step will run and generate new outputs)
Created step preprocess video [2bb7ac9c][31623158-d45d-4c8a-8fcc-c12d4a73f4b6], (This step will run and generate new outputs)
Created data reference model_dir for StepId [416f19e6][88eeea78-4063-4fc8-9930-5480186dd516], (Consumers of this data will generate new runs.)
Created data reference datastore_975052cf_ad24f844 for StepId [17a8ad15][4c81af2a-45ec-47b5-8678-e08556c3738a], (Consumers of this data will generate new runs.)
Submitted pipeline run: 304146d1-ea9c-445c-978a-b8c55800759a
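The log above shows all three steps being created even though only `postprocess_video_step` was passed to `Pipeline`. The toy model below illustrates the idea of that resolution, not AML's actual implementation: starting from the final step, follow each input back to the step that produces it, and list every producer before its consumers. The `producers` and `step_inputs` tables are hand-written from this notebook's step definitions; inputs with no producing step (`model_dir`, `video_path`) are treated as external data.

```python
def upstream_steps(final_step, producers, step_inputs):
    """Return final_step and every step it depends on, producers first.

    producers:   data name -> step that writes it
    step_inputs: step -> list of data names it reads
    """
    order, seen = [], set()

    def visit(step):
        if step in seen:
            return
        seen.add(step)
        for data in step_inputs.get(step, []):
            producer = producers.get(data)  # None for external inputs
            if producer is not None:
                visit(producer)
        order.append(step)  # appended only after all of its producers

    visit(final_step)
    return order


producers = {
    "ffmpeg_audio": "preprocess video",
    "ffmpeg_images": "preprocess video",
    "processed_images": "mpi style transfer",
    "output_video": "postprocess video",
}
step_inputs = {
    "preprocess video": ["video_path"],
    "mpi style transfer": ["model_dir", "ffmpeg_images"],
    "postprocess video": ["processed_images", "ffmpeg_audio"],
}

print(upstream_steps("postprocess video", producers, step_inputs))
```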


In [22]:
pipeline_run

| Experiment | Id | Type | Status |
|---|---|---|---|
| style_transfer_mpi | 304146d1-ea9c-445c-978a-b8c55800759a | azureml.PipelineRun | NotStarted |


Wait until the pipeline completes before proceeding...

In [23]:
pipeline_run.wait_for_completion(show_output=True)

status:Running
...............................................................................................................................................................................................................................................................
status:Finished


'Finished'

### Download the output video

Get the step id of the postprocessing step

In [24]:
step_id = pipeline_run.find_step_run("postprocess video")[0].id

Download the output files from the postprocessing step

In [25]:
my_datastore.download(
    target_path="aml_test_orangutan", 
    prefix=step_id, 
)

2
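The display cell below hard-codes the path `output_video/video_processed.mp4` under the step ID. If you would rather not depend on the exact file name the postprocessing script chooses, a small helper (hypothetical, assuming the step writes its video as a single `.mp4` file) can search the downloaded folder instead:

```python
from pathlib import Path


def find_output_videos(download_root, step_id):
    """Return every .mp4 file downloaded under <download_root>/<step_id>/, sorted by path."""
    return sorted(Path(download_root, step_id).rglob("*.mp4"))
```

For example, `find_output_videos("aml_test_orangutan", step_id)[0]` would give the path to pass into the `<video>` tag.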

Display the generated output video that we just downloaded

In [26]:
display(HTML("""
    <video width="320" height="240" controls>
        <source src="aml_test_orangutan/{}/output_video/video_processed.mp4" type="video/mp4">
    </video>
""".format(step_id)))

### Publish the pipeline

The last step is to publish the pipeline so that it can be triggered through an HTTP REST endpoint. We'll use Logic Apps in the next notebook to call this endpoint.

In [27]:
published_pipeline = pipeline.publish(
    name="style transfer", 
    description="Apply style transfer to every frame of a video"
)

In [28]:
published_pipeline_id = published_pipeline.id
set_key(env_path, "AML_PUBLISHED_PIPELINE_ID", published_pipeline_id)

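### Test the published pipeline

Trigger the published pipeline by sending a POST request to its REST endpoint. The request is authenticated with an Azure AD token obtained from your Azure CLI login, and `DataPathAssignments` sets the `video_path` pipeline parameter.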

In [29]:
cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

response = requests.post(
    published_pipeline.endpoint, 
    headers=aad_token, 
    json={
        "ExperimentName": "My_Pipeline",
        "DataPathAssignments": {
            "video_path": {"DataStoreName": "workspaceblobstore",
                           "RelativePath": "orangutan.mp4"}
        }
    }
)

response.raise_for_status()  # fail fast on authentication or request errors
run_id = response.json()["Id"]
print(run_id)
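To run the published pipeline on a different video, only the `RelativePath` in the request body changes; the `video_path` key must match the name given to the `PipelineParameter` earlier. A small helper, `build_pipeline_request` (a hypothetical name), that produces the same JSON body as the cell above:

```python
def build_pipeline_request(video_relative_path,
                           datastore_name="workspaceblobstore",
                           experiment_name="My_Pipeline"):
    """JSON body for POSTing to the published pipeline's REST endpoint."""
    return {
        "ExperimentName": experiment_name,
        "DataPathAssignments": {
            # Keyed by the PipelineParameter name used in the pipeline
            "video_path": {
                "DataStoreName": datastore_name,
                "RelativePath": video_relative_path,
            }
        },
    }
```

Usage would be `requests.post(published_pipeline.endpoint, headers=aad_token, json=build_pipeline_request("my_other_video.mp4"))`, where `my_other_video.mp4` is a placeholder for a video you have already uploaded to the datastore.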

---

You are now ready to move on to the [next notebook](04_deploy_logic_apps.ipynb).