First, create a Python environment to run these cells. I used "pyenv" for this; follow its official installation instructions to set it up.
That way, I can install the necessary packages into that environment and run this notebook with the matching kernel (selected in the top right corner: "mlops (Python 3.10.10)").
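
Before running the cells, it can help to confirm that the kernel actually sees the packages this notebook imports. The following is a minimal sketch, assuming the two Azure packages are installed under their PyPI names "azure-ai-ml" and "azure-identity"; the helper name `installed_versions` is made up for this example.

```python
from importlib import metadata

# Distribution names as published on PyPI; the import names are azure.ai.ml and azure.identity.
REQUIRED_PACKAGES = ["azure-ai-ml", "azure-identity"]


def installed_versions(packages):
    """Return {package: version or None} for the given distribution names."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in the active kernel
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(REQUIRED_PACKAGES).items():
        print(f"{name}: {version or 'MISSING, install it with pip'}")
```

If a package shows up as missing, the notebook is most likely running with a different kernel than the environment you installed into.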



In [1]:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

In [2]:
import os
## Read each setting from an environment variable, or use the fallback given as the second parameter.
## For now, fill in the fallback values. Later on, we will work with environment variables, so we're already preparing for that here!
workspace_name = os.environ.get('WORKSPACE', 'mlops-nathan')
subscription_id = os.environ.get('SUBSCRIPTION_ID', '7c50f9c3-289b-4ae0-a075-08784b3b9042')
resource_group = os.environ.get('RESOURCE_GROUP', 'mlops')
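
To see which of these settings really came from the environment and which fell back to the hardcoded value, a small helper can report the source. This is only a sketch: `read_setting` is a hypothetical helper, and treating an empty variable as unset is my own choice, not something `os.environ.get` does by itself.

```python
import os


def read_setting(name, fallback, environ=os.environ):
    """Return (value, from_env) for a setting, preferring the environment variable."""
    value = environ.get(name)
    if value:  # treat an unset variable and an empty string alike
        return value, True
    return fallback, False


if __name__ == "__main__":
    for name, fallback in [
        ("WORKSPACE", "mlops-nathan"),
        ("RESOURCE_GROUP", "mlops"),
    ]:
        value, from_env = read_setting(name, fallback)
        source = "environment" if from_env else "fallback"
        print(f"{name}={value} ({source})")
```

Passing `environ` as a parameter keeps the helper easy to try out with a plain dictionary instead of the real environment.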

In [3]:
# Because we are running this in an interactive notebook, we can use the InteractiveBrowserCredential
# This opens a browser window where we can log in
credential = InteractiveBrowserCredential()
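
We also imported DefaultAzureCredential in the first cell without using it. A common pattern is to try that one first and only fall back to the browser login when it cannot get a token. Below is a hedged sketch of that idea: `first_working_credential` is a hypothetical helper, and it only assumes that each credential object has a `get_token(scope)` method, which both azure-identity classes provide.

```python
def first_working_credential(factories, scope="https://management.azure.com/.default"):
    """Return the first credential that can fetch a token for `scope`."""
    errors = []
    for factory in factories:
        try:
            credential = factory()
            credential.get_token(scope)  # raises if this credential cannot authenticate
            return credential
        except Exception as exc:  # broad on purpose: any failure means "try the next one"
            errors.append(f"{getattr(factory, '__name__', factory)}: {exc}")
    raise RuntimeError("No credential could authenticate:\n" + "\n".join(errors))


# Intended usage in this notebook (needs azure-identity and an Azure login):
# credential = first_working_credential([DefaultAzureCredential, InteractiveBrowserCredential])
```

In an interactive notebook the plain InteractiveBrowserCredential above is enough; the fallback order mostly matters once the same code also has to run unattended.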

In [4]:
ml_client = MLClient(
    credential, subscription_id, resource_group, workspace_name
)

Class FeatureStoreOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class FeatureSetOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class FeatureStoreEntityOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


## Prepare a Virtual PC if needed

If we need a CPU cluster or a single-node machine, we can again create one using the SDK...

But we can also create one in the Portal and then fetch it from here, which might be the safest way to keep costs under control.

When you're configuring it in the Portal, select the "STANDARD_A4M_V2" size to have enough RAM, CPU power and storage to run the scripts. The cheapest one is not powerful enough...
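
Fetching a machine that already exists, and only creating one when it is missing, could look roughly like this. It is a sketch under assumptions: `get_or_create_compute` is a hypothetical helper, it relies on `ml_client.compute.get(name)` and `ml_client.begin_create_or_update(...)`, and it treats any failed lookup as "does not exist", which is cruder than you would want in real code.

```python
def get_or_create_compute(ml_client, name, make_compute):
    """Fetch the compute target `name`, creating it only when the lookup fails."""
    try:
        return ml_client.compute.get(name)
    except Exception:
        # Assumption: a failed lookup means "does not exist". With the real SDK,
        # catching azure.core.exceptions.ResourceNotFoundError is more precise.
        poller = ml_client.begin_create_or_update(make_compute(name))
        return poller.result()


# Intended usage (needs the ml_client from the cells above):
# from azure.ai.ml.entities import ComputeInstance
# compute = get_or_create_compute(
#     ml_client,
#     "zinnige-naam",
#     lambda name: ComputeInstance(name=name, size="STANDARD_A4M_V2"),
# )
```

The `make_compute` callback keeps the helper independent of the machine type, so the same function works for a ComputeInstance or an AmlCompute cluster.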

In [5]:
# Compute Instances need to have a unique name across the region.
# Here we create a unique name with current datetime
from azure.ai.ml.entities import ComputeInstance, AmlCompute
import datetime

ci_basic_name = "basic-ci" + datetime.datetime.now().strftime("%Y%m%d%H%M")
ci_basic = ComputeInstance(name=ci_basic_name, size="STANDARD_DS3_v2")
ml_client.begin_create_or_update(ci_basic).result()

ComputeInstance({'state': 'Running', 'last_operation': {'operation_name': 'Create', 'operation_time': '2023-10-04T12:45:39.211Z', 'operation_status': 'Succeeded', 'operation_trigger': 'User'}, 'os_image_metadata': <azure.ai.ml.entities._compute._image_metadata.ImageMetadata object at 0x125b72e00>, 'services': [{'display_name': 'Jupyter', 'endpoint_uri': 'https://zinnige-naam.westeurope.instances.azureml.ms/tree/'}, {'display_name': 'Jupyter Lab', 'endpoint_uri': 'https://zinnige-naam.westeurope.instances.azureml.ms/lab'}], 'type': 'computeinstance', 'created_on': None, 'provisioning_state': 'Succeeded', 'provisioning_errors': None, 'name': 'zinnige-naam', 'description': None, 'tags': None, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/7c50f9c3-289b-4ae0-a075-08784b3b9042/resourceGroups/mlops/providers/Microsoft.MachineLearningServices/workspaces/mlops-nathan/computes/zinnige-naam', 'Resource__source_path': None, 'base_path': '/Users/mctxr/MLOps-Les3', 'creation_context

We also need to set up the Environment to use for our Data Preparation scripts.
I call this one "aml-Pillow", because we are mostly going to use the "Pillow" library to do the image processing.

It requires a "components/dataprep/conda.yaml" file; I will show you in a moment where to find and copy it. It's in the "azureml-2.0-automation/components/dataprep" directory, because it was later used there and was moved rather than copied... Sorry!
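
If you cannot find that file, the sketch below shows roughly what such a conda file could contain. Note that this is not the actual file from the course: the environment name "aml-pillow", the conda-forge channel and the Python version are my assumptions, and only the Pillow version (10.0.1) is taken from the tags in the cell below. The helper `render_conda_yaml` is made up for this example.

```python
def render_conda_yaml(env_name, python_version, pip_packages):
    """Build the text of a small conda environment file."""
    lines = [
        f"name: {env_name}",
        "channels:",
        "  - conda-forge",
        "dependencies:",
        f"  - python={python_version}",
        "  - pip",
        "  - pip:",
    ]
    lines += [f"      - {package}" for package in pip_packages]
    return "\n".join(lines) + "\n"


# Assumed values, see the note above; adjust them to your own setup.
conda_text = render_conda_yaml("aml-pillow", "3.10", ["pillow==10.0.1"])
print(conda_text)
```

Writing `conda_text` to "components/dataprep/conda.yaml" would give the Environment cell below a file to point at.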

In [14]:
from azure.ai.ml.entities import Environment
import os

custom_env_name = "aml-Pillow"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Image Processing (with Pillow)",
    tags={"Pillow": "10.0.1"},
    conda_file=os.path.join("components", "dataprep", "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-Pillow is registered to workspace, the environment version is 4


In [15]:
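from azure.ai.ml import Input

# Directory that holds the component scripts; the %%writefile cells below write into it
data_prep_src_dir = "./components/dataprep"
os.makedirs(data_prep_src_dir, exist_ok=True)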

## Data Prep

In [50]:
%%writefile {data_prep_src_dir}/dataprep.py
import os
import argparse
import logging
from glob import glob
from PIL import Image



def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--output_data", type=str, help="path to output data")
    args = parser.parse_args()


    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)
    print("output folder:", args.output_data)

    output_dir = args.output_data
    size = (64, 64) # Later we can also pass this as a property


    for file in glob(args.data + "/*.jpg"):
        img = Image.open(file)
        img_resized = img.resize(size)

        # Save the resized image to the output directory
        output_file = os.path.join(output_dir, os.path.basename(file))
        img_resized.save(output_file)


if __name__ == "__main__":
    main()

Overwriting ./components/dataprep/dataprep.py
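
The script hardcodes the size to (64, 64) and notes that it could become a property later. One way to do that is an extra command-line option. This is a sketch only: the `--size` option and the `build_parser` function do not exist in the script above, they are hypothetical additions.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--output_data", type=str, help="path to output data")
    # Hypothetical extra option: two integers, width then height
    parser.add_argument(
        "--size",
        type=int,
        nargs=2,
        default=[64, 64],
        metavar=("WIDTH", "HEIGHT"),
        help="target image size",
    )
    return parser


args = build_parser().parse_args(
    ["--data", "in", "--output_data", "out", "--size", "128", "96"]
)
size = tuple(args.size)  # Pillow's Image.resize expects a (width, height) tuple
print(size)
```

To use it from the component, the command string and the component inputs would need matching entries for the width and height as well.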


In [16]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# This defines a component with the name "data_prep_image_resize_live"
# Once registered, it can be used in the Pipeline editor of the Azure Portal
data_prep_component = command(
    name="data_prep_image_resize_live",
    display_name="Data preparation, Image Resizing",
    description="Reads a data asset of images and preprocesses them by resizing them to 64x64 pixels.",
    inputs={
        "data": Input(type="uri_folder"),
    },
    outputs={
        "output_data": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the component
    code=os.path.join("components", "dataprep"),
    command="""python dataprep.py \
            --data ${{inputs.data}} \
            --output_data ${{outputs.output_data}} \
            """,
    environment="aml-Pillow@latest",
)

In [17]:
# Now we register the component to the workspace
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Confirm the registration by printing the component name and version
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

Uploading dataprep (0.0 MBs): 100%|██████████| 1251/1251 [00:00<00:00, 18661.11it/s]



Component data_prep_image_resize_live with Version 2023-10-04-13-38-39-3593095 is registered


Now that it's registered, we can start using it in pipelines.

Because it's fun to do it from the SDK, we will show you that. But first, check out the recording to see how it works in the Azure Portal too.

As you might have noticed in the recording, the designer is OK for a bit, but we can't really customize everything in there... That's why we want to take it a step further and automate it from the SDK here...

In [39]:
from typing import List

In [19]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="zinnige-naam",
    description="Custom data_prep pipeline",
)
def animal_images_preprocessing_pipeline(
    input_version: str, # Currently we don't use these version numbers, but we will use them later on.
    output_version: str,
):
    # We call data_prep_component like a Python function, once per animal, with its own inputs
    # These are the animals, with the data asset version as the second item in each tuple
    animals = [
        ('pandas', "1"),
        ('cats', "1"),
        ('dogs', "1")
    ] # They are hardcoded here; otherwise another component would have to provide them.
    
    jobs = {}
    for animal in animals:

        data_prep_job = data_prep_component(
            data=Input(
                type="uri_folder",
                path=f"azureml:{animal[0]}:{animal[1]}" 
            ),
        )
        
        output_name = animal[0] + "_resized"
        # Build the datastore path from the workspace settings defined at the top of the notebook
        output_path = (
            f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}"
            f"/workspaces/{workspace_name}/datastores/workspaceblobstore/paths/processed_animals/{animal[0]}"
        )

        data_prep_job.outputs.output_data = Output(
            type="uri_folder",
            path=output_path,
            name=output_name,
            mode="rw_mount"
        )

        jobs[animal[0]] = data_prep_job

    # a pipeline returns a dictionary of outputs
    # the keys become the pipeline output identifiers
    return {
        k: v.outputs.output_data for k,v in jobs.items()
    }

In [20]:

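# Let's instantiate the pipeline with the parameters of our choice
# (the version parameters are not used yet, so any value works for now)
pipeline = animal_images_preprocessing_pipeline(input_version="1", output_version="1")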

By executing the cell below, you submit the Data Prep Pipeline as a job. It uses the "dataprep.py" script, which is located in the "components/dataprep" directory.

It opens your browser, where you can see three parallel runs. Each one takes one of the datasets, resizes its images, and uploads the result to the Datastore as a "<animal>_resized" dataset, for example "dogs_resized".
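
To make the naming in the pipeline above explicit: every animal gets an output name and an output path that follow the same pattern. The sketch below reproduces that pattern outside of Azure so you can print and inspect it. The helper `resized_output_location` is hypothetical, and "<subscription-id>" is a placeholder for your own subscription.

```python
def resized_output_location(animal, subscription_id, resource_group, workspace_name):
    """Return (data asset name, datastore URI) for one animal's resized images."""
    name = f"{animal}_resized"
    path = (
        f"azureml://subscriptions/{subscription_id}"
        f"/resourcegroups/{resource_group}"
        f"/workspaces/{workspace_name}"
        f"/datastores/workspaceblobstore/paths/processed_animals/{animal}"
    )
    return name, path


for animal in ["pandas", "cats", "dogs"]:
    name, path = resized_output_location(
        animal, "<subscription-id>", "mlops", "mlops-nathan"
    )
    print(name, "->", path)
```

These "<animal>_resized" names are the ones the train/test split step refers to later on.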

In [21]:
import webbrowser

# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="image_preprocessing_pipeline",
)
# open the pipeline in web browser
webbrowser.open(pipeline_job.studio_url)

Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


True

## Train test split

Now that they are all resized in the cloud, we can try to split them up into a train and test set as well.

We will repeat the steps:
- Create the Component script
- Register the Component in Azure
- Use the Component in a Pipeline
- Run the Pipeline

In [53]:
%%writefile {data_prep_src_dir}/traintestsplit.py
import os
import argparse
import logging
from glob import glob
import math
import random

def main():
    """Main function of the script."""

    SEED = 42

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--datasets", type=str, nargs="+", help="All the datasets to combine")
    parser.add_argument("--training_data_output", type=str, help="path to training output data")
    parser.add_argument("--testing_data_output", type=str, help="path to testing output data")
    # float, because the component declares this input as type "number"
    parser.add_argument("--split_size", type=float, help="Percentage to use as Testing data")
    args = parser.parse_args()

    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.datasets)
    print("Training folder:", args.training_data_output)
    print("Testing folder:", args.testing_data_output)
    print("Split size:", args.split_size)

    train_test_split_factor = args.split_size / 100 # Alias
    datasets = args.datasets

    training_datapaths = []
    testing_datapaths = []


    for dataset in datasets:
        animal_images = glob(dataset + "/*.jpg")
        print(f"Found {len(animal_images)} images for {dataset}")

        ## Concatenate the names for the animal_name and the img_path. Don't put a / between, because the img_path already contains that
        ## animal_images = [(default_datastore, f'processed_animals/{animal_name}{img_path}') for img_path in animal_images] # Make sure the paths are actual DataPaths
    
        random.seed(SEED) # Use the same random seed as I use and defined in the earlier cells
        random.shuffle(animal_images) # Shuffle the data so it's randomized

        ## Testing images
        amount_of_test_images = math.ceil(len(animal_images) * train_test_split_factor) # Get a small percentage of testing images

        animal_test_images = animal_images[:amount_of_test_images]
        animal_training_images = animal_images[amount_of_test_images:]

        # Add them all to the other ones
        testing_datapaths.extend(animal_test_images)
        training_datapaths.extend(animal_training_images)

        print(testing_datapaths[:5])

        # Write the data to the output
        for img in animal_test_images:
            # Open the img, which is a string filepath, then save it to the args.testing_data_output directory
            with open(img, "rb") as f:
                with open(os.path.join(args.testing_data_output, os.path.basename(img)), "wb") as f2:
                    f2.write(f.read())

        for img in animal_training_images:
            # Open the img, which is a string filepath, then save it to the args.training_data_output directory
            with open(img, "rb") as f:
                with open(os.path.join(args.training_data_output, os.path.basename(img)), "wb") as f2:
                    f2.write(f.read())

if __name__ == "__main__":
    main()

Overwriting ./components/dataprep/traintestsplit.py
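
The core of the script is the split itself: shuffle with a fixed seed, then take the first `ceil(n * percentage / 100)` images as test data. Here is that logic as a small standalone function you can try locally. It is a sketch, not the script's exact code: `split_paths` is a hypothetical helper, and it uses a local `random.Random(seed)` instead of seeding the global generator.

```python
import math
import random


def split_paths(paths, test_percentage, seed=42):
    """Shuffle a copy of `paths` and return (test_paths, training_paths)."""
    shuffled = list(paths)
    random.Random(seed).shuffle(shuffled)  # local RNG, leaves the global random state alone
    test_count = math.ceil(len(shuffled) * test_percentage / 100)
    return shuffled[:test_count], shuffled[test_count:]


images = [f"cat_{i}.jpg" for i in range(11)]
test_images, training_images = split_paths(images, 20)
print(len(test_images), len(training_images))
```

With 11 images and a split of 20, the test set gets ceil(2.2) = 3 images and the training set the remaining 8. Because of the rounding up, every non-empty dataset contributes at least one test image as long as the percentage is above zero.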


In [22]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_split_component = command(
    name="data_split",
    display_name="Data Splitting to Train and Test",
    description="Reads a data asset of images and combines them into a training and testing dataset",
    # We want to give the datasets as a dynamic input ...
    inputs={
        "animal_1": Input(type="uri_folder"),
        "animal_2": Input(type="uri_folder"),
        "animal_3": Input(type="uri_folder"),
        "train_test_split_factor": Input(type="number"), # The percentage of the data to use as testing data, always a positive value
    },
    # ... and take the outputs as a dynamic output to override the training and testset locations.
    outputs={
        "training_data": Output(type="uri_folder", mode="rw_mount"),
        "testing_data": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the component
    code=os.path.join("components", "dataprep"),
    command="""python traintestsplit.py \
            --datasets ${{inputs.animal_1}} ${{inputs.animal_2}} ${{inputs.animal_3}} \
            --split_size ${{inputs.train_test_split_factor}} \
            --training_data_output ${{outputs.training_data}} \
            --testing_data_output ${{outputs.testing_data}} \
            """,
    environment="aml-Pillow@latest",
)

In [23]:
# Now we register the component to the workspace
data_split_component = ml_client.create_or_update(data_split_component.component)

# Confirm the registration by printing the component name and version
print(
    f"Component {data_split_component.name} with Version {data_split_component.version} is registered"
)

Uploading dataprep (0.0 MBs): 100%|██████████| 4155/4155 [00:00<00:00, 48507.49it/s]



Component data_split with Version 2023-10-04-14-12-52-9998659 is registered


In [24]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="zinnige-naam",
    description="Custom train/test split pipeline",
)
def animal_images_traintest_split_pipeline(
    train_test_split: int, # The percentage of the data to use as testing data
    animal_1: Input,
    animal_2: Input,
    animal_3: Input,
):
    # Each animal_N input is one of the resized datasets from the previous pipeline

    # Create a component instance by calling the component factory
    data_split_job = data_split_component(
            animal_1=animal_1,
            animal_2=animal_2,
            animal_3=animal_3,
            train_test_split_factor=train_test_split
        )
    
    # Override the training and testing data outputs so they are registered as "training_data" and "testing_data"
    data_split_job.outputs.training_data = Output(
        type="uri_folder",
        name="training_data",
        mode="rw_mount"
    )
    data_split_job.outputs.testing_data = Output(
        type="uri_folder",
        name="testing_data",
        mode="rw_mount"
    )


    # a pipeline returns a dictionary of outputs
    # the keys become the pipeline output identifiers
    return {
        "training_data": data_split_job.outputs.training_data,
        "testing_data": data_split_job.outputs.testing_data
    }

In [25]:

# Let's instantiate the pipeline with the parameters of our choice
version = "1" # Choose which version of the resized datasets to use
animals = ["pandas", "cats", "dogs"]

# The input names (animal_1, animal_2, ...) are not ideal as a naming convention, but we will ignore that for now and fix it later...
animals_datasets = {
    f"animal_{i+1}": Input(type="uri_folder", path=f"azureml:{animal}_resized:{version}")
    for i, animal in enumerate(animals)
}

print(animals_datasets)

train_test_pipeline = animal_images_traintest_split_pipeline(
    **animals_datasets,
    train_test_split=20
)

{'animal_1': {'type': 'uri_folder', 'path': 'azureml:pandas_resized:1'}, 'animal_2': {'type': 'uri_folder', 'path': 'azureml:cats_resized:1'}, 'animal_3': {'type': 'uri_folder', 'path': 'azureml:dogs_resized:1'}}


In [26]:
import webbrowser

In [27]:
# submit the pipeline job
train_test_pipeline_job = ml_client.jobs.create_or_update(
    train_test_pipeline,
    # Project's name
    experiment_name="image_preprocessing_pipeline",
)
# open the pipeline in web browser
webbrowser.open(train_test_pipeline_job.studio_url)

True

Check out how the pipeline looks now: it only takes the "resized" datasets as inputs and does not rerun the earlier components, which already ran before. Later on, we will combine them, of course!

## Training

We will quickly go over the following steps, because they are basically the same as before.
- Create a component script
- Register the component in Azure
- Register an Environment if needed
- Start training

In [28]:
from azure.ai.ml.entities import Environment
import os

custom_env_name = "aml-Tensorflow-Pillow"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for AI Training (with Pillow)",
    tags={"Pillow": "10.0.1", "Tensorflow": "2.4.1"},
    conda_file=os.path.join("components", "training", "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-Tensorflow-Pillow is registered to workspace, the environment version is 2


In [34]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

training_component = command(
    name="training",
    display_name="Training an AI model",
    description="Trains an AI model by inputting a lot of training and testing data.",
    inputs={
        "training_folder": Input(type="uri_folder"),
        "testing_folder": Input(type="uri_folder"),
        "epochs": Input(type="number") # The number of training epochs, always a positive value
    },
    outputs={
        "output_folder": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the component
    code=os.path.join("components", "training"),
    command="""python train.py \
            --training_folder ${{inputs.training_folder}} \
            --testing_folder ${{inputs.testing_folder}} \
            --output_folder ${{outputs.output_folder}} \
            --epochs ${{inputs.epochs}} \
            """,
    environment="aml-Tensorflow-Pillow@latest",
)

In [35]:
# Now we register the component to the workspace
training_component = ml_client.create_or_update(training_component.component)

# Confirm the registration by printing the component name and version
print(
    f"Component {training_component.name} with Version {training_component.version} is registered"
)

Component training with Version 2023-10-04-14-29-11-9827465 is registered


In [36]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="zinnige-naam",
    description="Custom Animals Training pipeline",
)
def animals_training_pipeline(
    training_folder: Input, # The training and testing data assets produced by the split pipeline
    testing_folder: Input,
    epochs: int,
):

    training_job = training_component(
        training_folder=training_folder,
        testing_folder=testing_folder,
        epochs=epochs
    )
    
    # Let Azure decide on a unique location every time
    training_job.outputs.output_folder = Output(
        type="uri_folder",
        name="output_data",
        mode="rw_mount"
    )


    # a pipeline returns a dictionary of outputs
    # the keys become the pipeline output identifiers
    return {
        "output_data": training_job.outputs.output_folder,
    }

In [39]:

# Let's instantiate the pipeline with the parameters of our choice

# Whoops, make sure to use the correct version numbers here!
training_pipeline = animals_training_pipeline(
    # Change these versions if you want to override the choices
    training_folder=Input(type="uri_folder", path="azureml:training_data:4"),
    testing_folder=Input(type="uri_folder", path="azureml:testing_data:14"),
    epochs=5
)

In [40]:
import webbrowser
# submit the pipeline job
training_pipeline_job = ml_client.jobs.create_or_update(
    training_pipeline,
    # Project's name
    experiment_name="training_pipeline",
)
# open the pipeline in web browser
webbrowser.open(training_pipeline_job.studio_url)

True

This pipeline ends here. We will not chain everything together yet; we will do that in the next stage, where we will execute things from the CLI.