# Distributed Search

This notebook and accompanying code shows how to run an Archai Network Architecture Search (NAS) using
an Azure Machine Learning Workspace with distributed partial training of models on a GPU cluster.
This example requires a storage account and Azure machine learning workspace specified in a config.json
file like this:

```json
{
    "subscription_id": "...",
    "resource_group": "...",
    "workspace_name": "...",
    "storage_account_key": "...",
    "storage_account_name": "..."
}
```

In [1]:
import sys
import os
import json
import archai.common.azureml_helper as aml_helper
from azure.ai.ml.entities import UserIdentityConfiguration
from archai.discrete_search.search_spaces.config import ArchParamTree, DiscreteChoice, ArchConfig

sys.path.append(os.path.realpath('scripts'))

# make sure we have a scripts dir for the code to run our jobs.
import os
scripts_dir = "./scripts"
data_scripts_dir = "./data_prep"
os.makedirs(scripts_dir, exist_ok=True)

config_file = "../.azureml/config.json"
config = json.load(open(config_file, 'r'))

for required_key in ['subscription_id', 'resource_group', 'workspace_name', 'storage_account_key', 'storage_account_name']:
    if not required_key in config:
        print(f"### Error: please add a {required_key} to {config_file}")

storage_account_key = config['storage_account_key']    
storage_account_name = config['storage_account_name']

print(f'Using storage account: {storage_account_name}')

# This is the AML experiment name 
experiment_name = 'mnist_test_run'

Using storage account: archaimnistmodels


Our [search.py](scripts/search.py) will use a ConfigSearchSpace based on the following parameters, the model defined in [model.py](scripts/model.py) will take various configurations and build different shape CNN models for each configuration as shown below. Each time you execute this cell it will generate a new random CNN model.

In [2]:
from model import MyModel

arch_param_tree = ArchParamTree({
    'nb_layers': DiscreteChoice(list(range(1, 13))),
    'kernel_size': DiscreteChoice([1, 3, 5, 7]),
    'hidden_dim': DiscreteChoice([16, 32, 64, 128])
})

arch_config = arch_param_tree.sample_config()
print(arch_config)

MyModel(arch_config)

ArchConfig({
    "nb_layers": 5,
    "kernel_size": 3,
    "hidden_dim": 64
})


MyModel(
  (model): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU()
    (9): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU()
    (12): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): ReLU()
    (15): AdaptiveAvgPool2d(output_size=(1, 1))
    (16): Conv2d

Get a handle to the workspace

In [3]:
ml_client = aml_helper.get_aml_client_from_file(config_path=config_file)
print(f'Using workspace "{ml_client.workspace_name}" in resource group "{ml_client.resource_group_name}"')

Found the config file in: ..\.azureml\config.json


Using workspace "snpe-aml-workspace" in resource group "snpe-aml-rg"


Create the compute clusters that we need

In [4]:
# Create cpu cluster for running the search
cpu_compute_name = "nas-cpu-cluster-D14-v2"
aml_helper.create_compute_cluster(ml_client, cpu_compute_name, size="Standard_D14_v2", location="westus2")

# Create gpu cluster for running the search
gpu_compute_name = "nas-gpu-cluster-NC6"
aml_helper.create_compute_cluster(ml_client, gpu_compute_name, size="Standard_NC6", location="westus2", max_instances=8)

You already have a cluster named nas-cpu-cluster-D14-v2, we'll reuse it as is.
You already have a cluster named nas-gpu-cluster-NC6, we'll reuse it as is.


Create the AML Environment from our conda.yaml file.  This ensures our conda environment contains the Archai framework that we are using here.

In [5]:
archai_job_env = aml_helper.create_environment_from_file(ml_client, conda_file="conda.yaml", version='0.1.8')
environment_name = f"{archai_job_env.name}:{archai_job_env.version}"
print(environment_name)

Environment with name aml-archai is registered to workspace, the environment version is 0.1.8
aml-archai:0.1.8


Ensure our storage account is setup with a `models` blob store container for storing the final onnx models, a `datasets` blob store for our training dataset and a `status` storage table.

In [6]:
from store import ArchaiStore

# Register the datastore with AML
data_store_name = "datasets"
data_container_name = "datasets"
model_store_name = "models"
model_container_name = "models"
root_folder = experiment_name

# make sure the datasets container exists
store = ArchaiStore(storage_account_name, storage_account_key, blob_container_name=data_container_name)
store.upload_blob(root_folder, config_file)

# make sure the models container exists
store = ArchaiStore(storage_account_name, storage_account_key, blob_container_name=model_container_name)
store.upload_blob("config", config_file)

datastore_path = f'azureml://datastores/{data_store_name}/paths/{root_folder}'
results_path = f'azureml://datastores/{model_store_name}/paths/{root_folder}'

Register the `models` blob store container in the ML workspace `datastores`

In [7]:
from azure.ai.ml.entities import AzureBlobDatastore
from azure.ai.ml.entities._credentials import AccountKeyConfiguration

try:
    model_store = ml_client.datastores.get(model_store_name)
except:    
    model_store = AzureBlobDatastore(
        name=model_store_name,
        description="Datastore pointing to our models blob container.",
        account_name=storage_account_name,
        container_name=model_container_name,
        credentials=AccountKeyConfiguration(
            account_key=storage_account_key
        ),
    )

    ml_client.create_or_update(model_store)

Register the `datasets` blob store container in the ML workspace `datastores`

In [8]:
try:
    data_store = ml_client.datastores.get(data_store_name)
except:
    data_store = AzureBlobDatastore(
        name=data_store_name,
        description="Datastore pointing to our dataset container.",
        account_name=storage_account_name,
        container_name=data_container_name,
        credentials=AccountKeyConfiguration(
            account_key=storage_account_key
        ),
    )

    ml_client.create_or_update(data_store)

Create a pipeline command that prepares our training dataset using `prep_data_store.py`.  This pipeline will write the output do our `datasets` blob store so the training jobs can find the dataset there all ready to go.  That way each training job doesn't have to repeat the download and preparation of the dataset.

In [9]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_prep_component = command(
    name="data_prep2",
    display_name="Data preparation for training",
    description="Downloads the remote dataset to our blob store.",
    inputs= {
        "name": Input(type='string')
    },
    outputs= {
        "data": Output(type="uri_folder", path=datastore_path, mode="rw_mount")
    },

    # The source folder of the component
    code=data_scripts_dir,
    command="""python3 prep_data_store.py \
            --path ${{outputs.data}} \
            """,
    environment=environment_name,
)

Create a command that kicks off the Archai Search using `search.py`, it will take the dataset as input so it can pass it along to the training jobs later on.  It also produces some search output files, .png charts, and json results files which will also go into our `models` blob store under the folder named the same as our `experiment_name`.

In [10]:
hex_config = bytes(json.dumps(config), encoding='utf-8').hex()

partial_epochs = 0.1

output_path = results_path + '/' + experiment_name
fixed_args = f'--data_dir {datastore_path} ' + \
             f'--output_dir {results_path} ' + \
             f'--environment "{environment_name}" ' + \
             f'--experiment "{experiment_name}" ' + \
             f'--compute "{gpu_compute_name}" ' + \
             f'--config "{hex_config}" ' + \
             f'--partial_training_epochs {partial_epochs} '

print(fixed_args)

search_component = command(
    name="search",
    display_name="The Archai NAS search",
    description="Runs the NAS search algorithm.",    
    inputs= {
        "data": Input(type="uri_folder")
    },
    outputs= {
        "results": Output(type="uri_folder", path=output_path, mode="rw_mount")
    },
    code=scripts_dir,
    identity= UserIdentityConfiguration(),
    command='python3 search.py --local_output ${{outputs.results}} ' + \
        fixed_args,
    environment=environment_name,
)

--data_dir azureml://datastores/datasets/paths/mnist_test_run --output_dir azureml://datastores/models/paths/mnist_test_run --environment "aml-archai:0.1.8" --experiment "mnist_test_run" --compute "nas-gpu-cluster-NC6" --config "7b22737562736372697074696f6e5f6964223a202263386237663931332d363066622d343735392d613331302d666335363330653536663939222c20227265736f757263655f67726f7570223a2022736e70652d616d6c2d7267222c2022776f726b73706163655f6e616d65223a2022736e70652d616d6c2d776f726b7370616365222c2022696f745f7265736f757263655f67726f7570223a2022736e70652d6465766963652d6875622d7267222c2022696f745f6875625f6e616d65223a20224d7372536e7065446576696365487562222c20226c6f636174696f6e223a202277657374757332222c202273746f726167655f6163636f756e745f6b6579223a202279485a4459454c3045774a65754c54517747395047713867564f6d78777031593836686e54367239735732666659535967686f4c496a694973712f4353454e45766471785a78546b713872482b4153747548445944773d3d222c202273746f726167655f6163636f756e745f6e616d65223a20226172636861696d6e697

This creates a command that kicks off the full training on the final list of great models produced by the above search command.

In [11]:
from commands import make_training_pipeline_command
full_epochs = 12
hex_config = bytes(json.dumps(config), encoding='utf-8').hex()

full_training_component = make_training_pipeline_command(
    "Full Training", hex_config, scripts_dir, gpu_compute_name, experiment_name,
    results_path, environment_name, full_epochs)
                                                 

Create an AML pipeline with the data prep and search components piping the output of the data prep to the search component. Then finish with a full training job of the final top models using a dynamic subgraph.

In [12]:
from azure.ai.ml import dsl

@dsl.pipeline(
    compute=cpu_compute_name,
    description="Archai search pipeline",
)
def mnist_search_pipeline():
    
    data_prep_job = data_prep_component(
        name="MNIST"
    )

    search_job = search_component(
        data=data_prep_job.outputs.data
    )
    
    training_job = full_training_component(
        models=search_job.outputs.results,
        data=data_prep_job.outputs.data
    )

    return {
        "results": training_job.outputs.results
    }


Run the search pipeline

In [13]:
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    mnist_search_pipeline(),
    # Project's name
    experiment_name=experiment_name,
)

[32mUploading scripts (0.06 MBs): 100%|##########| 58820/58820 [00:01<00:00, 45827.84it/s]
[39m



Open the pipeline azure ML studio portal in your web browser (This works when you are running this notbook in VS code).

In [14]:
import webbrowser
webbrowser.open(pipeline_job.services["Studio"].endpoint)

job_name = pipeline_job.name
print(f'Started pipeline: {job_name}')

Started pipeline: stoic_answer_wdjb429gn8


In [17]:
# you can fetch any pipeline job again if you needed to continue this notebook later:
from azure.ai.ml import dsl, Input, Output
# job_name = 'stoic_answer_wdjb429gn8'
pipeline_job = ml_client.jobs.get(job_name)

## Plots

Take a look at the pareto curve plots.  This cell can be run multiple times and you will see updates as each iteration finishes.
You can even run this later after restarting the jupyter notebook because it is not dependent on variable state it is only
dependent on the persistent 'models' blob store.

In [None]:
from utils import get_results, show_results, download_best_models
from store import ArchaiStore
store = ArchaiStore(storage_account_name, storage_account_key, blob_container_name=model_container_name)

print(f'Fetching results for {experiment_name}...')
blob_path = root_folder + '/' + experiment_name
output_folder = experiment_name

get_results(store, blob_path, output_folder)
show_results(output_folder)
download_best_models(store, experiment_name, output_folder)
    

## Test ONNX Runtime Inference on the Best Model

When the search pipeline completes you should have a `models.json` file in the experiment_name output folder and you can use that to find the most accurate model and run it through the ONNX runtime to see if the ONNX inference gets the same accuracy.

In [None]:
# find the top model in the json results.
filename = os.path.join(output_folder, "models.json")
best_of_the_best = None
top_accuracy = 0
row = None
if not os.path.isfile(filename):
    raise Exception(f"Could not find {filename} file. Please wait for job to finish.")

results = json.load(open(filename, "r"))
models = results['models']
for a in models:
    if type(a) is dict and 'val_acc' in a:
        val_acc = a['val_acc']
        if val_acc > top_accuracy:
            top_accuracy = val_acc
            best_of_the_best = a['id']
            row = a

config = ArchConfig(row)
model = MyModel(config)

arch = f"nb_layers={model.nb_layers}, kernel_size={model.kernel_size}, hidden_dim={model.hidden_dim}"
print(f"The top model is {best_of_the_best} with accuracy {top_accuracy} and architecture {arch}")

blob_path = root_folder + '/' + best_of_the_best
model_output = os.path.join(output_folder, 'top_model')
get_results(store, blob_path, model_output)

model_path = os.path.join(model_output, 'model.onnx')


In [None]:
# Great, now let's test if this model works as advertised.
from archai.datasets.cv.mnist_dataset_provider import MnistDatasetProvider
import onnxruntime as ort
import numpy as np

dataset_provider = MnistDatasetProvider()
val_data = dataset_provider.get_val_dataset()
count = val_data.data.shape[0]
test = np.random.choice(count, 1)[0]
data = val_data.data[test]

import matplotlib.pyplot as plt

# check what the images look like.
plt.figure(figsize=(2,2))
plt.imshow(data, cmap='gray')
print(f'data has shape: {data.shape}')
plt.axis('off')
plt.show()

# Now run the ONNX runtime on this the validation set.
# You can change this to `CUDAExecutionProvider` if you have a GPU and have
# installed the CUDA runtime.
ort_sess = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
for i in ort_sess.get_inputs():
    print(f'input: {i.name}, {i.shape}, {i.type}')
    
print(f'Testing {count} rows')
failed = 0
for i in range(val_data.data.shape[0]):
    data = val_data.data[i]    
    expected = int(val_data.train_labels[i])

    while len(data.shape) < 4:
        data = np.expand_dims(data, axis=0)
    outputs = ort_sess.run(None, {'input': data.astype(np.float32) / 255.0})
    result = outputs[0]
    index = np.argmax(result)
    label = val_data.classes[index]
    if expected != index:
        # print(f'### Failed: {expected} and got {label}')
        failed += 1
          
rate = (count - failed) * 100 / count
print(f"Failed {failed} out of {count} rows")
print(f'Inference pass rate is  {rate} %.')
print(f'How does this compare with the training validation accuracy of {top_accuracy}')
if np.isclose(rate, top_accuracy* 100, atol=0.1):
    print('Success! The model is working as expected.')
else:
    print('The onnx runtime is giving different results.')
