In [1]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every bare expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = 'all' # default is 'last_expr'

# Reload edited modules automatically before each cell is executed.
%load_ext autoreload
%autoreload 2

In [2]:
import azure.batch
# Record the SDK version in use; the saved output of this notebook shows 10.0.0.
azure.batch.__version__

'10.0.0'

In [3]:
import json
import os
import re

from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials
from azure.batch.models import *
from azure.common.credentials import ServicePrincipalCredentials

# Azure Batch

Documentation
- https://github.com/Azure-Samples/batch-python-quickstart/blob/master/src/python_quickstart_client.py
- https://docs.microsoft.com/en-us/azure/batch/batch-docker-container-workloads#prefetch-images-for-container-configuration

TODO

- Turn on `enable_auto_scale` and set an appropriate `auto_scale_formula`, so that the number of nodes is capped at a maximum. https://docs.microsoft.com/en-us/azure/batch/batch-automatic-scaling

## Create a pool for each instance of the API

List all Batch supported images with their "capabilities" (e.g. "DockerCompatible", "NvidiaTeslaDriverInstalled"):
```
az batch pool supported-images list
```
Each entry also includes the other information needed to configure a pool with that image (e.g. its node agent SKU ID).

To list all versions of the image SKUs offered by a publisher:
```
az vm image list --all --publisher microsoft-dsvm
```

You may need to accept the terms of an image before using it. First, run
```
az vm image list --all --publisher <publisher>
```
to find the URN of the image you want to use, then:

```
az vm image terms accept --urn <corresponding-urn>
```

In [4]:
# Connection settings, read by the cells below through os.environ.
# `setdefault` fills in a placeholder only when the variable is not already
# set. Credentials exported in the shell (or injected by a secrets manager) are
# therefore never clobbered, and never need to be typed into -- or committed
# with -- this notebook. The defaults are applied in a loop rather than as bare
# `setdefault` calls because, with ast_node_interactivity = 'all', each bare
# call would echo its return value (possibly a secret) as cell output.
ENV_DEFAULTS = {
    'BATCH_ACCOUNT_NAME': '',
    'BATCH_ACCOUNT_URL': '',

    # Service principal used to authenticate to the Batch account.
    'APP_CLIENT_ID': '',
    'APP_CLIENT_SECRET': '',
    'APP_TENANT_ID': '',

    'REGISTRY_SERVER': '.azurecr.io',  # e.g. registryname.azurecr.io
    'REGISTRY_USERNAME': '',
    'REGISTRY_PASSWORD': '',
    'REGISTRY_IMAGE_NAME': '.azurecr.io/tensorflow:1.14.0-gpu-py3',  # login server/repository:tag
}
for _env_name, _env_default in ENV_DEFAULTS.items():
    os.environ.setdefault(_env_name, _env_default)

In [5]:
POOL_ID = 'internal_2'
# Azure Batch pool IDs may contain only letters, digits, hyphens and
# underscores, up to 64 characters. Validate here to fail fast instead of
# waiting for the service to reject the request.
assert len(POOL_ID) <= 64, 'pool_id has more than 64 characters'
assert re.fullmatch(r'[A-Za-z0-9_-]+', POOL_ID), \
    'pool_id must be non-empty and contain only alphanumeric characters, hyphens and underscores'

POOL_NODE_COUNT = 1  # number of dedicated nodes requested for the pool

POOL_VM_SIZE = 'Standard_NC6s_v3'  # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv3-series

# Container registry settings, taken from the environment.
registry_server = os.environ['REGISTRY_SERVER']
registry_username = os.environ['REGISTRY_USERNAME']
registry_password = os.environ['REGISTRY_PASSWORD']
docker_image = os.environ['REGISTRY_IMAGE_NAME']

In [21]:
def print_batch_exception(batch_exception):
    """
    Print the contents of a Batch service exception in a readable form.

    Prints the service's error message and, when present, each key/value
    detail attached to it. If the exception carries no structured error
    message (e.g. its ``error`` attribute is missing or empty), the exception
    itself is printed instead, so a failure is never reported as an empty banner.

    :param batch_exception: the exception to report; typically a
        ``BatchErrorException`` whose ``error`` field holds the service error.
    """
    print('-------------------------------------------')
    print('Exception encountered:')
    error = getattr(batch_exception, 'error', None)
    message = getattr(getattr(error, 'message', None), 'value', None)
    if message:
        print(message)
        details = getattr(error, 'values', None)
        if details:
            print()
            for detail in details:
                print(f'{detail.key}:\t{detail.value}')
    else:
        # No structured message: show whatever the exception itself says.
        print(batch_exception)
    print('-------------------------------------------')
    
def create_pool(batch_service_client, pool_id):
    """
    Create a Batch pool whose nodes can run the Docker image from the env settings.

    The pool uses the Azure Batch Ubuntu 16.04 container-enabled VM image. It
    lists ``docker_image`` (pulled from the private registry described by
    ``registry_server`` / ``registry_username`` / ``registry_password``) in the
    pool's container configuration, and requests ``POOL_NODE_COUNT`` dedicated
    nodes of size ``POOL_VM_SIZE``. All of these are module-level settings
    defined earlier in the notebook.

    NOTE(review): Ubuntu 16.04 has reached end of standard support; confirm
    this image / node agent SKU is still offered by Batch before reuse.

    :param batch_service_client: an authenticated ``BatchServiceClient``.
    :param pool_id: ID (also used as display name) of the pool to create.
    :raises BatchErrorException: if the service rejects the request, e.g.
        when a pool with this ID already exists.
    """
    image_ref = ImageReference(
        publisher="microsoft-azure-batch",
        offer="ubuntu-server-container",
        sku="16-04-lts",
        # The Azure Batch container image only accepts the 'latest' version
        # (when written, this resolved to
        # microsoft-azure-batch:ubuntu-server-container:16-04-lts:1.1.0).
        version="latest",
    )

    # Private registry from which the nodes pull the task image.
    container_registry = ContainerRegistry(
        registry_server=registry_server,
        user_name=registry_username,
        password=registry_password,
    )

    # Images named here are prefetched onto the pool's nodes, so tasks
    # presumably start without waiting for the image download.
    container_conf = ContainerConfiguration(
        container_image_names=[docker_image],
        container_registries=[container_registry],
    )

    vm_config = VirtualMachineConfiguration(
        image_reference=image_ref,
        container_configuration=container_conf,
        # Must be compatible with the OS of image_ref.
        node_agent_sku_id="batch.node.ubuntu 16.04",
    )

    # Use the pool_id argument (not the global POOL_ID) so callers can create
    # pools with other IDs.
    new_pool = PoolAddParameter(
        id=pool_id,
        display_name=pool_id,
        vm_size=POOL_VM_SIZE,
        target_dedicated_nodes=POOL_NODE_COUNT,  # only dedicated nodes are used
        virtual_machine_configuration=vm_config,
    )
    batch_service_client.pool.add(new_pool)

def create_job():
    """Placeholder for job creation; not implemented yet and simply returns ``None``."""

def create_task():
    """
    Placeholder for task creation; not implemented yet and simply returns ``None``.

    Any real implementation must keep tasks idempotent: Batch may retry a task
    as part of a recovery operation, so running it more than once must be safe.
    """

In [10]:
# Authenticate to the Batch account with a service principal (Azure AD application).
account_url = os.environ['BATCH_ACCOUNT_URL']

app_client_id = os.environ['APP_CLIENT_ID']
app_client_secret = os.environ['APP_CLIENT_SECRET']
app_tenant_id = os.environ['APP_TENANT_ID']

# Fail fast with a clear message if any setting is still an empty placeholder.
# Otherwise the problem would presumably surface only later, as a less obvious
# authentication or connection error.
_missing_settings = [
    name
    for name, value in (
        ('BATCH_ACCOUNT_URL', account_url),
        ('APP_CLIENT_ID', app_client_id),
        ('APP_CLIENT_SECRET', app_client_secret),
        ('APP_TENANT_ID', app_tenant_id),
    )
    if not value
]
if _missing_settings:
    raise ValueError(f'Missing Batch settings; set these environment variables: {", ".join(_missing_settings)}')

credentials = ServicePrincipalCredentials(
    client_id=app_client_id,
    secret=app_client_secret,
    tenant=app_tenant_id,
    resource="https://batch.core.windows.net/"
)

# Alternatively, if the account uses Batch service quotas (the "Batch service"
# pool allocation mode), you can authenticate with SharedKeyCredentials instead
# of a service principal:
# https://docs.microsoft.com/en-us/python/api/azure-batch/azure.batch.batch_auth.sharedkeycredentials?view=azure-python

batch_client = BatchServiceClient(credentials=credentials, batch_url=account_url)

In [11]:
# Create the pool. A pool with the same ID left over from an earlier run is
# reused, so the notebook can be re-run end to end; other errors are reported
# and re-raised.
try:
    create_pool(batch_client, POOL_ID)
except BatchErrorException as e:
    if e.error is not None and e.error.code == 'PoolExists':
        print(f'Pool {POOL_ID} already exists; reusing it.')
    else:
        print_batch_exception(e)
        raise

## Submitting a job

A Batch *job* is what we have been calling a *request*; each shard of a request corresponds to a Batch *task*.

The Azure Batch service sets these environment variables (among others) on the compute nodes:

- `AZ_BATCH_JOB_ID`
- `AZ_BATCH_TASK_ID`
- `AZ_BATCH_TASK_DIR`
- `AZ_BATCH_TASK_WORKING_DIR` — the currently running task has read/write access to this directory

In [12]:
job_id = "test_docker0"  # Batch job ID; plays the role of the request ID from the old API

In [13]:
# One Batch job per request (the job ID is the request ID in the old API
# context), scheduled onto the pool identified by POOL_ID.
job = JobAddParameter(
    id=job_id,
    pool_info=PoolInformation(pool_id=POOL_ID),
)

try:
    batch_client.job.add(job)
except BatchErrorException as e:
    # Tolerate a job left over from an earlier run so the cell is re-runnable.
    if e.error is not None and e.error.code == 'JobExists':
        print(f'Job {job_id} already exists; reusing it.')
    else:
        print_batch_exception(e)
        raise

## Submit tasks to the job (the shards)

In [14]:
# Run the scoring script inside the task's container through a shell.
command = """/bin/sh -c "python /app/score.py" """

shard_index = 0
task = TaskAddParameter(
    id=f'task_{shard_index}',
    command_line=command,
    # 'taskWorkingDirectory': start in the Batch task working directory
    # rather than the WORKDIR defined by the container image.
    container_settings=TaskContainerSettings(
        image_name=docker_image,
        working_directory='taskWorkingDirectory',
    ),
)

In [17]:
task.__class__  # sanity check: the object built above is the SDK's TaskAddParameter model

azure.batch.models._models_py3.TaskAddParameter

In [16]:
# Submit the task to the job. A task with the same ID left over from an
# earlier run is kept rather than resubmitted, so re-running this cell is safe.
try:
    batch_client.task.add(job_id, task)
except BatchErrorException as e:
    if e.error is not None and e.error.code == 'TaskExists':
        print(f'Task {task.id} already exists in job {job_id}; not resubmitting.')
    else:
        print_batch_exception(e)
        raise

## Monitoring a job

Optimization: remember which tasks have already completed, so that we do not repeatedly query their status.

Documentation: https://docs.microsoft.com/en-us/azure/batch/batch-efficient-list-queries

In [18]:
tasks = batch_client.task.list(job_id=job_id)  # presumably a lazily paged iterator; materialized below

In [20]:
tasks = list(tasks)  # pull every page into a plain list so it can be indexed and re-iterated

In [22]:
tasks[0].__class__  # sanity check: listed tasks are CloudTask models (IndexError if the job has no tasks)

azure.batch.models._models_py3.CloudTask

In [24]:
# Tasks that have not yet reached the completed state.
incomplete_tasks = list(filter(lambda t: t.state != TaskState.completed, tasks))

In [25]:
incomplete_tasks  # empty once every listed task has reached the completed state

[]