In [24]:
%load_ext autoreload
%autoreload 2

import datetime
import os
import sys
import json

import azure.storage.blob as azureblob
import azure.batch._batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batchmodels

# Make the project's `src/` directory importable so the local `azbatch` package resolves.
module_path = os.path.abspath('../src')
if module_path not in sys.path:
    sys.path.append(module_path)
# Project helpers used below: create_pool, create_job, create_sas_token, create_resource_url.
from azbatch import main

# Load variables from a .env file into os.environ; the config cell reads credentials from there.
from dotenv import load_dotenv
load_dotenv()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


True

In [25]:
# Show the installed azure-batch SDK version, for reproducibility of this run.
import azure.batch as azure_batch_sdk
azure_batch_sdk.__version__

'10.0.0'

In [26]:
# Run timestamp (second resolution) used to give every run a unique job id.
start_time = datetime.datetime.now().replace(microsecond=0)

config = {
    # Fixed pool id so the pool is reused across runs (the create-pool cell skips existing pools).
    # For a per-run pool use f"job_{start_time.strftime('%Y%m%d%H%M%S')}" instead.
    "POOL_ID": "test_pool_NC12",
    "JOB_ID": f"job_{start_time.strftime('%Y%m%d%H%M%S')}",
    "POOL_NODE_COUNT": 12,  # max 312 cores of NCpromo --> 26 x NC12 or 52 x NC6
    "POOL_VM_SIZE": "Standard_NC12_Promo",
    "TASK_SLOTS_PER_NODE": 1,  # keep <= cores per machine

    # Secrets are taken from the environment -- never hard-code them in the notebook.
    "BATCH_ACCOUNT_NAME": os.environ.get("_BATCH_ACCOUNT_NAME"),
    "BATCH_ACCOUNT_KEY": os.environ.get("_BATCH_ACCOUNT_KEY"),
    "BATCH_ACCOUNT_URL": os.environ.get("_BATCH_ACCOUNT_URL"),

    "CR_PASSWORD": os.environ.get("_CR_PASSWORD"),  # container registry

    "STORAGE_ACCOUNT_NAME": os.environ.get("_STORAGE_ACCOUNT_NAME"),
    "STORAGE_ACCOUNT_KEY": os.environ.get("_STORAGE_ACCOUNT_KEY"),

    "510_DLS_CONNECTION_STRING": os.environ.get("_510_DLS_CONNECTION_STRING"),
    "XCCTEST_CONNECTION_STRING": os.environ.get("_XCCTEST_CONNECTION_STRING")
}

# os.environ.get returns None for unset variables; report them now instead of letting them
# surface later as an opaque authentication / connection error from the Azure clients.
missing_settings = [key for key, value in config.items() if value is None]
if missing_settings:
    print(f"WARNING: no value set for: {', '.join(missing_settings)}")

print(f'working on batch account {config["BATCH_ACCOUNT_NAME"]}')

working on batch account 510adagpu


### Connect to batch & storage accounts

In [27]:
# Batch service client, authenticated with the account's shared key.
batch_credentials = batch_auth.SharedKeyCredentials(
    account_name=config["BATCH_ACCOUNT_NAME"],
    key=config["BATCH_ACCOUNT_KEY"],
)
batch_client = batch.BatchServiceClient(
    credentials=batch_credentials,
    batch_url=config["BATCH_ACCOUNT_URL"],
)

# Blob clients for the two storage accounts, built from their connection strings.
blob_client_xcctest = azureblob.BlockBlobService(
    connection_string=config["XCCTEST_CONNECTION_STRING"]
)
blob_client_510 = azureblob.BlockBlobService(
    connection_string=config["510_DLS_CONNECTION_STRING"]
)

### Create pool & job

In [28]:
# Create the pool that will contain the compute nodes that will execute the
# tasks.
# Create the pool of compute nodes that will execute the tasks, or reuse the pool
# with the same id if it already exists (e.g. from a previous run).
if batch_client.pool.exists(config['POOL_ID']):
    print(f"Pool {config['POOL_ID']} already exists.")
else:
    main.create_pool(batch_client, config)
    print(f"Created pool {config['POOL_ID']}.")

Creating pool [test_pool_NC12]...
Created pool test_pool_NC12.


In [29]:
# Create the job that will run the tasks.
# Create the job that will run the tasks, unless a job with this id already exists.
# any() stops at the first match instead of building a list of every job id in the account.
if any(job.id == config['JOB_ID'] for job in batch_client.job.list()):
    print(f"Job {config['JOB_ID']} already exists.")
else:
    main.create_job(batch_client, config)
    print(f"Created job {config['JOB_ID']}.")

Creating job [job_20210714145356]...
Created job job_20210714145356.


## Adding tasks

### Container & storage settings

In [30]:
# common settings 
# Settings shared by every task: run inside the `ada` image as a pool-scoped admin auto-user.
task_container_settings = batchmodels.TaskContainerSettings(
    image_name='ada510.azurecr.io/ada:latest',
    # --ipc=host is needed for pytorch to share memory, see
    # https://discuss.pytorch.org/t/unable-to-write-to-file-torch-18692-1954506624/9990
    container_run_options='--rm --ipc=host',
)

# Admin elevation is needed to create folders inside the running container.
admin_identity = batchmodels.UserIdentity(
    auto_user=batchmodels.AutoUserSpecification(scope='pool', elevation_level='admin')
)

# Splatted into every TaskAddParameter(...) built below.
task_common_args = dict(
    container_settings=task_container_settings,
    user_identity=admin_identity,
)

# Task outputs are uploaded only when the task succeeds.
upload_opts = batchmodels.OutputFileUploadOptions(
    upload_condition=batchmodels.OutputFileUploadCondition.task_success,
)

In [31]:
# commonly used tokens & urls
# SAS tokens for the storage containers the tasks read from / write to.
adafiles_read_token = main.create_sas_token(
    blob_client_xcctest, "adafiles", ["read", "list"]
)
adafiles_write_token = main.create_sas_token(
    blob_client_xcctest, "adafiles", ["write"]
)
_510_read_token = main.create_sas_token(
    blob_client_510, "automated-damage-assessment", ["read", "list"]
)
# `adafiles` container URL built with the write token; used as the upload destination of task outputs.
adafiles_output_url = main.create_resource_url("xcctest", "adafiles", adafiles_write_token)

### Set data directory and read index

In [32]:
# Folder (relative to the `adafiles` blob container) holding the event imagery and tile index.
data_dir = "hurricane-dorian"

# Fetch the tile index locally: each feature names a tile and the pre-/post-event images covering it.
blob_client_xcctest.get_blob_to_path(
    container_name="adafiles",
    blob_name=f"{data_dir}/tile_index.geojson",
    file_path='tile_index.geojson',
)
# JSON is UTF-8 (RFC 8259); don't depend on the platform's default text encoding.
with open('tile_index.geojson', encoding='utf-8') as file:
    index = json.load(file)

print(f"index length: {len(index['features'])}")
for ind_ in index['features'][:2]:  # preview the first two features
    print(ind_)

index length: 129
{'type': 'Feature', 'properties': {'tile': '12.1150.1730', 'pre-event': {'0': 'pre-event/1030010086A20400.tif', '1': 'pre-event/1030010095164900.tif'}, 'post-event': {'0': 'post-event/1050010018A3AC00.tif', '1': 'post-event/1050010018A3AD00.tif'}}, 'geometry': {'type': 'Polygon', 'coordinates': [[[-78.837890625, 26.902476886279814], [-78.837890625, 26.82407078047018], [-78.92578125, 26.82407078047018], [-78.92578125, 26.902476886279814], [-78.837890625, 26.902476886279814]]]}}
{'type': 'Feature', 'properties': {'tile': '12.1150.1731', 'pre-event': {'0': 'pre-event/1030010086A20400.tif', '1': 'pre-event/1030010095164900.tif'}, 'post-event': {'0': 'post-event/1050010018A3AC00.tif', '1': 'post-event/1050010018A3AD00.tif'}}, 'geometry': {'type': 'Polygon', 'coordinates': [[[-78.837890625, 26.82407078047018], [-78.837890625, 26.745610382199015], [-78.92578125, 26.745610382199015], [-78.92578125, 26.82407078047018], [-78.837890625, 26.82407078047018]]]}}


### Actual task specification

In [20]:
# Timestamp suffix shared by every task id of this submission. Task dependencies refer to
# sibling tasks by id, so all ids must be built from this one value.
batch_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S')

# Blob (relative to a tile's `{id_}/caladrius/` folder) holding the damage labels written by
# the run-caladrius step. Presumably derived from the run name and caladrius's default
# hyper-parameters -- keep in sync with the run-caladrius command below.
CALADRIUS_PREDICTIONS = (
    "run-input_size_32-learning_rate_0.001-batch_size_32/predictions/"
    "run-split_inference-epoch_001-model_inception-predictions.txt"
)


def _adafiles_url(*args, **kwargs):
    """Read-SAS URL into the xcctest `adafiles` container (the container URL when no path is given).

    Arguments after the SAS token are forwarded unchanged to `main.create_resource_url`.
    """
    return main.create_resource_url("xcctest", "adafiles", adafiles_read_token, *args, **kwargs)


def _input_file(http_url, file_path):
    """Resource file: download the blob at `http_url` to `file_path` in the task working dir."""
    return batchmodels.ResourceFile(http_url=http_url, file_path=file_path)


def _input_prefix(blob_prefix):
    """Resource file: download every `adafiles` blob whose name starts with `blob_prefix`.

    No file_path is given, so Batch places the files under their blob names.
    """
    return batchmodels.ResourceFile(storage_container_url=_adafiles_url(), blob_prefix=blob_prefix)


def _output_file(file_pattern, blob_path):
    """Output file: upload files matching `file_pattern` to `blob_path` in `adafiles` on success."""
    return batchmodels.OutputFile(
        file_pattern=file_pattern,
        destination=batchmodels.OutputFileDestination(
            container=batchmodels.OutputFileBlobContainerDestination(
                container_url=adafiles_output_url,
                path=blob_path,
            )
        ),
        upload_options=upload_opts,
    )


def _tile_task(step, num_id, command, resource_files, output_files, after=None):
    """Build the `step` task of tile number `num_id`.

    `command` is run through `/bin/bash -c`. `after` is the step (of the same tile) this
    task depends on, or None for a task without dependencies.
    """
    depends_on = None
    if after is not None:
        depends_on = batchmodels.TaskDependencies(task_ids=[f"{after}-{batch_name}-{num_id}"])
    return batchmodels.TaskAddParameter(
        id=f"{step}-{batch_name}-{num_id}",
        depends_on=depends_on,
        command_line=f'/bin/bash -c "{command}"',
        resource_files=resource_files,
        output_files=output_files,
        **task_common_args,
    )


# Inputs that are identical for every tile.
tile_index_file = _input_file(
    _adafiles_url(container_path=f"{data_dir}/tile_index.geojson"), 'tile_index.geojson'
)
config_toml_file = _input_file(_adafiles_url("config.toml"), 'config.toml')
checkpoint_file = _input_file(_adafiles_url("neat-fullxview-epoch75.pth"), 'neat-fullxview-epoch75.pth')
hydropolys_file = _input_file(_adafiles_url("hydropolys.gpkg"), 'hydropolys.gpkg')
caladrius_weights_file = _input_file(_adafiles_url("best_model_wts.pkl"), 'best_model_wts.pkl')

tasks = []

# One chain of nine dependent tasks per tile in the index.
for num_id, feature in enumerate(index['features']):
    properties = feature['properties']
    id_ = properties['tile']

    images_to_process = (
        list(properties['pre-event'].values()) + list(properties['post-event'].values())
    )
    images_to_process_resource_files = [
        _input_file(_adafiles_url(container_path=f"{data_dir}/{image}"), f'{data_dir}/{image}')
        for image in images_to_process
    ]

    print(f'TASK {num_id}, {id_}')
    print(images_to_process)

    # Per-tile intermediate products that several steps read back from storage.
    merged_tif_file = _input_file(
        _adafiles_url(container_path=f"{id_}/raw/pre-event/merged.tif"), 'merged.tif'
    )
    cover_csv_file = _input_file(_adafiles_url(container_path=f"{id_}/cover.csv"), 'cover.csv')
    buildings_clean_file = _input_file(
        _adafiles_url(container_path=f"{id_}/buildings-clean.geojson"), 'buildings-clean.geojson'
    )

    tasks += [
        # set up working directory and create raster mosaic (--> merged.tif)
        _tile_task(
            "setup", num_id,
            f"setup-wd --data {data_dir} --index tile_index.geojson --id {id_} --dest raw",
            resource_files=[tile_index_file] + images_to_process_resource_files,
            output_files=[_output_file("raw/**/*.tif", f"{id_}/raw")],
        ),

        # abd cover: create cover file with metadata of mini-tiles
        _tile_task(
            "cover", num_id,
            "abd cover --raster merged.tif --zoom 17 --out cover.csv",
            resource_files=[merged_tif_file],
            output_files=[_output_file("cover.csv", f"{id_}/cover.csv")],
            after="setup",
        ),

        # abd tile: split tiles in mini-tiles
        _tile_task(
            "tile", num_id,
            "abd tile --raster merged.tif --zoom 17 --cover cover.csv --config config.toml "
            "--out images --format tif",
            resource_files=[merged_tif_file, config_toml_file, cover_csv_file],
            output_files=[_output_file("images/**/*.tif", f"{id_}/images")],
            after="cover",
        ),

        # abd predict: predict buildings on mini-tiles -- only runnable on a GPU instance !!!
        _tile_task(
            "predict", num_id,
            f"abd predict --config config.toml --cover cover.csv --dataset {id_} "
            "--checkpoint neat-fullxview-epoch75.pth --out predictions --metatiles --keep_borders",
            resource_files=[checkpoint_file, _input_prefix(f"{id_}/"), config_toml_file, cover_csv_file],
            output_files=[_output_file("predictions/**/*.png", f"{id_}/predictions")],
            after="tile",
        ),

        # abd vectorize: convert pixel-level predictions into polygons (.geojson)
        _tile_task(
            "vectorize", num_id,
            f"abd vectorize --config config.toml --masks {id_}/predictions "
            "--out buildings.geojson --type Building",
            resource_files=[_input_prefix(f"{id_}/predictions/"), config_toml_file],
            output_files=[_output_file("buildings.geojson", f"{id_}/buildings.geojson")],
            after="predict",
        ),

        # filter buildings
        _tile_task(
            "filter-buildings", num_id,
            "filter-buildings --data buildings.geojson --dest buildings-clean.geojson "
            "--waterbodies hydropolys.gpkg",
            resource_files=[
                _input_file(_adafiles_url(container_path=f"{id_}/buildings.geojson"), 'buildings.geojson'),
                hydropolys_file,
            ],
            output_files=[_output_file("buildings-clean.geojson", f"{id_}/buildings-clean.geojson")],
            after="vectorize",
        ),

        # prepare for caladrius
        _tile_task(
            "prepare-data", num_id,
            f"prepare-data --data {id_}/raw --buildings buildings-clean.geojson --dest caladrius",
            resource_files=[_input_prefix(f"{id_}/raw/"), buildings_clean_file],
            output_files=[_output_file("caladrius/**/*.png", f"{id_}/caladrius")],
            after="filter-buildings",
        ),

        # run caladrius
        _tile_task(
            "run-caladrius", num_id,
            "source ~/.bashrc && source activate cal && "
            f"python /caladrius/caladrius/run.py --run-name run --data-path {id_}/caladrius "
            "--model-path best_model_wts.pkl "
            "--checkpoint-path caladrius --output-type classification --inference",
            # trailing slash, like the other prefixes, so only the tile's caladrius/ folder matches
            resource_files=[_input_prefix(f"{id_}/caladrius/"), caladrius_weights_file],
            output_files=[_output_file("caladrius/**/*", f"{id_}/caladrius")],
            after="prepare-data",
        ),

        # merge buildings and damage labels
        _tile_task(
            "final-layer", num_id,
            "final-layer --builds buildings-clean.geojson --damage damage-labels.txt "
            "--out buildings-predictions.geojson",
            resource_files=[
                buildings_clean_file,
                _input_file(
                    _adafiles_url(container_path=f"{id_}/caladrius/{CALADRIUS_PREDICTIONS}"),
                    'damage-labels.txt',
                ),
            ],
            output_files=[
                _output_file("buildings-predictions.geojson", f"{id_}/buildings-predictions.geojson")
            ],
            after="run-caladrius",
        ),
    ]

# Add tasks to job. NOTE(review): assumes this SDK's add_collection splits large lists into
# requests within the Batch per-request task limit -- confirm for the installed version.
res = batch_client.task.add_collection(config['JOB_ID'], tasks)
# Per-task submission results can be inspected with res.as_dict().

TASK 0, 12.1150.1730
['pre-event/1030010086A20400.tif', 'pre-event/1030010095164900.tif', 'post-event/1050010018A3AC00.tif', 'post-event/1050010018A3AD00.tif']
TASK 1, 12.1150.1731
['pre-event/1030010086A20400.tif', 'pre-event/1030010095164900.tif', 'post-event/1050010018A3AC00.tif', 'post-event/1050010018A3AD00.tif']


## Clean up Batch resources
### N.B. Wait for all tasks to finish before running the cells below: deleting a job or pool stops any task that is still running.

In [21]:
# Delete the current job once its tasks are done. Deleting a job also removes its tasks and
# stops any that are still running, so by default refuse while tasks remain unfinished.
# Tasks blocked behind a failed dependency may never complete on their own: set
# FORCE_DELETE_JOB = True to delete the job anyway.
FORCE_DELETE_JOB = False

unfinished_task_ids = [
    task.id
    for task in batch_client.task.list(config['JOB_ID'])
    if task.state != batchmodels.TaskState.completed
]
if unfinished_task_ids and not FORCE_DELETE_JOB:
    print(f"Job {config['JOB_ID']} still has {len(unfinished_task_ids)} unfinished task(s); "
          f"not deleting it (set FORCE_DELETE_JOB = True to override).")
else:
    batch_client.job.delete(config['JOB_ID'])

In [22]:
# Delete the pool used by this notebook. Deleting a pool removes its compute nodes,
# so only run this after every task scheduled on it has finished.
pool_id_to_delete = config['POOL_ID']
batch_client.pool.delete(pool_id_to_delete)

In [33]:
# WARNING: account-wide cleanup. This deletes EVERY job and EVERY pool in the Batch
# account, not only the ones created by this notebook.
for existing_job in batch_client.job.list():
    batch_client.job.delete(existing_job.id)

for existing_pool in batch_client.pool.list():
    batch_client.pool.delete(existing_pool.id)