# Introduction

This tutorial is a hands-on introduction to the Rescale HTC framework. The framework is applied to solve a real-world use case, covers most of the Rescale HTC API, and discusses pros and cons of alternative solutions. Best practices and rationale behind them are highlighted. The reader is advised to go through the entire interactive tutorial step-by-step.

The tutorial supplements the two other documents forming the Rescale HTC documentation: *The Rescale HTC Manual* and *The HTCCTL User Guide* (contact your Rescale representative to get access to these documents).

# Problem setting

We have a set of 5000 random images. The goal is to find an image that has the most faces in it. To accomplish this, we detect faces in each image, draw bounding boxes around them, and calculate the total number of faces. Then, we aggregate results and display the image with the most faces.

The validation set of the [Common Objects in Context](https://cocodataset.org/#download) dataset (5000 images) is used as the input image set.

## Face detection algorithm

For face detection, we use [Cascade Classification](https://docs.opencv.org/2.4/modules/objdetect/doc/cascade_classification.html) as implemented in [OpenCV](https://opencv.org/). Details of the face detection implementation are omitted. The implementation is encapsulated in the `face_detect` function of [`face_tagger.py`](face_tagger/face_tagger.py). The function takes an image path as input and, if faces are detected, writes two files: a `{image}_tagged.dat` file containing the face count, and a `{image}_tagged.jpg` file with bounding boxes.

# Solution

A typical workflow for mapping a computational problem to Rescale HTC consists of the following steps:

* Design work distribution approach
* Implement and containerize application code
* Create an HTC Project
* Publish the container image
* Create an HTC Task
* Populate storage with inputs
* Submit HTC Jobs
* Aggregate results
* Clean up resources

The following sections implement each of these steps.

## Distributing work

Our approach to work distribution and storage interaction will affect our application code implementation, HTC project setup and submission. Therefore, we need to carefully design our work distribution approach.

In our use case, we have 5000 images that need tagging. Our local run demonstrated that a single detection, including input/output, takes about 0.12 seconds. A single run for the entire image library would therefore take about 10 minutes (5000 × 0.12 s = 600 s). This may not be a good problem for HTC, but let's imagine that the full library has 5 million images. Here, we use a smaller input set to validate our design.
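
The estimate above can be reproduced with a few lines of arithmetic. The sketch below assumes strictly sequential processing at the locally measured 0.12 seconds per image; the function name is illustrative only.

```python
# Back-of-the-envelope estimate of sequential run time.
SECONDS_PER_IMAGE = 0.12  # measured locally, as stated in the text


def sequential_runtime_seconds(image_count, seconds_per_image=SECONDS_PER_IMAGE):
    """Total wall-clock time if images are processed one after another."""
    return image_count * seconds_per_image


small = sequential_runtime_seconds(5_000)
large = sequential_runtime_seconds(5_000_000)
print(f"5,000 images: {small / 60:.0f} minutes")
print(f"5,000,000 images: {large / 3600:.0f} hours (~{large / 86400:.1f} days)")
```

At 5 million images a single sequential run would take roughly a week, which is the scale at which distributing the work starts to pay off.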

It is a good practice to implement and test the entire workflow on a subset of inputs.

The first thing we need to think about is where we are going to store inputs and outputs. Rescale HTC gives us an option to use project shared storage or task storage. Since our input image library is immutable, let's put it in project shared storage. We will use task storage to store results generated by jobs.

The face detection algorithm used by our application can be parameterized by the scaling factor. We may at some point want to find the optimal value for this parameter. By using separate input and output storage, we can clearly differentiate between results obtained for different parameters. All experiments use the same input library (project shared storage), tasks encapsulate runs for different parameters. The task name can then carry contextual information, and task storage will keep results that are not mixed with other experiments.

Next, we need to decide how we are going to split work between jobs. In an extreme case, we could specify image names as inputs to each job with `batchSize=1`. However, submitting 5000 individual jobs prevents the Rescale HTC runtime from maximizing throughput. We should always think in terms of batches. Each job in a batch is aware of its `index`. We can use this `index` and let the job itself discover a chunk of work to process. We have several options here.

First, we could split the list of image names into chunks, and store these for each job `index` before submission. Multiple `image_list.{index}` files could then be pushed to either project or task storage. A job instance would then fetch the image list using its `index`. Such files could have the following form:

```
$ cat image_list.1
000000581357.jpg
000000581482.jpg
000000581615.jpg
000000581781.jpg
```

The downside of this approach is that we need to regenerate the `image_list.{index}` files every time we decide to change the batch size.

An alternative solution would be to determine the chunk during runtime. Each job could list files in the project shared storage, sort them, and take a chunk of the resulting list based on its `index`. This is a valid approach, but it may go wrong if project storage becomes polluted.

We will proceed with an approach where a file containing a list of all image names is uploaded to project shared storage. The name of this file is then passed to the job. A job fetches this file and determines its chunk based on `index` within a batch.
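
The chunk selection described above can be sketched as follows. This is an illustration only, not the code in `face_tagger.py`: the function name `chunk_for_index` is hypothetical, and the sketch assumes the job `index` is 0-based and that the image list has already been read into a Python list.

```python
def chunk_for_index(names, index, batch_size):
    """Return the slice of `names` that job `index` (0-based) should process.

    Integer arithmetic keeps chunks contiguous, non-overlapping, and
    balanced even when len(names) is not a multiple of batch_size.
    """
    if not 0 <= index < batch_size:
        raise ValueError("index must be in [0, batch_size)")
    start = index * len(names) // batch_size
    end = (index + 1) * len(names) // batch_size
    return names[start:end]


# Hypothetical stand-in for the contents of the uploaded image list.
names = [f"{i:012d}.jpg" for i in range(5000)]
chunk = chunk_for_index(names, index=100, batch_size=1000)
print(len(chunk), chunk[0], chunk[-1])
```

Because the chunk boundaries depend only on the list length, the job index, and the batch size, changing the batch size requires no regenerated input files.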

## Containerization

The Docker image contains executable code and its dependencies required to run the task.

Our [Dockerfile](docker/Dockerfile) uses the official `python:3.10` container as a base, installs a few library dependencies and required Python modules, including the Rescale `htcctl` utility for interacting with object storage.

The image also includes the [face_tagger.py](face_tagger/face_tagger.py) script, which implements the detection pipeline. We will discuss its structure later in this tutorial.

Make sure your Docker Desktop is running. The following command should execute without error.

In [None]:
! docker ps

Before publishing the container to our private HTC repository, it is always a good idea to build and test it locally. To build the image, execute the following (may take a couple of minutes).

In [None]:
! docker build -f docker/Dockerfile -t rescale/opencv-htc-tutorial .

Now let's detect some faces using our container image.

In [None]:
! docker run --rm -v $(pwd)/face_tagger/:/data rescale/opencv-htc-tutorial \
    python /opt/face_tagger.py /data/test_images/2_faces.jpg
! cat face_tagger/test_images/2_faces.jpg_tagged.dat

Our script detected 2 faces and reported this both in the console
and in the `2_faces.jpg_tagged.dat` text file. Detected face regions are
visualised in the `2_faces.jpg_tagged.jpg` file.

In [None]:
from IPython import display

display.Image("face_tagger/test_images/2_faces.jpg_tagged.jpg")

## The Rescale HTC API

The API resources we are going to interact with are documented with [OpenAPI specification](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/). Use this documentation as the primary source of truth. It is always up to date and provides in-depth descriptions of models and operations. Refer to *The Rescale HTC Manual* for higher level explanations.

## Authentication and authorization

The Rescale API authorizes requests using an *API Key*. This key is unique per User, per Workspace. A given user may have multiple *API Keys* active - one per Workspace.

To generate an *API Key*, go to the API section of the User Profile.

![](README.assets/user_profile_apikey.png)

> NOTE: Remember that your *API Key* is a secret. Never store it in a code repository. Never share it with your colleagues. If you realize that your *API Key* was exposed - delete it in the user profile and generate a new key.

Rescale tools like [Rescale CLI](https://rescale.com/documentation/main-2/rescale-advanced-features/rescale-cli-1-1-x/setting-up-rescale-cli-110/) look for the `apiconfig` authorization configuration file in the user's home directory.

```
$HOME/.config/rescale/apiconfig             # Linux
%USERPROFILE%\config\rescale\apiconfig      # Windows
```

We will use this convention to store our credentials. Create the `apiconfig` text file in the aforementioned location and fill it with the following lines:

```
[default]
apibaseurl = https://platform.rescale.com
apikey = 79aaabb6477ffdddee56587697777abbbc66c6c9

```

The `config` module extracts the Rescale API Key from the `apiconfig` file.

In [None]:
import config

_, api_key = config.get_profile()
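
The `config` module itself is not shown in this tutorial. As a rough idea of what such a helper might look like, the sketch below parses an INI-style `apiconfig` file with the standard library. The function name `read_profile` and its return order are assumptions made for illustration, not the module's actual interface.

```python
import configparser


def read_profile(path, profile="default"):
    """Return (apibaseurl, apikey) from an INI-style apiconfig file.

    Hypothetical helper: the tutorial's own `config` module may differ.
    """
    parser = configparser.ConfigParser()
    # ConfigParser.read() returns the list of files it managed to parse.
    if not parser.read(path):
        raise FileNotFoundError(f"apiconfig not found: {path}")
    section = parser[profile]
    return section["apibaseurl"], section["apikey"]
```

On Linux this could be called as `read_profile(os.path.expanduser("~/.config/rescale/apiconfig"))`. Avoid printing or logging the returned key.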

The Rescale HTC Bearer token has an expiration time of 6h. Depending on use case, it may be sensible to get a new token before each call. This especially applies to polling for job status of long-running job batches.

Before obtaining the token, let's import all modules that will be used in the tutorial.

In [None]:
import copy
import datetime
import os
import time
from pprint import pprint

import jwt
import pandas as pd
import plotly.express as px
import requests

The following call to the [`/auth/token`](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Token%20Resource/get_auth_token) endpoint returns an HTC Bearer token. Decoded `jwt` token contents are displayed for reference. Note the `groups` section, which lists roles associated with the owner of an HTC Bearer token.

In [None]:
token_res = requests.get(
    "https://htc.rescale.com/auth/token",
    headers={"Authorization": f"Token {api_key}"},
)
token_res.raise_for_status()

decoded_token = jwt.decode(
    token_res.json()["tokenValue"], options={"verify_signature": False}
)
pprint(decoded_token, indent=4)
HTC_TOKEN = token_res.json()["tokenValue"]
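
Since the token expires, long-running polling loops may want to check how much lifetime is left before each call. The sketch below reads the expiry from the token payload using only the standard library. It assumes the token carries the standard JWT `exp` claim (seconds since the epoch); the helper names and the 5-minute margin are illustrative choices.

```python
import base64
import json
import time


def seconds_until_expiry(token, now=None):
    """Read the `exp` claim of a JWT without verifying its signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded with padding stripped; restore it.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    return claims["exp"] - (time.time() if now is None else now)


def needs_refresh(token, margin_seconds=300, now=None):
    """True when the token expires within `margin_seconds`."""
    return seconds_until_expiry(token, now) < margin_seconds
```

A polling loop could call `needs_refresh(HTC_TOKEN)` before each request and repeat the `/auth/token` call when it returns `True`.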

## Create a project

A project can use multiple regions and cloud providers to get higher throughput. Calling the [`/regions`](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Provider%20Resource/get_htc_regions) endpoint gives us the available regions and their supported architectures.

In [None]:
regions_res = requests.get(
    "https://htc.rescale.com/api/v1/htc/regions",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
)
regions_res.raise_for_status()

print(
    *[
        f"{r['region']}: {', '.join(r['supportedArchitectures'])}"
        for r in regions_res.json()
    ],
    sep="\n",
)

The above regions are compute regions. We can also find out for which regions storage containers have been enabled by calling the [`/storage`](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Storage%20Resource/get_htc_storage) endpoint.

In [None]:
storage_res = requests.get(
    "https://htc.rescale.com/api/v1/htc/storage",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
)
storage_res.raise_for_status()

print(
    *[
        f"{r['region']}: {r['storageName']}"
        for r in storage_res.json()
    ],
    sep="\n",
)

To create a new project, we call the [`/projects`](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Project%20Resource/get_htc_projects) endpoint. For simplicity, we choose to create a project in a single region that supports both the `AARCH64` and `X86` architectures and has storage defined.

Note that the `projectName` and `projectDescription` are not required to be unique. Every call to this endpoint will create a new project with unique `projectId`.
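
The region choice described above can be sketched as a filter over the two responses. The field names `region` and `supportedArchitectures` are the ones used in the cells above; the function name and the sample data (including the second region name and the storage name) are made up for illustration.

```python
def regions_with_storage(regions, storage, required_archs=("AARCH64", "X86")):
    """Regions that support every required architecture and have storage."""
    storage_regions = {s["region"] for s in storage}
    return sorted(
        r["region"]
        for r in regions
        if set(required_archs) <= set(r["supportedArchitectures"])
        and r["region"] in storage_regions
    )


# Made-up sample data shaped like the fields printed above.
sample_regions = [
    {"region": "AWS_EU_WEST_1", "supportedArchitectures": ["AARCH64", "X86"]},
    {"region": "EXAMPLE_REGION_2", "supportedArchitectures": ["X86"]},
]
sample_storage = [{"region": "AWS_EU_WEST_1", "storageName": "example-storage"}]
print(regions_with_storage(sample_regions, sample_storage))
```

With real data, the same call would be `regions_with_storage(regions_res.json(), storage_res.json())`.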

In [None]:
REGION = "AWS_EU_WEST_1"

payload = {
    "projectName": "htc-tutorial",
    "projectDescription": "Tutorial project",
    "regions": [ REGION ]
}

project_res = requests.post(
    "https://htc.rescale.com/api/v1/htc/projects",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=payload,
)
project_res.raise_for_status()
pprint(project_res.json(), indent=4)

PROJECT_ID = project_res.json()["projectId"]

## Publish Docker image

By creating a new project, we also created an associated container registry. Let's [create a container image repository](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Container%20Registry%20Resource/post_htc_projects__projectId__container_registry_repo__repoName_). Note that if a repository already exists, the response property `successfullyCreated` will be set to `false`.

In [None]:
REPO_NAME = "opencv-htc-tutorial"
IMAGE_TAG = "v1"

repo_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/container-registry/repo/{REPO_NAME}",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
)
repo_res.raise_for_status()
pprint(repo_res.json(), indent=4)

REGISTRY_URI = repo_res.json()["registryURI"]

Before we can push images to the project's container registry, we need to [get the container registry token](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Container%20Registry%20Resource/get_htc_projects__projectId__container_registry_token).

In [None]:
docker_token_res = requests.get(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/container-registry/token",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
)
docker_token_res.raise_for_status()

DOCKER_TOKEN = docker_token_res.text

We can now log in to the Docker registry.

In [None]:
repo_netloc = REGISTRY_URI.split("/")[0]
! echo $DOCKER_TOKEN | docker login --username AWS --password-stdin $repo_netloc

Let's build our Docker image for both architectures supported in our project region (Docker uses the architecture labels `amd64` and `arm64`, which differ from the region labels). Cross compilation may be very slow. In production environments, where support for both `amd64` and `arm64` is required, it may make sense to pre-build required images using internal CI/CD systems which execute builds on build agents that match target architectures. The [Docker Buildx](https://docs.docker.com/build/building/multi-platform/) plugin automates multi-platform image creation.

In [None]:
! docker buildx create --name htcbuilder --use --bootstrap
! docker buildx build --platform linux/amd64,linux/arm64 \
    -f docker/Dockerfile -t $REGISTRY_URI$REPO_NAME:$IMAGE_TAG --push .

We can query the repository to check whether the manifest list for our images shows support for both architectures.

In [None]:
! docker manifest inspect $REGISTRY_URI$REPO_NAME:$IMAGE_TAG
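
Rather than reading the JSON by eye, the architectures can be extracted programmatically. The sketch below assumes the command printed a manifest list (a JSON object with a `manifests` array whose entries carry `platform.architecture`); the function name and the sample document are illustrative.

```python
import json


def manifest_architectures(manifest_json):
    """Return the sorted architectures listed in a manifest list."""
    manifest = json.loads(manifest_json)
    return sorted(
        {
            m["platform"]["architecture"]
            for m in manifest.get("manifests", [])
            # Buildx may add attestation entries with architecture "unknown".
            if m.get("platform", {}).get("architecture", "unknown") != "unknown"
        }
    )


# Trimmed, made-up sample of a manifest list.
sample = json.dumps(
    {
        "manifests": [
            {"platform": {"architecture": "amd64", "os": "linux"}},
            {"platform": {"architecture": "arm64", "os": "linux"}},
            {"platform": {"architecture": "unknown", "os": "unknown"}},
        ]
    }
)
print(manifest_architectures(sample))
```

In a notebook, the command output could be captured with `out = !docker manifest inspect ...` and passed in as `"\n".join(out)`.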

We can also [get all container images](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Container%20Registry%20Resource/get_htc_projects__projectId__container_registry_images) registered in the project's container registry. We should see a tagged image `opencv-htc-tutorial:v1`.

In [None]:
images_res = requests.get(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/container-registry/images",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
)
images_res.raise_for_status()
pprint(images_res.json(), indent=4)


## Upload input files

The image library we will use as input is publicly available. Let's download and extract it.

In [None]:
! curl -O http://images.cocodataset.org/zips/val2017.zip
! unzip -qo val2017.zip

There are two ways of interacting with the Rescale HTC cloud storage. The first one utilizes the `htcctl` command line tool, which abstracts away cloud provider-specific details and simplifies file uploads/downloads in a multi cloud setting. This should be the default choice for most cases.

The second one uses cloud provider native tools that can be used with cloud provider-specific credentials and storage properties exposed by the HTC API. This should be used only in special cases.

As discussed at the beginning of the tutorial, we want to upload the input dataset to the project shared storage. Note that `htcctl` keeps the path to a file in the uploaded object's key name. To strip the `val2017/` prefix from the object key names, we invoke the command from within the `val2017/` directory. Refer to *The HTCCTL User Guide* for more information on `htcctl`.

In [None]:
%pushd val2017
! RESCALE_API_TOKEN=$api_key \
    htcctl --project-shared --project-id $PROJECT_ID \
        upload *
%popd

Following our approach to work distribution, let's create a file listing all image file names and upload it to the project shared storage. 

In [None]:
! ls -1 val2017/ > image_list.dat
! RESCALE_API_TOKEN=$api_key \
    htcctl --project-shared --project-id $PROJECT_ID \
        upload image_list.dat

## Create Task

Now that we have our input files in the project's object storage, we can proceed with [creating a task](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Task%20Resource/post_htc_projects__projectId__tasks).

In [None]:
payload = {
    "taskName": "htc-task-from-htctutorial",
    "taskDescription": "HTC Task from HTC Tutorial",
}

task_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=payload,
)
task_res.raise_for_status()
pprint(task_res.json(), indent=4)
TASK_ID = task_res.json()["taskId"]


## Simulating HTC runtime context locally 

The Rescale HTC runtime injects environmental variables that establish a context for the job being executed. It is possible to set these locally, so we can troubleshoot and resolve issues without having to wait for the result of a job submission. We will use the variables set by the previous operations to populate the `RESCALE_HTC_` variables, which are used by the `htcctl` tool. `HTC_INPUT_IMAGE` is a custom variable expected by our application. We are going to pull the Docker image directly from our project's container registry.

> NOTE: The actual token injected into a container during runtime is a Job Bearer Token, which has a reduced set of privileges compared to the User Bearer Token. Consult the `Job Bearer Tokens` section of `The Rescale HTC Manual`.

In [None]:
! docker pull $REGISTRY_URI$REPO_NAME:$IMAGE_TAG
! docker run --rm \
    -e RESCALE_HTC_PROJECT_ID=$PROJECT_ID \
    -e RESCALE_HTC_TASK_ID=$TASK_ID \
    -e RESCALE_HTC_BEARER_TOKEN=$HTC_TOKEN \
    -e HTC_INPUT_IMAGE=true \
        $REGISTRY_URI$REPO_NAME:$IMAGE_TAG python /opt/face_tagger.py 000000367818.jpg

Listing objects in the task storage should yield two result files (tagged image and a text file with face count).

In [None]:
! RESCALE_API_TOKEN=$api_key \
    htcctl --project-id $PROJECT_ID --task-id $TASK_ID \
        list

We can now download the tagged image and display it.

In [None]:
RESULT_FILE = "000000367818.jpg_tagged.jpg"

! RESCALE_API_TOKEN=$api_key \
    htcctl --project-id $PROJECT_ID --task-id $TASK_ID \
        download $RESULT_FILE

display.Image(RESULT_FILE)

## Create and run jobs

Before submitting a batch of jobs, it is a good practice to [submit a single job](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Job%20Resource/post_htc_projects__projectId__tasks__taskId__jobs_batch) of `batchSize=1` just to confirm that everything works fine in the context of the Rescale HTC runtime.

In [None]:
payload = [ {
    "jobName": "Test HTC tutorial job",
    "batchSize": 1,
    "htcJobDefinition": {
        "imageName": "opencv-htc-tutorial:v1",
        "maxVCpus": 1,
        "maxMemory": 128,
        "commands": ["python", "/opt/face_tagger.py"],
        "envs": [{"name": "HTC_INPUT_IMAGE", "value": "true"}],
        "execTimeoutSeconds": 5000,
    },
} ]

batch_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks/{TASK_ID}/jobs/batch",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=payload,
)
batch_res.raise_for_status()
pprint(batch_res.json(), indent=4)

PARENT_JOB_ID = batch_res.json()[0]["parentJobId"]


There are a couple of approaches to job status tracking. We will start by polling for [job instance events](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Job%20Resource/get_htc_projects__projectId__tasks__taskId__jobs__jobId__events). We want to poll for the terminal states `FAILED` or `SUCCEEDED`. Job statuses are refreshed every 30 seconds, so polling more frequently brings no benefit.

In [None]:
def poll_for_status_events(htc_token, project_id, task_id, parent_job_id, batch_index):
    while True:
        events_res = requests.get(
            f"https://htc.rescale.com/api/v1/htc/projects/{project_id}/tasks/{task_id}/jobs/{parent_job_id}:{batch_index}/events",
            headers={"Authorization": f"Bearer {htc_token}"},
        )
        events_res.raise_for_status()

        statuses = [s["status"] for s in events_res.json()["items"]]
        if "FAILED" in statuses or "SUCCEEDED" in statuses:
            break

        print("Waiting 30s for job to complete...")
        time.sleep(30)

    pprint(
        [
            s
            for s in events_res.json()["items"]
            if s["status"] in ("FAILED", "SUCCEEDED")
        ][0],
        indent=4,
    )


poll_for_status_events(HTC_TOKEN, PROJECT_ID, TASK_ID, PARENT_JOB_ID, 0)


Our job reached the `FAILED` terminal state and exited with non-zero `exitCode`. To aid troubleshooting, let's [fetch the job logs](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Job%20Resource/get_htc_projects__projectId__tasks__taskId__jobs__jobId__logs).

In [None]:
def get_logs(htc_token, project_id, task_id, parent_job_id, batch_index):
    logs_res = requests.get(
        f"https://htc.rescale.com/api/v1/htc/projects/{project_id}/tasks/{task_id}/jobs/{parent_job_id}:{batch_index}/logs",
        headers={"Authorization": f"Bearer {htc_token}"},
    )
    logs_res.raise_for_status()

    for i in logs_res.json()["items"]:
        print(f"{i['timestamp']}\t{i['message']}")


get_logs(HTC_TOKEN, PROJECT_ID, TASK_ID, PARENT_JOB_ID, 0)


Logs indicate that the application expected an additional command line argument. If the `HTC_INPUT_IMAGE` environmental variable is set, the application expects image name as an argument. Let's fix the `htcJobDefinition` and extend `commands` with image name.

In [None]:
fixed_payload = copy.deepcopy(payload)
fixed_payload[0]["htcJobDefinition"]["commands"].append("000000367818.jpg")

fixed_batch_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks/{TASK_ID}/jobs/batch",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=fixed_payload,
)
fixed_batch_res.raise_for_status()

PARENT_JOB_ID = fixed_batch_res.json()[0]["parentJobId"]

poll_for_status_events(HTC_TOKEN, PROJECT_ID, TASK_ID, PARENT_JOB_ID, 0)
get_logs(HTC_TOKEN, PROJECT_ID, TASK_ID, PARENT_JOB_ID, 0)


Now for the real thing. We want to submit a batch of jobs that will process the full set of 5000 images. When splitting the workload into batches, we need to consider that a single batch is limited to 1000 job instances. This limitation allows the Rescale HTC runtime to maximize throughput by distributing jobs across cloud providers and regions. Our problem is small enough to fit into a single batch of jobs. The job submission endpoint accepts an array (of max size of 100) of batches allowing for creation of 100000 job instances with a single API call.
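
For workloads that do not fit into a single batch, the two limits above determine how many batches and API calls are needed. The following sketch only does the bookkeeping. The function name is hypothetical and the constants are the limits quoted in the paragraph above.

```python
MAX_BATCH_SIZE = 1000       # job instances per batch (limit stated above)
MAX_BATCHES_PER_CALL = 100  # batches per submission call (limit stated above)


def plan_submissions(total_jobs):
    """Split `total_jobs` into API calls, each a list of batch sizes."""
    sizes = [MAX_BATCH_SIZE] * (total_jobs // MAX_BATCH_SIZE)
    if total_jobs % MAX_BATCH_SIZE:
        sizes.append(total_jobs % MAX_BATCH_SIZE)
    return [
        sizes[i : i + MAX_BATCHES_PER_CALL]
        for i in range(0, len(sizes), MAX_BATCHES_PER_CALL)
    ]


plan = plan_submissions(250_500)
print(len(plan), [len(call) for call in plan])
```

Each inner list corresponds to one submission payload, with one `batchSize` entry per batch.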

As previously mentioned, we will use the `AWS_BATCH_JOB_ARRAY_INDEX` value injected by the runtime to determine the chunk of images to be processed by a job instance. The `AWS_` prefixed variable also works for other CSPs; a limitation on the AWS side forces us to use the `AWS_` prefixed name across all clouds. To allow jobs in a batch to determine the chunk of work to process at runtime, we will also inject our user-defined value `BATCH_JOB_ARRAY_SIZE` equal to the size of the batch. Note that environmental variables starting with `AWS_` are reserved and cannot be defined by users (`AWS_BATCH_JOB_ARRAY_SIZE` would not work).

Before we launch the full job, it is a good practice to check whether a local run simulating the HTC environment succeeds. With a batch size of 1000 and 5000 total images, each run should process 5 images. Job output should show 5 images being downloaded.

In [None]:
! docker run --rm \
    -e RESCALE_HTC_BEARER_TOKEN=$HTC_TOKEN \
    -e RESCALE_HTC_PROJECT_ID=$PROJECT_ID \
    -e RESCALE_HTC_TASK_ID=$TASK_ID \
    -e AWS_BATCH_JOB_ARRAY_INDEX=100 \
    -e IMG_OBJKEY_LIST=image_list.dat \
    -e BATCH_JOB_ARRAY_SIZE=1000 \
        $REGISTRY_URI$REPO_NAME:$IMAGE_TAG python /opt/face_tagger.py

Before we launch the full batch, let's confirm that partial result files generated by our local run are present in the task storage. We are going to use AWS CLI to interact with S3 storage of our task. First, we need to [fetch credentials](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Task%20Resource/get_htc_projects__projectId__tasks__taskId__storage_token__region_) for the storage region we are using.

In [None]:
def get_task_storage_token(htc_token, project_id, task_id, region):
    regions_res = requests.get(
        f"https://htc.rescale.com/api/v1/htc/projects/{project_id}/tasks/{task_id}/storage/token/{region}",
        headers={"Authorization": f"Bearer {htc_token}"},
    )
    regions_res.raise_for_status()

    return (
        regions_res.json()["credentials"]["AWS_SESSION_TOKEN"],
        regions_res.json()["credentials"]["AWS_ACCESS_KEY_ID"],
        regions_res.json()["credentials"]["AWS_SECRET_ACCESS_KEY"],
        regions_res.json()["storagePath"],
    )


(
    AWS_SESSION_TOKEN,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
    STORAGE_S3_PATH,
) = get_task_storage_token(HTC_TOKEN, PROJECT_ID, TASK_ID, REGION)


AWS CLI expects credentials to be stored in environmental variables.

In [None]:
! AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN \
    AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
        AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
            aws s3 ls $STORAGE_S3_PATH

We are going to query [group summary statistics](https://htc.rescale.com/api-docs/ogrgmNBE+EiA4WoxiZRnHw==/#/Task%20Resource/get_htc_projects__projectId__tasks__taskId__group_summary_statistics) to track statuses of jobs within a group. The code polls for statuses until the sum of jobs in terminal states (`SUCCEEDED` or `FAILED`) is equal to the batch size. Status counts captured at polling intervals are then plotted to visualize state transitions for the group.

In [None]:
def monitor_group(project_id, task_id, group_name, group_size):
    items = []
    while True:
        group_res = requests.get(
            f"https://htc.rescale.com/api/v1/htc/projects/{project_id}/tasks/{task_id}/group-summary-statistics",
            params={"group": group_name},
            headers={"Authorization": f"Bearer {HTC_TOKEN}"},
        )
        group_res.raise_for_status()
        job_statuses = group_res.json()["items"][0]["jobStatuses"]

        timestamp = datetime.datetime.now()
        items.extend([[timestamp, s, job_statuses[s]] for s in job_statuses.keys()])

        if job_statuses["SUCCEEDED"] + job_statuses["FAILED"] == group_size:
            print("ALL DONE")
            pprint(job_statuses, indent=4)
            break

        print(",".join([f"{s}: {job_statuses[s]}" for s in job_statuses.keys()]))
        print("Sleeping 30s...")
        time.sleep(30)

    df = pd.DataFrame(items, columns=["ts", "status", "count"])
    fig = px.line(df, x="ts", y="count", color="status", log_y=True, symbol="status")
    fig.show()


Now let's submit a batch of jobs. We will use the `group` query parameter to assign our batch to a group. This will allow us to track statuses for the entire group. Environmental variables `BATCH_JOB_ARRAY_SIZE` and `IMG_OBJKEY_LIST` are read by the application to determine the chunk of images to process by each job.

In [None]:
BATCH_SIZE = 1000
GROUP_NAME = f"htc-tutorial-jobgroup-{int(time.time())}"

batch_payload = [
    {
        "jobName": "Test HTC tutorial job",
        "batchSize": BATCH_SIZE,
        "htcJobDefinition": {
            "imageName": "opencv-htc-tutorial:v1",
            "maxVCpus": 1,
            "maxMemory": 128,
            "commands": ["python", "/opt/face_tagger.py"],
            "envs": [
                {"name": "BATCH_JOB_ARRAY_SIZE", "value": f"{BATCH_SIZE}"},
                {"name": "IMG_OBJKEY_LIST", "value": "image_list.dat"},
            ],
            "execTimeoutSeconds": 5000,
        },
    }
]

batch_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks/{TASK_ID}/jobs/batch",
    params={"group": GROUP_NAME},
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=batch_payload,
)
batch_res.raise_for_status()
pprint(batch_res.json(), indent=4)

monitor_group(PROJECT_ID, TASK_ID, GROUP_NAME, BATCH_SIZE)


## Aggregate results

Our batch of jobs completed successfully. It is time to aggregate results. Let's fetch all the `.dat` files deposited in task storage. Since storage tokens expire, we first fetch a new set of storage credentials.

In [None]:
(
    AWS_SESSION_TOKEN,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
    STORAGE_S3_PATH,
) = get_task_storage_token(HTC_TOKEN, PROJECT_ID, TASK_ID, REGION)

! rm -fr val2017_output/*
! AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN \
    AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
        AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
            aws s3 cp $STORAGE_S3_PATH val2017_output --recursive \
                --exclude "*" --include "*.dat" --quiet

Next, we'll process the `.dat` files to find the one with the highest face count.

In [None]:
def find_max_faces(dir_path):
    max_face_count = (None, 0)
    for file in [f for f in os.listdir(dir_path) if f.endswith("dat")]:
        with open(f"{dir_path}/{file}", "r") as f:
            face_cnt = int(f.readlines()[1])
            if face_cnt > max_face_count[1]:
                max_face_count = (file, face_cnt)
    return max_face_count


max_face_count = find_max_faces("val2017_output")
MAX_CNT_FILE = max_face_count[0].replace(".dat", ".jpg")


Let's fetch the image with the largest detected face count and display it.

In [None]:
! RESCALE_HTC_BEARER_TOKEN=$HTC_TOKEN RESCALE_HTC_REGION=$REGION \
    htcctl --project-id $PROJECT_ID --task-id $TASK_ID \
        download $MAX_CNT_FILE -d val2017_output/

display.Image(f"val2017_output/{MAX_CNT_FILE}")

## Clean up resources

Now that we have successfully completed our task and found the image with the most faces in it, we can free up the storage resources by deleting the task.

In [2]:
delete_res = requests.delete(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks/{TASK_ID}",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"}
)
delete_res.raise_for_status()

! rm -fr val2017*
! find . -type f \( -name "*_tagged.*" -o -name "*.dat" \) -delete

## Cancelling jobs

We may encounter situations where something goes wrong with our jobs, for example when status monitoring shows an unexpected number of failing jobs. Instead of waiting for all submitted jobs to complete, we can cancel the job group.

When our application detects the `SIMULATE_FAILURES` environmental variable, it randomly fails or times out jobs. When we poll for status, at each interval, we calculate the percentage of failed jobs among the jobs that have finished. If more than 20% of finished jobs are in a failed state, the job group is cancelled.
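
The cancellation rule can be isolated into a small pure function, which makes it easy to test without submitting jobs. The sketch below mirrors the ratio check used by the monitoring function later in this section. The function name is hypothetical and the threshold is a parameter, defaulting to the value used in that code.

```python
def should_cancel(succeeded, failed, threshold=0.2):
    """True when the share of failures among finished jobs exceeds threshold."""
    finished = succeeded + failed
    # Guard against division by zero before any job has finished.
    return finished > 0 and failed / finished > threshold


print(should_cancel(succeeded=7, failed=3))
```

Note that the ratio is taken over finished jobs only, so a few early failures can trigger cancellation while most jobs are still queued.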

The `/jobs` endpoint also allows for fetching jobs with specific statuses - we will use this later. Note that we can fetch either all statuses or filter for an individual status.

In [None]:
def get_all_jobs(htc_token, project_id, task_id, group, status=None):
    statuses = []

    params = {"group": group, "viewType": "SIMPLE"}
    if status:
        params["status"] = status

    res = requests.get(
        f"https://htc.rescale.com/api/v1/htc/projects/{project_id}/tasks/{task_id}/jobs",
        params=params,
        headers={"Authorization": f"Bearer {htc_token}"},
    )
    res.raise_for_status()
    statuses.extend(res.json()["items"])

    while "next" in res.json():
        res = requests.get(
            res.json()["next"], headers={"Authorization": f"Bearer {htc_token}"}
        )
        res.raise_for_status()
        statuses.extend(res.json()["items"])

    return statuses


def extract_status_counts(statuses):
    status_counts = {
        "FAILED": 0,
        "RUNNABLE": 0,
        "RUNNING": 0,
        "STARTING": 0,
        "SUBMITTED_TO_PROVIDER": 0,
        "SUBMITTED_TO_RESCALE": 0,
        "SUCCEEDED": 0,
    }

    for s in [i["status"] for i in statuses]:
        status_counts[s] += 1

    return status_counts


The following function monitors job statuses using the above approach and decides whether a job group should be cancelled.

In [None]:
def monitor_for_cancellation(htc_token, project_id, task_id, group, batch_size):
    cancel = False
    while True:
        job_statuses = extract_status_counts(
            get_all_jobs(htc_token, project_id, task_id, group)
        )
        timestamp = datetime.datetime.now()

        succeeded = job_statuses["SUCCEEDED"]
        failed = job_statuses["FAILED"]

        print(f"SUCCEEDED: {succeeded}, FAILED: {failed}. Waiting for 30s...")

        if succeeded + failed == batch_size:
            print("ALL DONE")
            break
        elif (succeeded + failed > 0) and (failed / (succeeded + failed) > 0.2):
            print("TOO MANY FAILURES - CANCELLING...")
            cancel = True
            break
        time.sleep(30)

    if cancel:
        cancel_res = requests.post(
            f"https://htc.rescale.com/api/v1/htc/projects/{project_id}/tasks/{task_id}/jobs/cancel",
            params={"group": group},
            headers={"Authorization": f"Bearer {htc_token}"},
        )
        cancel_res.raise_for_status()
        monitor_group(project_id, task_id, group, batch_size)


Before submitting a new batch, we need to create a new task, as the previous task was deleted. Our new task represents a separate experiment.

In [None]:
task_payload = {
    "taskName": "htc-task-from-htctutorial",
    "taskDescription": "HTC Task from HTC Tutorial",
}

task_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks",
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=task_payload,
)
task_res.raise_for_status()
TASK_ID = task_res.json()["taskId"]


Now let's submit it.

In [None]:
fail_payload = [
    {
        "jobName": "Test HTC tutorial job",
        "batchSize": BATCH_SIZE,
        "htcJobDefinition": {
            "imageName": "opencv-htc-tutorial:v1",
            "maxVCpus": 1,
            "maxMemory": 128,
            "commands": ["python", "/opt/face_tagger.py"],
            "envs": [
                {"name": "SIMULATE_FAILURES", "value": "true"},
                {"name": "BATCH_JOB_ARRAY_SIZE", "value": f"{BATCH_SIZE}"},
                {"name": "IMG_OBJKEY_LIST", "value": "image_list.dat"},
            ],
            "execTimeoutSeconds": 60,
        },
    }
]

GROUP_NAME = f"htc-tutorial-jobgroup-{int(time.time())}"

batch_res = requests.post(
    f"https://htc.rescale.com/api/v1/htc/projects/{PROJECT_ID}/tasks/{TASK_ID}/jobs/batch",
    params={"group": GROUP_NAME},
    headers={"Authorization": f"Bearer {HTC_TOKEN}"},
    json=fail_payload,
)
batch_res.raise_for_status()

monitor_for_cancellation(HTC_TOKEN, PROJECT_ID, TASK_ID, GROUP_NAME, BATCH_SIZE)


The state transition plot shows that jobs are getting cancelled at different statuses. We expect some jobs to fail because of the artificial exception thrown by the application, some may have timed out, some will fail due to cancellation. Let's query for statuses of `FAILED` jobs and inspect failure codes and reasons.

In [None]:
failure_codes_reasons = {}
for s in get_all_jobs(HTC_TOKEN, PROJECT_ID, TASK_ID, GROUP_NAME, "FAILED"):
    code_reason = f"{s['failureCode']}:{s['statusReason']}"
    if code_reason in failure_codes_reasons:
        failure_codes_reasons[code_reason] += 1
    else:
        failure_codes_reasons[code_reason] = 1
pprint(failure_codes_reasons)
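
The same tally can be written more compactly with `collections.Counter`. The sketch below is an alternative formulation, not part of the original notebook. The field names `failureCode` and `statusReason` are the ones used above, while the function name and the sample records are made up.

```python
from collections import Counter


def tally_failures(jobs):
    """Count jobs per 'failureCode:statusReason' pair."""
    return Counter(f"{j['failureCode']}:{j['statusReason']}" for j in jobs)


# Made-up records shaped like the job items used above.
sample_jobs = [
    {"failureCode": "A", "statusReason": "example reason 1"},
    {"failureCode": "A", "statusReason": "example reason 1"},
    {"failureCode": "B", "statusReason": "example reason 2"},
]
for code_reason, count in tally_failures(sample_jobs).most_common():
    print(count, code_reason)
```

`most_common()` lists the most frequent failure first, which helps when many distinct reasons are present.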


This concludes the basic workflow.