### Import packages and load .env

In [None]:
import json
import os
from pathlib import Path

from azureml.core import Workspace, Run, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.datastore import Datastore
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.core.runconfig import DEFAULT_CPU_IMAGE #, DEFAULT_GPU_IMAGE
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core.graph import PipelineParameter
from azureml.pipeline.steps import PythonScriptStep, MpiStep
from dotenv import set_key, get_key, find_dotenv, load_dotenv
from IPython.core.display import display, HTML

In [None]:
# Locate the .env file (raising if none is found) and load its entries.
env_path = find_dotenv(raise_error_if_not_found=True)
load_dotenv(dotenv_path=env_path)

### Setup the workspace in AML

Get our workspace from the config file.

In [None]:
# Load the workspace from the local config file and echo its identifying details.
ws = Workspace.from_config()
print('Workspace name: ' + ws.name,
      'Azure region: ' + ws.location,
      'Subscription id: ' + ws.subscription_id,
      'Resource group: ' + ws.resource_group, sep = '\n')

# Local folder that is used as the source directory of the pipeline steps.
project_folder = "scripts"
run_history_name = project_folder

# exist_ok=True keeps the cell idempotent without a separate isdir() check.
os.makedirs(project_folder, exist_ok=True)

### Setup the compute

Create our compute using `AmlCompute`. We need one node for the video pre/post-processing and the remaining nodes for performing the style transfer. Since we'll be using the MPI step, all nodes must be active before the MPI step will execute. Thus, we set max nodes equal to min nodes, as there is no point in autoscaling the cluster.

In [None]:
# Fixed cluster sizes: one node for the ffmpeg (pre/post-processing) cluster,
# four nodes for the style transfer cluster.
ffmpeg_node_count = 1
style_transfer_node_count = 4

In [None]:
def get_or_create_cluster(workspace, cluster_name, vm_size, node_count):
    """Return the named AmlCompute cluster, provisioning it when it does not exist.

    Args:
        workspace: Workspace in which the cluster is looked up or created.
        cluster_name: Name of the compute target.
        vm_size: VM size used when the cluster has to be created.
        node_count: Used as both min_nodes and max_nodes, so a newly created
            cluster has a fixed size and does not autoscale.

    Returns:
        The existing or newly provisioned compute target.
    """
    try:
        cluster = AmlCompute(workspace, cluster_name)
        print("Found existing cluster.")
    except ComputeTargetException:
        # Only a failed lookup triggers provisioning; any other failure
        # (authentication, networking, KeyboardInterrupt, ...) propagates
        # instead of being mistaken for "cluster does not exist".
        print("Creating {}".format(cluster_name))
        provisioning_config = AmlCompute.provisioning_configuration(
            vm_size=vm_size,
            min_nodes=node_count,
            max_nodes=node_count
        )

        # create the cluster and block until provisioning has finished
        cluster = ComputeTarget.create(workspace, cluster_name, provisioning_config)
        cluster.wait_for_completion(show_output=True)
    return cluster


# CPU compute
cpu_cluster_name = "ffmpeg-cluster"
cpu_cluster = get_or_create_cluster(
    ws, cpu_cluster_name, vm_size="STANDARD_D3_V2", node_count=ffmpeg_node_count
)

# GPU compute
gpu_cluster_name = "style-cluster"
gpu_cluster = get_or_create_cluster(
    ws, gpu_cluster_name, vm_size="STANDARD_NC6s_v3", node_count=style_transfer_node_count
)

### Setup data references

Create a datastore based on the storage account we created earlier. We'll use that storage account to hold our input and output data.

In [None]:
# Register the blob container named in the .env file as the "datastore" datastore,
# replacing any existing registration with the same name.
blob_container_name = get_key(env_path, "STORAGE_CONTAINER_NAME")
storage_account_name = get_key(env_path, "STORAGE_ACCOUNT_NAME")
my_datastore = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name="datastore",
    container_name=blob_container_name,
    account_name=storage_account_name,
    account_key=get_key(env_path, "STORAGE_ACCOUNT_KEY"),
    overwrite=True,
)

Upload the `models` folder (from our local directory) and the `orangutan.mp4` video to the datastore.

In [None]:
# Upload the local model checkpoint into the "models" directory of the datastore.
my_datastore.upload_files(
    files=["./models/model.pth"],
    target_path="models",
    overwrite=True,
)

# Upload the orangutan.mp4 video (no target path is given).
my_datastore.upload_files(files=["./orangutan.mp4"], overwrite=True)

Set the `models` dir and the `orangutan.mp4` video we uploaded as data references to be used by the pipeline steps later on.

In [None]:
# Both references point into my_datastore and use "download" mode.

# The uploaded "models" directory.
model_dir = DataReference(
    datastore=my_datastore,
    data_reference_name="model_dir",
    path_on_datastore="models",
    mode="download",
)

# The uploaded input video.
video = DataReference(
    datastore=my_datastore,
    data_reference_name="video",
    path_on_datastore="orangutan.mp4",
    mode="download",
)

Set the output video to be saved in the same datastore.

In [None]:
# Final pipeline output; it lives on the same datastore as the inputs.
output_video = PipelineData("output_video", datastore=my_datastore)

Get a reference to the datastore that was generated when the AML workspace was created. We'll use this datastore to hold temporary pipeline data.

In [None]:
# The workspace's default datastore, used below for intermediate pipeline data.
default_datastore = ws.get_default_datastore()

Save all temporary data files (PipelineData) to the default datastore.

In [None]:
# Intermediate artifacts handed from one step to the next; all of them are kept
# on the default datastore.
ffmpeg_audio, ffmpeg_images, processed_images = (
    PipelineData(name=artifact_name, datastore=default_datastore)
    for artifact_name in ("ffmpeg_audio", "ffmpeg_images", "processed_images")
)

### Setup cluster environments

Config for ffmpeg cluster

In [None]:
# Conda environment for the ffmpeg steps: the ffmpeg package from conda-forge.
ffmpeg_cd = CondaDependencies()
ffmpeg_cd.add_channel("conda-forge")
ffmpeg_cd.add_conda_package("ffmpeg")

# Run in Docker on the default CPU base image, without GPU support.
ffmpeg_run_config = RunConfiguration(conda_dependencies=ffmpeg_cd)
ffmpeg_environment = ffmpeg_run_config.environment
ffmpeg_environment.docker.enabled = True
ffmpeg_environment.docker.gpu_support = False
ffmpeg_environment.docker.base_image = DEFAULT_CPU_IMAGE
ffmpeg_environment.spark.precache_packages = False

Config for style transfer cluster

In [None]:
# Conda environment for the style transfer step: pytorch from the pytorch channel.
style_transfer_cd = CondaDependencies()
style_transfer_cd.add_channel("pytorch")
style_transfer_cd.add_conda_package("pytorch")

# Run in Docker on the "pytorch/pytorch" base image, with GPU support enabled.
style_transfer_run_config = RunConfiguration(conda_dependencies=style_transfer_cd)
style_transfer_environment = style_transfer_run_config.environment
style_transfer_environment.docker.enabled = True
style_transfer_environment.docker.gpu_support = True
style_transfer_environment.docker.base_image = "pytorch/pytorch"
style_transfer_environment.spark.precache_packages = False

### Set up pipeline steps

In [None]:
# Disabled: optional "style" pipeline parameter; none of the steps below reference it.
# style_param = PipelineParameter(name="style", default_value="mosaic")

In [None]:
# Step 1 (CPU cluster): preprocess_video.py consumes the input video and produces
# the ffmpeg_audio and ffmpeg_images outputs.
preprocess_video_step = PythonScriptStep(
    name="preprocess video",
    script_name="preprocess_video.py",
    arguments=["--input-video", video,
               "--output-audio", ffmpeg_audio,
               "--output-images", ffmpeg_images,
              ],
    compute_target=cpu_cluster,
    inputs=[video],
    outputs=[ffmpeg_images, ffmpeg_audio],
    runconfig=ffmpeg_run_config,
    source_directory=project_folder,
    allow_reuse=False
)

# Step 2 (GPU cluster): style_transfer_mpi.py reads ffmpeg_images and the model
# directory and writes processed_images, running one process per node.
# node_count reuses style_transfer_node_count so the step always matches the size
# the GPU cluster is provisioned with, instead of repeating the literal 4.
distributed_style_transfer_step = MpiStep(
    name="mpi style transfer",
    script_name="style_transfer_mpi.py",
    arguments=["--content-dir", ffmpeg_images,
               "--output-dir", processed_images,
               "--model-dir", model_dir,
               "--cuda", 1
              ],
    compute_target=gpu_cluster,
    node_count=style_transfer_node_count,
    process_count_per_node=1,
    inputs=[model_dir, ffmpeg_images],
    outputs=[processed_images],
    pip_packages=["image", "mpi4py", "torch", "torchvision"],
    runconfig=style_transfer_run_config,
    use_gpu=True,
    source_directory=project_folder,
    allow_reuse=False
)

# Step 3 (CPU cluster): postprocess_video.py combines processed_images with
# ffmpeg_audio and writes the output_video result.
postprocess_video_step = PythonScriptStep(
    name="postprocess video",
    script_name="postprocess_video.py",
    arguments=["--images-dir", processed_images,
               "--input-audio", ffmpeg_audio,
               "--output-dir", output_video],
    compute_target=cpu_cluster,
    inputs=[processed_images, ffmpeg_audio],
    outputs=[output_video],
    runconfig=ffmpeg_run_config,
    source_directory=project_folder,
    allow_reuse=False
)

### Run the pipeline

In [None]:
# Assemble the three steps into a pipeline and submit it under the
# 'style_transfer_mpi' experiment.
steps = [
    preprocess_video_step,
    distributed_style_transfer_step,
    postprocess_video_step,
]
pipeline = Pipeline(workspace=ws, steps=steps)
experiment = Experiment(ws, 'style_transfer_mpi')
pipeline_run = experiment.submit(pipeline)
# Disabled variant that would also pass a "style" pipeline parameter:
# pipeline_run = Experiment(ws, 'style_transfer_mpi').submit(pipeline, pipeline_params={"style": "mosaic"})

In [None]:
# Bare expression: the notebook shows the submitted run as this cell's output.
pipeline_run

In [None]:
# Wait for the pipeline run to complete, with its output shown in the notebook.
pipeline_run.wait_for_completion(show_output=True)

### Download the output video

Get the step id of the postprocessing step

In [None]:
# Look up the runs of the "postprocess video" step and keep the id of the first one.
postprocess_step_runs = pipeline_run.find_step_run("postprocess video")
step_id = postprocess_step_runs[0].id

Download the output files from the postprocessing step

In [None]:
# Download everything stored under the step id prefix into ./aml_test_orangutan.
my_datastore.download(target_path="aml_test_orangutan", prefix=step_id)

Display the generated output video that we just downloaded

In [None]:
# HTML5 player pointing at the downloaded file; "{}" is filled with the step id.
video_html_template = """
    <video width="320" height="240" controls>
        <source src="aml_test_orangutan/{}/output_video/video_processed.mp4" type="video/mp4">
    </video>
"""
display(HTML(video_html_template.format(step_id)))

---

You are now ready to move on to the [next notebook](04_publish_pipeline.ipynb).

---

In [None]:
# Pipeline parameter named "video_path"; a value for it is passed at submit time.
# NOTE(review): the default "dne.mp4" looks like a deliberate placeholder — confirm.
video_path_param = PipelineParameter(name="video_path", default_value="dne.mp4")

In [None]:
# Bare expression: shows the parameter's name as this cell's output.
video_path_param.name

In [None]:
# Parameterized variant of the pipeline: the input video path comes from
# video_path_param instead of a fixed file name.
# NOTE(review): confirm that DataReference accepts a PipelineParameter for
# path_on_datastore; parameterized paths may need DataPath / DataPathComputeBinding.
video = DataReference(
    datastore=my_datastore,
    data_reference_name="video",
    path_on_datastore=video_path_param,
    mode="download"
)

# Step 1 (CPU cluster): preprocess_video.py consumes the input video and produces
# the ffmpeg_audio and ffmpeg_images outputs.
preprocess_video_step = PythonScriptStep(
    name="preprocess video",
    script_name="preprocess_video.py",
    arguments=["--input-video", video,
               "--output-audio", ffmpeg_audio,
               "--output-images", ffmpeg_images,
              ],
    compute_target=cpu_cluster,
    inputs=[video],
    outputs=[ffmpeg_images, ffmpeg_audio],
    runconfig=ffmpeg_run_config,
    source_directory=project_folder,
    allow_reuse=False
)

# Step 2 (GPU cluster): style_transfer_mpi.py reads ffmpeg_images and the model
# directory and writes processed_images, running one process per node.
# node_count reuses style_transfer_node_count so the step always matches the size
# the GPU cluster is provisioned with, instead of repeating the literal 4.
distributed_style_transfer_step = MpiStep(
    name="mpi style transfer",
    script_name="style_transfer_mpi.py",
    arguments=["--content-dir", ffmpeg_images,
               "--output-dir", processed_images,
               "--model-dir", model_dir,
               "--cuda", 1
              ],
    compute_target=gpu_cluster,
    node_count=style_transfer_node_count,
    process_count_per_node=1,
    inputs=[model_dir, ffmpeg_images],
    outputs=[processed_images],
    pip_packages=["image", "mpi4py", "torch", "torchvision"],
    runconfig=style_transfer_run_config,
    use_gpu=True,
    source_directory=project_folder,
    allow_reuse=False
)

# Step 3 (CPU cluster): postprocess_video.py combines processed_images with
# ffmpeg_audio and writes the output_video result.
postprocess_video_step = PythonScriptStep(
    name="postprocess video",
    script_name="postprocess_video.py",
    arguments=["--images-dir", processed_images,
               "--input-audio", ffmpeg_audio,
               "--output-dir", output_video],
    compute_target=cpu_cluster,
    inputs=[processed_images, ffmpeg_audio],
    outputs=[output_video],
    runconfig=ffmpeg_run_config,
    source_directory=project_folder,
    allow_reuse=False
)

In [None]:
# Build the pipeline from the final step only and submit it under the
# 'style_transfer_mpi_param' experiment, supplying the "video_path" parameter.
# NOTE(review): only postprocess_video_step is listed; presumably its upstream
# steps are pulled in through the PipelineData dependencies — confirm.
steps = [postprocess_video_step]
pipeline = Pipeline(workspace=ws, steps=steps)
param_experiment = Experiment(ws, 'style_transfer_mpi_param')
pipeline_run = param_experiment.submit(
    pipeline,
    pipeline_params={"video_path": "orangutan.mp4"},
)