# User API

Full documentation of the Rescale API is available at [https://engineering.rescale.com/api-docs](https://engineering.rescale.com/api-docs). This document builds a story that aims to cover the entire User API surface.

## Starting the notebook

The only prerequisite to running this tutorial locally is a working installation of Python 3.8+. To obtain a local copy of the repository, clone it with `git` or [download zip](https://github.com/rescale-labs/Api_UserAPI_Reference/archive/refs/heads/main.zip). We're going to work within a virtual environment so that our dependencies are installed locally without affecting the global installation.

```
# Clone or unzip repository
$ git clone https://github.com/rescale-labs/Api_UserAPI_Reference.git

$ cd Api_UserAPI_Reference
$ python -m venv .venv
$ . .venv/bin/activate
$ pip install -r requirements.txt
$ jupyter notebook README.ipynb
```

Jupyter Notebook cells need to be executed in order, as subsequent cells may depend on imports and variables defined in previous cells.

### Python and JSON

Python dictionaries map 1-1 to JSON objects. The `requests` module automatically translates between dictionaries and JSON: it serializes the payload passed via the `json` argument and parses responses with `.json()`. The following code demonstrates this mapping.


In [None]:
payload = {
    "string_property": "string",
    "numeric_property": 12,
    "array_property": [{"id": 1}, {"id": 2}],
    "object_property": {
        "name": "My Job",
        "walltime": 12,
        "interactive": False,
        "files": None,
    },
}

import json

print(json.dumps(payload, indent=4))
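The mapping also works in reverse: `json.loads` turns a JSON document back into dictionaries and lists. A quick round trip shows Python's `False` and `None` becoming JSON `false` and `null` and back again, which is the same translation `requests` performs behind `json=` and `.json()`.

```python
import json

payload = {"interactive": False, "files": None, "walltime": 12}

# Serialize: Python False/None become JSON false/null
text = json.dumps(payload)
print(text)

# Deserialize: JSON false/null become Python False/None again
restored = json.loads(text)
print(restored == payload)
```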

## Authorization

The Rescale API authorizes requests using an *API Key*. This key is unique per User, per Workspace. A given user may have multiple *API Keys* active, one per Workspace.

To generate an *API Key*, go to the API section of the User Profile.

![](README.images/user_profile_apikey.png)

> NOTE: Remember that your *API Key* is a secret. Never store it in a code repository. Never share it with your colleagues. If you realize that your *API Key* was exposed - delete it in the user profile and generate a new key.

Rescale tools such as the Rescale CLI look for an `apiconfig` authorization configuration file in the user's home directory.

```
$HOME/.config/rescale/apiconfig             # Linux
%USERPROFILE%\config\rescale\apiconfig      # Windows
```

We will use this convention to store our credentials. Create the `apiconfig` text file in the location above and fill it with the following lines:

```
[default]
apibaseurl = https://eu.rescale.com
apikey = 79a49b3132335a44742e86c9126e5cfaa1ea2489
```

If you need to execute your script on several platforms, you can define alternative profiles (for example `[us]`). The [config.py](config.py) module is provided for your convenience. You can copy it next to your scripts and use it as a standard way to retrieve credentials. Let's use it.

In [None]:
import config

api_url, api_key = config.get_profile()
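For reference, the profile convention can be read with the standard `configparser` module. This is a minimal sketch and not necessarily how `config.py` is implemented: `read_profile` is a hypothetical helper, and the `[us]` profile, its URL and both keys are placeholders.

```python
import configparser

# Example apiconfig contents. The "[us]" profile, its URL and both keys are
# placeholders for illustration, not real values.
APICONFIG_TEXT = """
[default]
apibaseurl = https://eu.rescale.com
apikey = <your-eu-api-key>

[us]
apibaseurl = https://<us-platform-url>
apikey = <your-us-api-key>
"""


def read_profile(config_text, profile="default"):
    """Return (api_url, api_key) for a profile (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    section = parser[profile]  # raises KeyError for an unknown profile
    return section["apibaseurl"], section["apikey"]


print(read_profile(APICONFIG_TEXT, "us"))
```

To read the real file instead of a string, `parser.read(Path.home() / ".config" / "rescale" / "apiconfig")` could replace `read_string` on Linux.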

We now have our `apikey` and a base URL for our API calls. Let's make the first call to get our user details. To make REST API calls, we will use the [`requests`](https://requests.readthedocs.io/en/latest/) module. It is listed as a dependency in the [`requirements.txt`](requirements.txt) file.

In [None]:
import requests

results = requests.get(f"{api_url}/api/v2/users/me/")
if results.status_code != 200:
    print(results.status_code)

We got status code `401 Unauthorized`, which suggests that we're missing our credentials. Let's define `headers` that we will use to authorize future API calls. Instead of checking the status code manually, let's use `raise_for_status()`, which raises an exception for every status that signifies a failure (`4xx` and `5xx`). Finally, we pretty-print the response JSON document.

In [None]:
headers = {"Authorization": f"Token {api_key}"}
results = requests.get(f"{api_url}/api/v2/users/me/", headers=headers)
results.raise_for_status()

from pprint import pprint

pprint(results.json())

We're now ready to proceed and build up a job submission.

## Coretypes and Analyses


The job definition requires us to specify the software to attach to the cluster running our job, as well as the hardware (coretype and core count). Let's [get the list of coretypes](https://engineering.rescale.com/api-docs/#coretypes).

In [None]:
coretypes_res = requests.get(f"{api_url}/api/v2/coretypes/", headers=headers)
coretypes_res.raise_for_status()

# Display the total count of coretypes and the amount present in the response
print(
    f"Count: {coretypes_res.json()['count']}; Length: {len(coretypes_res.json()['results'])}"
)

# Display reference to the next page
print(f"Next page: {coretypes_res.json()['next']}")

# Display a coretype object and its properties
pprint(coretypes_res.json()["results"][0])

The first thing to note is that the response returned a total count of coretypes that is larger than the total count of coretype objects in the `results` list. This will often happen for endpoints that potentially return large amounts of data. In such situations, results are paged, and each page contains a reference to the `next` page.

Since paging is common, let's define a function that will loop through all the pages and aggregate objects returned in the `results` list of each page.

In [None]:
def get_all_result_pages(url, headers=None, params=None):
    results = []

    res = requests.get(url, headers=headers, params=params)
    res.raise_for_status()
    page = res.json()
    results.extend(page["results"])

    # Follow `next` links; they already carry the query string, so `params` is not passed again
    while page["next"] is not None:
        res = requests.get(page["next"], headers=headers)
        res.raise_for_status()
        page = res.json()
        results.extend(page["results"])

    return results
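The same paging logic can also be written as a generator that yields objects page by page, so a caller can stop early without fetching every page. In this sketch, `iter_results` is a hypothetical helper and `fetch_json` stands in for a function that performs the authorized GET and returns the decoded JSON; injecting it keeps the loop testable without network access.

```python
from typing import Callable, Iterator


def iter_results(first_url: str, fetch_json: Callable[[str], dict]) -> Iterator[dict]:
    """Yield objects from every page of a paged endpoint, following `next` links."""
    url = first_url
    while url is not None:
        page = fetch_json(url)
        yield from page["results"]
        url = page["next"]  # None on the last page


# Two fake pages shaped like a paged API response
fake_pages = {
    "page1": {"count": 3, "next": "page2", "results": [{"code": "a"}, {"code": "b"}]},
    "page2": {"count": 3, "next": None, "results": [{"code": "c"}]},
}
codes = [obj["code"] for obj in iter_results("page1", fake_pages.__getitem__)]
print(codes)  # ['a', 'b', 'c']
```

With `requests`, `fetch_json` could be a small function that calls `requests.get(url, headers=headers)`, then `raise_for_status()`, and returns `.json()`.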

In [None]:
coretypes = get_all_result_pages(f"{api_url}/api/v2/coretypes/", headers)

print(f"Length: {len(coretypes)}")

We're now confident that we fetched all coretypes. For our simple job, we will need a general purpose coretype with a low core count. Let's list coretype `code`s in the `general` category together with their `cores` counts and `processorInfo`.

In [None]:
general_coretypes = {
    c["code"]: {"cores": c["cores"], "processorInfo": c["processorInfo"]}
    for c in coretypes
    if "general" in c["categoryCodes"]
}

pprint(general_coretypes)

Now that we have a shortlist, we need to decide which coretype to use. Since our test calculation does not have specific requirements, we will use the least expensive option. Let's [fetch coretype prices](https://engineering.rescale.com/api-docs/#list-all-compute-prices), filter prices for on-demand economy (ODE) and the `linux` OS, link them with general purpose coretypes and get the minimum price.

In [None]:
# Get prices
prices_res = requests.get(f"{api_url}/api/v2/billing/computeprices/", headers=headers)
prices_res.raise_for_status()

# Print sample pricing object
pprint(prices_res.json()[-1])

# Keep only active on-demand (ODE) Linux prices of general purpose coretypes
linux_ode_coretypes = {
    c["coreType"]: float(c["value"]["amount"])
    for c in prices_res.json()
    if c["planType"].endswith("on-demand")
    and c["os"] == "linux"
    and c["isActive"]
    and c["coreType"] in general_coretypes
}

# Find the least expensive one
min_price_coretype = min(linux_ode_coretypes, key=linux_ode_coretypes.get)
print(
    f"Lowest price coretype: {min_price_coretype} at {linux_ode_coretypes[min_price_coretype]}"
)
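Since prices change, it can help to shortlist a few of the cheapest coretypes rather than a single one. A sketch using `heapq.nsmallest` over a dictionary shaped like `linux_ode_coretypes`; the coretype codes and prices below are made-up sample values.

```python
import heapq

# Hypothetical sample data shaped like linux_ode_coretypes: {coretype code: price}
sample_prices = {
    "coretype_a": 0.12,
    "coretype_b": 0.05,
    "coretype_c": 0.30,
    "coretype_d": 0.08,
}

# The three cheapest coretypes, cheapest first
cheapest = heapq.nsmallest(3, sample_prices, key=sample_prices.get)
print(cheapest)  # ['coretype_b', 'coretype_d', 'coretype_a']
```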

Spot market prices are dynamic, so your cheapest coretype may change even within a day. Let's assume that `granite` is the coretype we want to use.

We know which coretype to use, but we still need to select the analysis. To build our job creation request, we need to know the analysis and version codes. Let's query the API for the [list of available analyses](https://engineering.rescale.com/api-docs/#analyses) (software tiles).

In [None]:
analyses = get_all_result_pages(f"{api_url}/api/v2/analyses/", headers)

versions_count = 0
for a in analyses:
    versions_count += len(a["versions"])

print(
    f"Total number of analyses: {len(analyses)}, Total number of versions: {versions_count}"
)
print(
    f"Sample analysis code: {analyses[0]['code']} and version code: {analyses[0]['versions'][0]['versionCode']}"
)

pprint(analyses[0])
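If you already know the analysis and version codes you are looking for, a small lookup over the `analyses` list can confirm that they are available in your workspace. `find_analysis_version` is a hypothetical helper, and the sample list below is a heavily trimmed, made-up stand-in for the real response.

```python
def find_analysis_version(analyses, code, version_code):
    """Return (analysis, version) matching the codes, or None if not found."""
    for analysis in analyses:
        if analysis["code"] != code:
            continue
        for version in analysis["versions"]:
            if version["versionCode"] == version_code:
                return analysis, version
    return None


# Hypothetical, heavily trimmed sample of the analyses list
sample_analyses = [
    {"code": "user_included", "versions": [{"versionCode": "0"}]},
    {"code": "ansys_fluent", "versions": [{"versionCode": "2019r1"}]},
]
match = find_analysis_version(sample_analyses, "ansys_fluent", "2019r1")
print(match is not None)  # True
```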

If your workspace does not have software filters in place, you should see over 700 analyses and over 3200 unique versions. It may be cumbersome to find the code and version that you need. A more practical approach is to create a job using the web portal and then query the Rescale API to get a JSON document listing job properties (the Job resource model). We will do it in the next section.

## Jobs, Files and Clusters

To fetch the definition of a previously saved Job, go to the portal and get a Job ID for a job similar to the one you'd like to create via the Rescale API.

![](README.images/webportal_jobid.png)

Once we have captured the Job ID, we can [get a specific Job](https://engineering.rescale.com/api-docs/#get-a-specific-job).

In [None]:
# Update to match ID of your job
job_id = "ObMcJc"

results = requests.get(f"{api_url}/api/v2/jobs/{job_id}/", headers=headers)
results.raise_for_status()

# Display the analysis section
pprint(results.json()["jobanalyses"][0]["analysis"])

# Display the entire job definition JSON
pprint(results.json())

Note that the `jobanalyses` property is a list. This reflects the possibility of attaching multiple software tiles (analyses) to a single Job.

The Rescale API also allows us to [list all user jobs](https://engineering.rescale.com/api-docs/#list-all-jobs). Let's count how many jobs were completed this year. Additional query parameters allow filtering jobs by:

* `state` - one of `(completed, not_completed)`
* `job_status` - one of `(PENDING, QUEUED, STARTED, VALIDATED, EXECUTING, COMPLETED, STOPPING, WAITING_FOR_CLUSTER, FORCE_STOP, WAITING_FOR_QUEUE)`

In [None]:
jobs = get_all_result_pages(
    f"{api_url}/api/v2/jobs/",
    headers,
    params={"state": "completed", "job_status": "COMPLETED"},
)

from datetime import datetime

this_year = datetime.today().year

# Before Python 3.11, datetime.fromisoformat rejects the trailing "Z" (UTC) used in dateInserted,
# so it is replaced with the equivalent "+00:00" offset
this_year_jobs = [
    j
    for j in jobs
    if datetime.fromisoformat(j["dateInserted"].replace("Z", "+00:00")).year
    == this_year
]
print(f"Total jobs: {len(jobs)}, Jobs in {this_year}: {len(this_year_jobs)}")
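The `replace("Z", "+00:00")` trick can be wrapped in a helper and reused wherever the API returns timestamps. A minimal sketch; `parse_rescale_date` is a hypothetical name, and it only handles the trailing `Z` case discussed above.

```python
from datetime import datetime


def parse_rescale_date(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC.

    datetime.fromisoformat only accepts 'Z' from Python 3.11 on, so it is
    replaced with the equivalent '+00:00' offset first.
    """
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


d = parse_rescale_date("2023-03-01T12:30:00Z")
print(d.year, d.utcoffset())  # 2023 0:00:00
```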

Let's follow up by creating a new Job. We will use the `user_included` analysis (Bring Your Own Software) to run a [Python script](job_inputs/calculate_pi.py) which estimates the value of π using [Leibniz’s formula](https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80). The script requires an [input file](job_inputs/range.inp) which specifies the number of iterations. The estimated value is stored as text in `pi_estimate.res` and in binary format in `pi_estimate.bin`.

> NOTE: If your Workspace does not have the Bring Your Own Software software tile enabled, you can use any analysis as all Rescale clusters have a Python interpreter available.

It may be the case that your Rescale setup requires jobs to be submitted against a specific Project. Let's [list available projects](https://engineering.rescale.com/api-docs/#list-projects-available-to-your-user).

In [None]:
projects = get_all_result_pages(f"{api_url}/api/v2/users/me/projects/", headers)

for p in projects:
    print(f"{p['id']}\t{p['name']}")

Now that we have all the needed information about the hardware (`coreType: granite`) and software (`code: user_included; version: 0`) we can specify our job definition payload and [create a Job](https://engineering.rescale.com/api-docs/#create-a-job). To keep things simple in this tutorial, we're not handling exceptions raised by our HTTP requests.

Production code should try to recover from an exception or give the end user information on how to proceed. Here, we expect an exception, so we catch it and display the error. This demonstrates that the Rescale API returns useful error messages.

In [None]:
# Replace with your projectId if required/desired
project_id = None

job_definition = {
    "name": "Rescale UserApi Reference Tutorial",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},
            "command": "python3 calculate_pi.py range.inp",
            "hardware": {"coreType": "granite", "coresPerSlot": 1, "walltime": 1},
        }
    ],
    "projectId": project_id,
}

job_save_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
try:
    job_save_res.raise_for_status()
except requests.HTTPError:
    pprint(job_save_res.json())

The issue is that the coretype we have selected, `granite`, does not support configuration with `1` core per slot. Let's fix it by updating the Job definition.

In [None]:
job_definition["jobanalyses"][0]["hardware"]["coresPerSlot"] = 2

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()

pprint(job_create_res.json())

The job was successfully created. We can check whether it is visible in the Rescale web portal.

![](README.images/webportal_newjob.png)

The job creation operation only saved the job; it has not been submitted yet. To [submit a job](https://engineering.rescale.com/api-docs/#submit-a-saved-job), we need to capture its ID and call the `submit` operation.

In [None]:
job_id = job_create_res.json()["id"]

job_submit_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
job_submit_res.raise_for_status()

print(f"Status code: {job_submit_res.status_code}")

Status `200` signifies a success. If we go back to the web portal, we will see job statuses changing.

![](README.images/webportal_submittedjob.png)

We want to wait until the job finishes. Let us poll for [job statuses](https://engineering.rescale.com/api-docs/#list-job-status-history) programmatically. We will define a utility function that polls until a specific Job status is reached and prints out statuses on every poll.

In [None]:
def poll_for_job_status(
    api_url, headers, job_id, poll_status, interval=30, verbose=False
):
    import time
    from datetime import datetime

    while True:
        job_statuses = get_all_result_pages(
            f"{api_url}/api/v2/jobs/{job_id}/statuses/", headers=headers
        )
        # Sort oldest first (used both for printing and for the lookup below)
        sorted_statuses = sorted(
            job_statuses,
            key=lambda s: datetime.fromisoformat(s["statusDate"]),
        )
        if verbose:
            print(" > ".join([s["status"] for s in sorted_statuses]))

        status_items = [s for s in sorted_statuses if s["status"] == poll_status]
        if len(status_items) > 0:
            return status_items[0]

        time.sleep(interval)

In [None]:
poll_for_job_status(api_url, headers, job_id, "Completed", verbose=True)
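`poll_for_job_status` waits indefinitely if the awaited status never appears. A more defensive variant adds a deadline. The sketch below separates the waiting logic from the API call: `poll_until` is a hypothetical helper, and `check` is any callable that returns the awaited status object, or `None` if it has not appeared yet.

```python
import time


def poll_until(check, interval=30.0, timeout=3600.0):
    """Call check() every `interval` seconds until it returns a non-None value.

    Raises TimeoutError if nothing is returned within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)


# Example with a fake check that succeeds on its third call
calls = {"n": 0}


def fake_check():
    calls["n"] += 1
    return {"status": "Completed"} if calls["n"] >= 3 else None


result = poll_until(fake_check, interval=0, timeout=5)
print(result)  # {'status': 'Completed'}
```

In the notebook, `check` could fetch the job statuses and return the first one whose `status` equals `"Completed"`, or `None` otherwise.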

Our job went through all the statuses and reached the desired terminal state. However, it seems that the calculation failed. Let's try to figure out why by [listing output files](https://engineering.rescale.com/api-docs/#list-job-output-files). We will search for files that have the `output` string in their name.

In [None]:
output_files = get_all_result_pages(
    f"{api_url}/api/v2/jobs/{job_id}/files/", headers, params={"search": "output"}
)

pprint(output_files)

The `process_output.log` file is created for all jobs and captures everything written to a console (standard output and error streams). Let's [fetch the plaintext contents](https://engineering.rescale.com/api-docs/#get-plaintext-content-of-a-file) of this file.

In [None]:
file_id = output_files[0]["id"]

file_contents_response = requests.get(
    f"{api_url}/api/v2/files/{file_id}/lines/", headers=headers
)
file_contents_response.raise_for_status()

for line in file_contents_response.json()["lines"]:
    print(line, end="")

Ha! All is clear. We forgot to upload the input files. Let's [upload files](https://engineering.rescale.com/api-docs/#upload-a-file) and capture their IDs. Since file upload is a frequent operation, we will encapsulate it in a function.

In [None]:
import os


def file_upload(api_url, headers, file_path):
    # The context manager closes the file once the upload request has completed
    with open(file_path, "rb") as fh:
        files = [
            (
                "file",
                (os.path.basename(file_path), fh, "application/octet-stream"),
            )
        ]

        file_upload_res = requests.post(
            f"{api_url}/api/v2/files/contents/", headers=headers, files=files
        )
    file_upload_res.raise_for_status()

    return file_upload_res.json()["id"]

In [None]:
file1_id = file_upload(api_url, headers, "job_inputs/calculate_pi.py")
file2_id = file_upload(api_url, headers, "job_inputs/range.inp")

print(file1_id, file2_id)

Let's try to update our previous job definition by extending the job analysis specification with the `inputFiles` property, and resubmit the job.

In [None]:
job_definition["jobanalyses"][0]["inputFiles"] = [{"id": file1_id}, {"id": file2_id}]
pprint(job_definition)

job_update_res = requests.patch(
    f"{api_url}/api/v2/jobs/{job_id}/", headers=headers, json=job_definition
)
job_update_res.raise_for_status()
pprint(job_update_res.status_code)

job_submit_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
try:
    job_submit_res.raise_for_status()
except requests.HTTPError:
    print(job_submit_res.status_code)
    print(job_submit_res.json())

Although the API responds with `200 OK` to the `PATCH` request, neither of the above operations had any effect, because changing or re-submitting a Completed job is not allowed.

> NOTE: This is going to be changed to return `400 BAD REQUEST` with an informative error message.

Since we cannot reuse our failed job (the Rescale API does not support a clone operation) let's [delete it](https://engineering.rescale.com/api-docs/#delete-a-job).

Before we call the `DELETE` operation on a Job, we need to make sure that the job's underlying cluster has reached the `Stopped` state. If we try to delete a `Completed` job while its cluster is still running, the API will return `400 BAD REQUEST`. Let's define a function that polls for either a job status or a [cluster status](https://engineering.rescale.com/api-docs/#list-cluster-status-history-for-a-job). The code will wait until the cluster reaches the `"Stopped"` state and then delete the Job.

Cluster statuses can have the following values: `("Stopped", "Stopping", "Stop Requested", "Started", "Starting", "Not Started")`

In [None]:
from enum import Enum


class JobStatus(Enum):
    PENDING = "Pending"
    QUEUED = "Queued"
    STARTED = "Started"
    VALIDATED = "Validated"
    EXECUTING = "Executing"
    COMPLETED = "Completed"
    STOPPING = "Stopping"
    WAITING_FOR_CLUSTER = "Waiting for Cluster"
    FORCE_STOP = "Force Stop"
    WAITING_FOR_QUEUE = "Waiting for Queue"


class ClusterStatus(Enum):
    NOT_STARTED = "Not Started"
    PENDING = "Pending"
    QUEUED = "Queued"
    STARTING = "Starting"
    STARTED = "Started"
    STOP_REQUESTED = "Stop Requested"
    STOPPING = "Stopping"
    STOPPED = "Stopped"
    UNKNOWN = "Unknown"
    ERROR = "Error"


def poll_for_status(api_url, headers, job_id, status, interval=30, verbose=False):
    import time
    from datetime import datetime

    while True:
        if isinstance(status, JobStatus):
            statuses = get_all_result_pages(
                f"{api_url}/api/v2/jobs/{job_id}/statuses/", headers=headers
            )
        elif isinstance(status, ClusterStatus):
            statuses = get_all_result_pages(
                f"{api_url}/api/v2/jobs/{job_id}/cluster_statuses/", headers=headers
            )
        else:
            raise TypeError("status must be a JobStatus or ClusterStatus member.")

        # Sort oldest first (used both for printing and for the lookup below)
        sorted_statuses = sorted(
            statuses,
            # Currently dates are returned with inconsistent formatting
            # (cluster status dates use a trailing "Z")
            key=lambda s: datetime.fromisoformat(s["statusDate"])
            if isinstance(status, JobStatus)
            else datetime.fromisoformat(s["statusDate"].replace("Z", "+00:00")),
        )
        if verbose:
            print(" > ".join([s["status"] for s in sorted_statuses]))

        status_items = [s for s in sorted_statuses if s["status"] == status.value]
        if len(status_items) > 0:
            return status_items[0]

        time.sleep(interval)

In [None]:
poll_for_status(api_url, headers, job_id, ClusterStatus.STOPPED, verbose=True)

delete_job_res = requests.delete(f"{api_url}/api/v2/jobs/{job_id}/", headers=headers)
delete_job_res.raise_for_status()

print(delete_job_res.status_code)

The next steps are to create a new job with files attached, submit it and wait until it completes. With the new Job definition, we also specify that we want to run the job using On-Demand Economy (spot market) instances (the `isLowPriority` property).

In [None]:
job_definition = {
    "name": "Rescale UserApi Reference Tutorial (with inputs)",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},
            "command": "python3 calculate_pi.py range.inp",
            "hardware": {"coreType": "granite", "coresPerSlot": 2, "walltime": 1},
            "inputFiles": [{"id": file1_id}, {"id": file2_id}],
        }
    ],
    "projectId": project_id,
    "isLowPriority": True,
}

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()
job_id = job_create_res.json()["id"]

job_submit_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
job_submit_res.raise_for_status()

poll_for_status(api_url, headers, job_id, JobStatus.COMPLETED, verbose=True)


Let's get a list of output files and download them all. Since downloading files is a frequent operation, let's encapsulate it in a function.

In [None]:
from pathlib import Path


def download_file(api_url, headers, file_id, out_dir, file_name):
    # stream=True fetches the body lazily instead of loading it into memory at once
    response = requests.get(
        f"{api_url}/api/v2/files/{file_id}/contents/", headers=headers, stream=True
    )
    response.raise_for_status()

    chunk_size = 4096
    with open(Path(out_dir, file_name), "wb") as fd:
        for chunk in response.iter_content(chunk_size):
            fd.write(chunk)

In [None]:
out_dir = "job_outputs"
Path(out_dir).mkdir(exist_ok=True)

files = get_all_result_pages(f"{api_url}/api/v2/jobs/{job_id}/files/", headers)
pprint(files)

for f in files:
    download_file(api_url, headers, f["id"], out_dir, f["name"])

Rescale Files metadata contains the `typeId` property, which can have one of the following values.

| id | type                 |
| -- | -------------------- |
| 1  | INPUT_FILE           |
| 2  | TEMPLATE_FILE        |
| 3  | PARAM_FILE           |
| 4  | SCRIPT               |
| 5  | OUTPUT_FILE          |
| 8  | CASE_FILE            |
| 10 | TEMPORARY_FILE       |
| 11 | CHECKPOINT_ARCHIVE   |
| 12 | SNAPSHOT_FILE        |

We can [retrieve metadata](https://engineering.rescale.com/api-docs/#get-metadata-of-a-file) for each file and compare its checksum with the downloaded file. We already fetched this information when listing job files; here we demonstrate using the `/files` resource.


In [None]:
import hashlib

for f in files:
    file_metadata_res = requests.get(
        f"{api_url}/api/v2/files/{f['id']}/", headers=headers
    )
    file_metadata_res.raise_for_status()
    # Checksum reported by the file metadata resource
    remote_hash = file_metadata_res.json()["fileChecksums"][0]["fileHash"]

    hash_sha512 = hashlib.sha512()
    chunk_size = 4096

    with open(Path(out_dir, f["name"]), "rb") as file:
        for chunk in iter(lambda: file.read(chunk_size), b""):
            hash_sha512.update(chunk)

    if hash_sha512.hexdigest() == remote_hash:
        print(f"OK\t{f['name']}")
    else:
        print(f"FAIL\t{f['name']}")
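The hashing loop can be factored into a helper so the same code verifies every downloaded file. A sketch; `sha512_of_file` is a hypothetical name, and like the loop above it assumes the remote `fileHash` is a hex-encoded SHA-512 digest. The example checks the chunked digest against a one-shot digest of the same bytes.

```python
import hashlib
import os
import tempfile


def sha512_of_file(path, chunk_size=4096):
    """Return the hex SHA-512 digest of a file, reading it in chunks."""
    digest = hashlib.sha512()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Compare the chunked digest with a one-shot digest of the same content
content = b"pi is roughly 3.14159\n" * 1000
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(content)
    tmp_path = tmp.name
try:
    matches = sha512_of_file(tmp_path) == hashlib.sha512(content).hexdigest()
    print(matches)  # True
finally:
    os.remove(tmp_path)
```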

Finally, let's clean up by deleting our job together with its input files (remember to wait for the cluster to stop).

In [None]:
poll_for_status(api_url, headers, job_id, ClusterStatus.STOPPED, verbose=True)

job_delete_res = requests.delete(
    f"{api_url}/api/v2/jobs/{job_id}/",
    headers=headers,
    params={"deleteInputFiles": True},
)
job_delete_res.raise_for_status()
print(job_delete_res.status_code)


## License settings and complex Job definitions

It is possible to attach several analyses (software tiles) to a single Job. Their commands are executed in the order in which they are specified in the `jobanalyses` array (left to right in the Rescale Portal). This is useful for coupled workflows, for example coupling mechanical and fluid calculations.

There will be situations where multiple software packages are attached to a Job, but a single script drives the calculation. In such cases, only one `command` property needs to be filled in, and it does not matter which one. To pass the command validation performed by the platform, specify an empty command for the other analyses as `"command": "# "`.

License server settings are passed as environment variables. In the example below, ANSYS license settings are passed as the `ANSYSLI_SERVERS` and `ANSYSLMD_LICENSE_FILE` environment variables. To find out which environment variables need to be set, it is best to set up the analysis manually in the Rescale Portal, save it and then fetch it via the API (as at the beginning of the [Jobs, Files and Clusters](#jobs-files-and-clusters) section).

In [None]:
file1_id = file_upload(api_url, headers, "job_inputs/calculate_pi.py")
file2_id = file_upload(api_url, headers, "job_inputs/range.inp")
file3_id = file_upload(api_url, headers, "job_inputs/run_plot.jou")
file4_id = file_upload(api_url, headers, "job_inputs/tjunction_plot.cas.gz")

job_definition = {
    "isLowPriority": False,
    "name": "Rescale UserApi Reference Tutorial (multi tile job with licensed software)",
    "description": "",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},
            "command": "python3 calculate_pi.py range.inp",
            "hardware": {"coresPerSlot": 4, "coreType": "emerald_max", "walltime": 1},
            "inputFiles": [
                {"id": file1_id},
                {"id": file2_id},
                {"id": file3_id},
                {"id": file4_id},
            ],
        },
        {
            "envVars": {
                "ANSYSLI_SERVERS": "2325@ansys.server.com",
                "ANSYSLMD_LICENSE_FILE": "1055@ansys.server.com",
            },
            "analysis": {"code": "ansys_fluent", "version": "2019r1"},
            "command": "fluent 3ddp -gu -ssh -cnf=$FLUENT_HOSTS -t$RESCALE_CORES_PER_SLOT -i run_plot.jou -driver x11",
            "hardware": {"coresPerSlot": 4, "coreType": "emerald_max", "walltime": 1},
        },
    ],
    "projectId": None,
}

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()
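When several tiles share the same hardware, a small helper keeps definitions like the one above shorter. `make_job_analysis` is a hypothetical helper that only builds dictionaries with the keys used in this section; the file IDs and license server hostnames are placeholders. The second tile illustrates the empty `"# "` command for a tile whose work is driven by another tile's script.

```python
def make_job_analysis(
    code,
    version,
    command,
    core_type,
    cores_per_slot,
    walltime,
    input_file_ids=(),
    env_vars=None,
):
    """Build one entry of the `jobanalyses` list (hypothetical helper)."""
    job_analysis = {
        "analysis": {"code": code, "version": version},
        "command": command,
        "hardware": {
            "coreType": core_type,
            "coresPerSlot": cores_per_slot,
            "walltime": walltime,
        },
    }
    if input_file_ids:
        job_analysis["inputFiles"] = [{"id": file_id} for file_id in input_file_ids]
    if env_vars:
        # License server settings are passed as environment variables
        job_analysis["envVars"] = dict(env_vars)
    return job_analysis


sketch_definition = {
    "name": "Multi tile job (sketch)",
    "jobanalyses": [
        make_job_analysis(
            "user_included", "0", "python3 calculate_pi.py range.inp",
            "emerald_max", 4, 1, input_file_ids=["fileA", "fileB"],
        ),
        # Empty "# " command: the first tile's command drives the calculation
        make_job_analysis(
            "ansys_fluent", "2019r1", "# ", "emerald_max", 4, 1,
            env_vars={
                "ANSYSLI_SERVERS": "2325@license.example.com",
                "ANSYSLMD_LICENSE_FILE": "1055@license.example.com",
            },
        ),
    ],
    "projectId": None,
}
```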

We can now go to the Rescale Portal and inspect our Job definition visually. It should look similar to the following recording. If you do have a Fluent license, you can proceed with submitting the Job.

![](README.images/multi_tile_job_with_license.gif)

## Quasi persistent clusters

There are simulation workflows where execution is driven by an external control process, for example an optimizer that submits multiple calculations with slightly changed parameters to minimize an objective function.

Instead of launching separate jobs, each starting its own cluster to execute a predefined command, we can start a single cluster and then execute commands on it dynamically from an external process. When consecutive calculations run serially, using a quasi-persistent cluster instead of submitting individual jobs saves the time spent waiting for cluster startup. It also opens up other possibilities, such as fine-grained checkpointing when running on economy priority coretypes.

[SSH](https://en.wikipedia.org/wiki/Secure_Shell) is used both for executing commands on a remote cluster and for transferring files (via [SCP](https://en.wikipedia.org/wiki/Secure_copy_protocol)). To enable SSH connections to Rescale clusters, the public key of a key pair needs to be registered in the User Profile. To generate a key pair, execute the following command.

In [None]:
# RSA is requested explicitly because the key is later loaded with paramiko.RSAKey
! ssh-keygen -t rsa -b 4096 -f rescale-key -P ""
! cat rescale-key.pub

Once executed, copy the contents of the public key (`rescale-key.pub`) and paste it into the Job Settings section of the User Profile, as demonstrated below. Make sure to keep your private key (`rescale-key`) secret. We will use it to authenticate with the cluster.

![](README.images/user_profile_public_key.gif)

To create a cluster that keeps running until it is stopped or its walltime is reached, we need to specify a command that never exits, for example `"command": "sleep inf"`. Once the cluster is created (the Job has the `EXECUTING` status), we can query the (undocumented) `/api/v2/jobs/{job_id}/instances/` API endpoint to get the cluster's public IP address, username and SSH port. We will use this information to establish an SSH connection. Let's create a quasi-persistent cluster.

In [None]:
project_id = None

job_definition = {
    "name": "Rescale UserApi Reference Tutorial (sleep inf)",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},
            "command": "sleep inf",
            "hardware": {"coreType": "granite", "coresPerSlot": 2, "walltime": 1},
        }
    ],
    "projectId": project_id,
    "isLowPriority": True,
}
job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()
job_id = job_create_res.json()["id"]

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
job_create_res.raise_for_status()

poll_for_status(api_url, headers, job_id, JobStatus.EXECUTING, verbose=True)

instances = get_all_result_pages(f"{api_url}/api/v2/jobs/{job_id}/instances/", headers)
pprint(instances[0])

Let's extract the `username`, `sshPort` and `publicIp` properties of the cluster instance model. The `role` attribute is useful when we create multi-node clusters, as it distinguishes the head node from the worker nodes: each cluster instance has either the `MPI_MASTER` role (head node) or the `MPI_SLAVE` role (worker node). Please excuse our language here.
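On a multi-node cluster, the head node is the natural target for SSH commands. A sketch that selects it by `role`; `head_node` is a hypothetical helper, and the sample instances (documentation-range IP addresses, placeholder username) are made up but shaped like the `/instances/` response items used in this section.

```python
def head_node(instances):
    """Return the head-node instance (role MPI_MASTER) of a cluster."""
    for instance in instances:
        if instance.get("role") == "MPI_MASTER":
            return instance
    raise ValueError("no instance with role MPI_MASTER found")


# Hypothetical sample shaped like the /instances/ response items
sample_instances = [
    {"role": "MPI_SLAVE", "publicIp": "203.0.113.11", "sshPort": 22, "username": "example_user"},
    {"role": "MPI_MASTER", "publicIp": "203.0.113.10", "sshPort": 22, "username": "example_user"},
]
head = head_node(sample_instances)
print(f"{head['username']}@{head['publicIp']}:{head['sshPort']}")  # example_user@203.0.113.10:22
```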

We will use the [Paramiko](https://www.paramiko.org/) Python module to establish a secure connection with the cluster. The [scp](https://github.com/jbardin/scp.py) Python module uses a Paramiko transport to send and receive files. Let's try to estimate π using the quasi-persistent cluster. Here, instead of uploading files as Job inputs, we copy them to the running cluster using SCP. Then we execute the command over SSH. See the code comments for explanations.

In [None]:
import paramiko
import scp


def key_based_connect(host, port, username, pkey_path):
    pkey = paramiko.RSAKey.from_private_key_file(pkey_path)
    client = paramiko.SSHClient()
    policy = paramiko.AutoAddPolicy()
    client.set_missing_host_key_policy(policy)
    client.connect(host, port, username, pkey=pkey)

    return client

In [None]:
# Path to private key
PKEY_PATH = "rescale-key"

public_ip = instances[0]["publicIp"]
ssh_port = instances[0]["sshPort"]
username = instances[0]["username"]

print(f"Connecting to {username}@{public_ip}:{ssh_port}")

ssh_client = key_based_connect(public_ip, ssh_port, username, PKEY_PATH)
scp_client = scp.SCPClient(ssh_client.get_transport())

scp_client.put("job_inputs/calculate_pi.py", "work/calculate_pi.py")
scp_client.put("job_inputs/range.inp", "work/range.inp")

# exec_command returns immediately, so no result file is available yet.
# A direct way of waiting until the command finishes is to keep reading its standard output.
_, stdout, _ = ssh_client.exec_command("cd work && python3 calculate_pi.py range.inp")
for line in iter(stdout.readline, ""):
    print(line, end="")

# Download and display the result file
scp_client.get("work/pi_estimate.res", "job_outputs/pi_estimate.res_ssh.log")
with open("job_outputs/pi_estimate.res_ssh.log") as f:
    print(f.read())

Our calculation is now completed, but the cluster is still running, spinning on our infinite sleep command (`sleep inf`). We need to shut it down explicitly. There are 3 ways to [stop a job](https://engineering.rescale.com/api-docs/#stop-a-job). In our case, we want the Job to shut down, as we're not interested in syncing files to cloud storage. Let's shut it down (we expect the response status code to be `202`).

In [None]:
# The stop endpoint takes no request body
job_stop_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/stop/", headers=headers
)
job_stop_res.raise_for_status()
print(job_stop_res)

When stopping Jobs that execute complex commands, or when we want to sync files in the `$HOME/work` directory of our quasi-persistent cluster, we need to stop the Job gracefully. We have two options:

* Graceful stop, where a `SIGTERM` signal is sent to the `command` process and files are synced to cloud storage (`/jobs/{job_id}/stop/`)
* Forced stop, where a `SIGTERM` signal is sent to the `command` process and then files are saved to cloud storage

## Optimizing file transfers

There are three main approaches to speeding up file transfers:

* file size reduction through compression
* transferring multiple files at the same time
* splitting a file into chunks and transferring these concurrently

The next sections focus on compression and chunked transfers. The concurrency techniques presented there can easily be applied to uploading multiple files at the same time.

### Archiving files before upload

The simplest approach to file transfer optimization is to use compression. The following script uses the standard Python `zipfile` module to create an archive of input files.

Note that the `command` in the Job definition starts by changing into the directory we compressed. Zip archives are automatically decompressed by the platform. Automatic decompression can be disabled by adding a `decompress` parameter to the file specification dictionary:

```
"inputFiles": [{"id": file_id, "decompress": False}]
```

In [None]:
import zipfile
import os

ZIP_FILE = "job_inputs.zip"


def create_zip(dir_path, zip_path):
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(dir_path):
            for file in files:
                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, dir_path)
                zipf.write(file_path, rel_path)


create_zip("job_inputs", ZIP_FILE)
file_id = file_upload(api_url, headers, ZIP_FILE)
os.remove(ZIP_FILE)


def get_process_output_lines(api_url, headers, job_id):
    output_files = get_all_result_pages(
        f"{api_url}/api/v2/jobs/{job_id}/files/",
        headers=headers,
        params={"search": "process_output.log"},
    )
    file_id = output_files[0]["id"]

    file_contents_response = requests.get(
        f"{api_url}/api/v2/files/{file_id}/lines", headers=headers
    )
    file_contents_response.raise_for_status()

    return file_contents_response.json()["lines"]


job_definition = {
    "name": "Rescale UserApi Reference Tutorial (with zipped inputs)",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},

            # Note that we need to cd into a directory
            "command": "cd job_inputs; python calculate_pi.py range.inp",

            "hardware": {"coreType": "granite", "coresPerSlot": 2, "walltime": 1},
            "inputFiles": [{"id": file_id}],
        }
    ],
    "project_id": project_id,
    "isLowPriority": True,
}

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()

job_id = job_create_res.json()["id"]
job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
job_create_res.raise_for_status()

poll_for_job_status(api_url, headers, job_id, "Completed", verbose=True)

for l in get_process_output_lines(api_url, headers, job_id):
    print(l, end="")
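The `create_zip` function above stores member paths relative to `dir_path` (`os.path.relpath(file_path, dir_path)`), while the job's `command` starts with `cd job_inputs`. Because the job only works if the stored paths line up with the `command`, it can help to inspect an archive before uploading it. The sketch below is self-contained and runs on a throwaway directory; `zip_dir` and `archive_members` are hypothetical helpers, and the sketch does not check how the platform lays out extracted files.

```python
import os
import tempfile
import zipfile


def zip_dir(dir_path, zip_path, keep_top_dir=False):
    """Zip dir_path; with keep_top_dir=True, member paths start with the directory name."""
    dir_path = os.path.abspath(dir_path)
    # relpath against the directory itself drops its name; against its parent keeps it
    base = os.path.dirname(dir_path) if keep_top_dir else dir_path
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(dir_path):
            for name in files:
                file_path = os.path.join(root, name)
                zipf.write(file_path, os.path.relpath(file_path, base))


def archive_members(zip_path):
    """Return the member paths stored in a zip archive, sorted."""
    with zipfile.ZipFile(zip_path) as zipf:
        return sorted(zipf.namelist())


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "job_inputs")
    os.makedirs(src)
    with open(os.path.join(src, "range.inp"), "w") as f:
        f.write("placeholder\n")

    zip_dir(src, os.path.join(tmp, "flat.zip"))
    zip_dir(src, os.path.join(tmp, "prefixed.zip"), keep_top_dir=True)
    flat = archive_members(os.path.join(tmp, "flat.zip"))
    prefixed = archive_members(os.path.join(tmp, "prefixed.zip"))

print(flat)      # ['range.inp']
print(prefixed)  # ['job_inputs/range.inp']
```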

### Concurrent file download

The Rescale API file download endpoint supports [HTTP Range requests](https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests). This allows client code to split a download into several chunks and download them concurrently.
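A Range request names an inclusive byte span, e.g. `bytes=0-499` for the first 500 bytes, and a server that honors it responds with `206 Partial Content`. Here is a minimal sketch of splitting a file of known size into such spans; `byte_ranges` and `range_header` are hypothetical helpers, not part of the Rescale API.

```python
def byte_ranges(size, num_chunks):
    """Split `size` bytes into contiguous, inclusive (start, end) ranges."""
    if size <= 0:
        raise ValueError("size must be positive")
    # Never create more chunks than there are bytes
    num_chunks = max(1, min(num_chunks, size))
    chunk_size = size // num_chunks

    ranges = []
    for idx in range(num_chunks):
        start = idx * chunk_size
        # End positions are inclusive, so the last byte is size - 1; the last
        # chunk also absorbs the remainder of the integer division.
        end = size - 1 if idx == num_chunks - 1 else start + chunk_size - 1
        ranges.append((start, end))
    return ranges


def range_header(start, end):
    """Build the HTTP header requesting bytes start..end (inclusive)."""
    return {"Range": f"bytes={start}-{end}"}


ranges = byte_ranges(10, 3)
print(ranges)                     # [(0, 2), (3, 5), (6, 9)]
print(range_header(*ranges[-1]))  # {'Range': 'bytes=6-9'}
```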

Let's start by submitting a job that will generate a 1 GB result file.

In [None]:
TEST_FILE_NAME = "testfile"

job_definition = {
    "name": "Rescale UserApi Reference Tutorial (1GB result file)",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},
            "command": f"openssl rand -out {TEST_FILE_NAME} -base64 792917038",
            "hardware": {"coreType": "granite", "coresPerSlot": 2, "walltime": 1},
        }
    ],
    "project_id": project_id,
}

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()
job_id = job_create_res.json()["id"]

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
job_create_res.raise_for_status()

poll_for_job_status(api_url, headers, job_id, "Completed", verbose=True)


The command of the above Job used `openssl` to create a random file, which is now stored in cloud storage. The code below defines several functions that work together to download a file in chunks, each in its own thread. Consult the code comments for explanations.

In [None]:
import hashlib
import os
import threading
from collections import namedtuple
from pathlib import Path

Chunk = namedtuple("Chunk", ["idx", "start_byte", "end_byte"])
FILE_IO_CHUNK_SIZE = 4096


def calculate_file_hash(file_path):
    hash_sha512 = hashlib.sha512()

    with open(file_path, "rb") as file:
        for chunk in iter(lambda: file.read(FILE_IO_CHUNK_SIZE), b""):
            hash_sha512.update(chunk)

    return hash_sha512.hexdigest()


def concatenate_files(output_file, *input_files, delete_inputs=True):
    with open(output_file, "wb") as output:
        for file_name in input_files:
            with open(file_name, "rb") as input_file:
                chunk = input_file.read(FILE_IO_CHUNK_SIZE)
                while chunk:
                    output.write(chunk)
                    chunk = input_file.read(FILE_IO_CHUNK_SIZE)
            if delete_inputs:
                os.remove(file_name)


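def _chunk_download(api_url, headers, file_id, dir_path, file_chunk: Chunk):
    # Copy the headers: all threads share the caller's dict, so setting
    # headers["Range"] in place would let threads overwrite each other's range
    # (and leave a stale Range header behind for later requests)
    chunk_headers = {
        **headers,
        "Range": f"bytes={file_chunk.start_byte}-{file_chunk.end_byte}",
    }
    response = requests.get(
        f"{api_url}/api/v2/files/{file_id}/contents",
        headers=chunk_headers,
        stream=True,  # don't buffer the whole chunk in memory
    )
    response.raise_for_status()

    # Stream the chunk into its own part file
    with open(Path(dir_path, f"{file_id}.{file_chunk.idx}"), "wb") as fd:
        for chunk in response.iter_content(FILE_IO_CHUNK_SIZE):
            fd.write(chunk)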


def concurrent_download(
    api_url,
    headers,
    file_id,
    dir_path=".",
    file_name=None,
    num_threads=10,
    threshold_mb=50,
):
    # get file size to determine whether splitting makes sense
    response = requests.get(f"{api_url}/api/v2/files/{file_id}/", headers=headers)
    response.raise_for_status()

    decrypted_size = response.json()["decryptedSize"]

    # Check if size is above threshold, if not download in one piece
    num_threads = num_threads if decrypted_size > threshold_mb * 1048576 else 1

    file_hash = response.json()["fileChecksums"][0]["fileHash"]
    file_name = file_name if file_name is not None else response.json()["name"]

    chunk_size = decrypted_size // num_threads
    threads = []
    for idx in range(0, num_threads):
        start_byte = idx * chunk_size

        # Range end positions are inclusive: the last chunk ends at the file's
        # last byte (decrypted_size - 1) so that it fetches all remaining bytes
        end_byte = (
            (idx + 1) * chunk_size - 1 if idx != num_threads - 1 else decrypted_size - 1
        )

        t = threading.Thread(
            target=_chunk_download,
            args=(
                api_url,
                headers,
                file_id,
                dir_path,
                Chunk(idx, start_byte, end_byte),
            ),
        )
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    # Part files were written to dir_path, so reassemble them from there
    output_path = Path(dir_path, file_name)
    concatenate_files(
        output_path,
        *[Path(dir_path, f"{file_id}.{idx}") for idx in range(0, num_threads)],
    )

    # Compare file hashes to make sure the downloaded file is not corrupted.
    filehash = calculate_file_hash(output_path)
    if filehash != file_hash:
        raise Exception("File hashes not equal. Corrupted file download.")


# Get test file ID
output_files_res = get_all_result_pages(
    f"{api_url}/api/v2/jobs/{job_id}/files/", headers, params={"search": TEST_FILE_NAME}
)
test_file_id = output_files_res[0]["id"]

# Compare download speed
import time

t1 = time.time()
concurrent_download(api_url, headers, test_file_id, num_threads=10)
print(f"Time with 10 threads: {time.time() - t1} s")

t1 = time.time()
concurrent_download(api_url, headers, test_file_id, num_threads=1)
print(f"Time with 1 thread: {time.time() - t1} s")


Depending on your connection speed and hardware, you should see a reduction in download time, for example:

```
Time with 10 threads: 35.551738023757935 s
Time with 1 thread: 124.59398603439331 s
```
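The download code above writes each chunk to its own part file and then concatenates them. An alternative design, sketched here, preallocates the output file and has each thread write its range directly at the right offset, which removes the concatenation pass. It also surfaces worker failures: an exception raised in a `threading.Thread` target is printed but not re-raised by `join()`. `download_into_file` and its `fetch_range(start, end)` callable are hypothetical; the demonstration slices an in-memory buffer instead of calling the API.

```python
import os
import tempfile
import threading


def download_into_file(fetch_range, size, out_path, num_threads=4):
    """Fetch inclusive byte ranges concurrently and write each at its offset.

    fetch_range(start, end) is a hypothetical callable returning the bytes of
    the inclusive range [start, end], e.g. a wrapper around a ranged GET.
    """
    # Preallocate the output so every thread can write at its own offset
    with open(out_path, "wb") as f:
        f.truncate(size)
    if size == 0:
        return

    num_threads = max(1, min(num_threads, size))
    chunk_size = size // num_threads
    errors = []

    def worker(start, end):
        try:
            data = fetch_range(start, end)
            if len(data) != end - start + 1:
                raise IOError(f"range {start}-{end}: got {len(data)} bytes")
            # Each thread uses its own file handle, so seek positions don't clash
            with open(out_path, "r+b") as f:
                f.seek(start)
                f.write(data)
        except Exception as exc:
            # Record the failure so the calling thread can raise it after join()
            errors.append(exc)

    threads = []
    for idx in range(num_threads):
        start = idx * chunk_size
        end = size - 1 if idx == num_threads - 1 else start + chunk_size - 1
        t = threading.Thread(target=worker, args=(start, end))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

    if errors:
        raise errors[0]


# Local demonstration: slicing a buffer stands in for ranged GET requests
payload = os.urandom(1000)
with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "out.bin")
    download_into_file(lambda s, e: payload[s : e + 1], len(payload), out, num_threads=7)
    with open(out, "rb") as f:
        restored = f.read()

print(restored == payload)  # True
```

With the notebook's variables, `fetch_range` could wrap `requests.get(f"{api_url}/api/v2/files/{file_id}/contents", headers={**headers, "Range": f"bytes={s}-{e}"}).content`. Note the trade-off: this variant holds each whole chunk in memory, whereas the part-file approach streams it to disk.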



### Concurrent upload with archive volumes

The file upload endpoint does not support range requests, a feature we used for concurrent downloads. It is, however, still possible to use the file splitting approach to speed up file uploads. The solution is to split a file on a local computer, upload parts, and reassemble the file within a Job.
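The split-and-reassemble idea itself does not depend on 7-Zip. Here is a minimal sketch without compression, using only the standard library. Parts get zero-padded numeric suffixes, like 7-Zip's `.001`, `.002` volumes, so that sorted file names (the order in which a shell glob such as `cat testfile.7z.*` expands) match the part order. `split_file`, `join_parts`, and `sha512_of` are hypothetical helpers.

```python
import glob
import hashlib
import os
import shutil
import tempfile


def split_file(path, part_size):
    """Split `path` into zero-padded parts path.001, path.002, ..."""
    parts = []
    with open(path, "rb") as src:
        idx = 1
        # Reads one part at a time into memory; 3 digits allow up to 999 parts
        while True:
            data = src.read(part_size)
            if not data:
                break
            part = f"{path}.{idx:03d}"
            with open(part, "wb") as dst:
                dst.write(data)
            parts.append(part)
            idx += 1
    return parts


def join_parts(prefix, out_path):
    """Concatenate prefix.* in sorted name order, like `cat prefix.* > out`."""
    with open(out_path, "wb") as dst:
        for part in sorted(glob.glob(glob.escape(prefix) + ".*")):
            with open(part, "rb") as src:
                shutil.copyfileobj(src, dst)


def sha512_of(path):
    h = hashlib.sha512()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(4096), b""):
            h.update(block)
    return h.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    original = os.path.join(tmp, "data.bin")
    with open(original, "wb") as f:
        f.write(os.urandom(2500))
    parts = split_file(original, 1000)
    joined = os.path.join(tmp, "joined.bin")
    join_parts(original, joined)
    same = sha512_of(original) == sha512_of(joined)

print(len(parts), same)  # 3 True
```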

In our approach, we will use a compression utility that supports splitting an archive into volumes. This will give us an added benefit of reducing the number of bytes to transfer.

The default `7zip` tool available on Rescale clusters does not support the latest high-throughput compression algorithms. Therefore, we will use the `7za` executables included in this repository, which support them (source distributions: [Windows](https://mcmilk.de/projects/7-Zip-zstd), [Linux/macOS](https://github.com/p7zip-project/p7zip)). Make sure to set `LOCAL_7ZA` to the path of a binary matching your operating system and architecture.

For testing, we will reuse the `testfile` downloaded in the previous section. To parallelize uploading of file chunks, we will use [concurrent futures](https://docs.python.org/3/library/concurrent.futures.html), a high-level abstraction for concurrent execution. We could reuse the `threading` approach from the previous section; however, for uploads we need the result of each concurrently executed function (the file ID), and futures make collecting those results much simpler.

In [None]:
import concurrent.futures
import glob
import os
import subprocess
import time

LOCAL_7ZA = "7za_bin/7za_v17.05_mac_arm64"
REMOTE_7ZA = "7za_bin/7za_v17.05_lnx_x64"

TEST_FILE = "testfile"


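def create_split_archive(
    _7za_exe, archive_name, input_path, volume_size_mb=100, delete_existing=True
):
    # Remove volumes left over from a previous run, so that the glob below
    # only returns the volumes created now
    if delete_existing:
        for f in glob.glob(f"{archive_name}.*"):
            os.remove(f)

    # a: add to archive, -mx=4: compression level, -m0=brotli: Brotli method,
    # -mmt16: 16 threads, -aoa: overwrite without prompt, -v: volume size
    subprocess.run(
        [
            _7za_exe, "a", archive_name,
            "-mx=4", "-m0=brotli", "-mmt16", "-aoa",
            f"-v{volume_size_mb}m", input_path,
        ],
        check=True,
    )
    return sorted(glob.glob(f"{archive_name}.*"))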


def parallel_upload(api_url, headers, input_files, num_threads=10):
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = {
            executor.submit(file_upload, api_url, headers, f): f for f in input_files
        }

        file_ids = []
        for future in concurrent.futures.as_completed(futures):
            file_id = future.result()
            file_ids.append(file_id)

    return file_ids


t1 = time.time()
split_files = create_split_archive(LOCAL_7ZA, "testfile.7z", TEST_FILE)
file_ids = parallel_upload(api_url, headers, split_files)
print(f"Parallel upload of split archive: {time.time()-t1} s")

t1 = time.time()
file_id = file_upload(api_url, headers, TEST_FILE)
print(f"Upload of decompressed file: {time.time()-t1} s")


Depending on your connection speed and hardware, you should see a reduction in upload time, for example:

```
Parallel upload of split archive: 66.50943064689636 s
Upload of decompressed file: 177.7647111415863 s
```
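In `parallel_upload`, a failing upload raises from `future.result()`. The executor then waits for the remaining uploads, but the IDs collected so far are lost and nothing records which part failed. A variant that keeps the future-to-file-name mapping, sketched below, returns successful results by file name and collects failures separately, so that only the failed parts need re-uploading. `parallel_map_by_name` is a hypothetical helper.

```python
import concurrent.futures


def parallel_map_by_name(func, names, num_threads=10):
    """Run func(name) concurrently; return ({name: result}, {name: exception})."""
    results, failures = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Map each future back to the input that produced it
        futures = {executor.submit(func, name): name for name in names}
        for future in concurrent.futures.as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                failures[name] = exc
    return results, failures
```

With the notebook's helpers this could be called as `ids_by_name, failed = parallel_map_by_name(lambda f: file_upload(api_url, headers, f), split_files)`, then retrying only `failed.keys()`.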

We uploaded our input archive as volumes, so a job that uses this archive first needs to reassemble and decompress it. Since Rescale clusters do not have a `7za` tool supporting the [Brotli algorithm](https://github.com/google/brotli) installed by default, we need to provide one as an input. Note that the executable only needs to be uploaded once; its file ID can be reused for subsequent job submissions (we can also search for an existing file with that name and reuse its ID if found).
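When searching for a previously uploaded file, keep in mind that the `search` parameter appears to match names containing the search string (the cleanup step at the end of this section relies on that), so results should be filtered by exact name. A sketch, assuming file records carry the `id`, `name`, and `dateUploaded` fields used elsewhere in this tutorial; `find_exact_file_id` is a hypothetical helper.

```python
def find_exact_file_id(files, name):
    """Return the ID of the most recently uploaded file named exactly `name`, or None.

    `files` is a list of file records, e.g. the result of
    get_all_result_pages(f"{api_url}/api/v2/files/", headers=headers,
                         params={"search": name})
    """
    matches = [f for f in files if f.get("name") == name]
    if not matches:
        return None
    # Assumes all timestamps share one ISO 8601 format and timezone,
    # so comparing them as strings orders them chronologically
    latest = max(matches, key=lambda f: f.get("dateUploaded", ""))
    return latest["id"]
```

For example, `find_exact_file_id(results, os.path.basename(REMOTE_7ZA))` returns an ID to reuse, or `None`, in which case the binary is uploaded with `file_upload` as below.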

Let's create a job that reassembles the archive on a cluster, extracts it, and calculates the hash of the extracted test file.

In [None]:
import re

file_id = file_upload(api_url, headers, REMOTE_7ZA)
file_ids.append(file_id)

job_definition = {
    "name": "Rescale UserApi Reference Tutorial (split archive)",
    "jobanalyses": [
        {
            "analysis": {"code": "user_included", "version": "0"},
            "command": ";".join(
                [
                    "cat testfile.7z.* > testfile.7z",
                    "rm testfile.7z.*",
                    f"./{os.path.basename(REMOTE_7ZA)} e -mmt16 -aoa testfile.7z",
                    "echo checksum `shasum -a 512 testfile | awk '{print $1}'`",
                    "rm testfile*",
                ]
            ),
            "hardware": {"coreType": "granite", "coresPerSlot": 2, "walltime": 1},
            "inputFiles": [{"id": id, "decompress": False} for id in file_ids],
        }
    ],
    "project_id": project_id,
    "isLowPriority": True,
}

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/", headers=headers, json=job_definition
)
job_create_res.raise_for_status()
job_id = job_create_res.json()["id"]

job_create_res = requests.post(
    f"{api_url}/api/v2/jobs/{job_id}/submit/", headers=headers
)
job_create_res.raise_for_status()

poll_for_job_status(api_url, headers, job_id, "Completed", verbose=True)

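# Get the process output file and extract the hash calculated by the job.
# Search all lines rather than relying on the checksum's position in the log;
# a SHA-512 hex digest is 128 characters long.
output_lines = get_process_output_lines(api_url, headers, job_id)
pattern = re.compile(r"checksum ([0-9a-f]{128})")
matches = [m for m in map(pattern.search, output_lines) if m]
remote_hash = matches[-1].group(1) if matches else None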

# Compare to hash calculated for a local file
local_hash = calculate_file_hash(TEST_FILE)
if remote_hash == local_hash:
    print("Hashes match")
else:
    print("Hashes do not match")


Finally, let's free up some cloud storage space by [deleting files](https://engineering.rescale.com/api-docs/#delete-a-file) whose names contain `testfile` and that were uploaded within the last 24 hours.

In [None]:
from datetime import datetime, timezone

test_files = get_all_result_pages(
    f"{api_url}/api/v2/files/",
    headers=headers,
    params={"search": "testfile"},
)

files_to_delete = [
    f
    for f in test_files
    if (
        datetime.now(timezone.utc)
        - datetime.fromisoformat(f["dateUploaded"].replace("Z", "+00:00"))
    ).days
    == 0
]

for f in files_to_delete:
    file_delete_res = requests.delete(
        f"{api_url}/api/v2/files/{f['id']}/", headers=headers
    )
    file_delete_res.raise_for_status()


Note that the above code deletes all matching input and output files. If deleted files were attached as Job inputs, the links remain, but if a user tries to clone such a Job, the submission will fail until the deleted files are removed from its inputs.

![](README.images/webportal_deleted_input_files.png)
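Because deleted files stay linked to existing Jobs, it can be worth previewing what the cleanup cell would remove before running it. That cell keeps files for which `.days == 0`, that is, files uploaded less than 24 hours ago rather than since midnight. Below is a dry-run sketch with the same 24-hour criterion, assuming `dateUploaded` is an ISO 8601 timestamp with a timezone (as the cell's `"Z"` handling implies). `select_recent` and `preview_deletion` are hypothetical helpers, and `now` is a parameter only so the selection can be tested.

```python
from datetime import datetime, timedelta, timezone


def parse_upload_time(value):
    # datetime.fromisoformat() before Python 3.11 does not accept a trailing "Z"
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def select_recent(files, max_age=timedelta(days=1), now=None):
    """Return files uploaded less than max_age before `now` (default: current UTC time)."""
    now = now or datetime.now(timezone.utc)
    return [f for f in files if now - parse_upload_time(f["dateUploaded"]) < max_age]


def preview_deletion(files):
    """Print what would be deleted, without calling the API."""
    for f in files:
        print(f"{f['dateUploaded']}  {f['id']}  {f['name']}")
    print(f"{len(files)} file(s) selected")
```

Running `preview_deletion(select_recent(test_files))` before the deletion loop shows exactly which files would be removed.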

### Concurrent transfer reliability

Note that the techniques presented above do not implement any reliability measures. This is intentional, as retry or partial-retry logic would make the code less readable. In production code, consider adding logic to recover at least from the failures that are currently raised by `res.raise_for_status()`. If you feel that an official, reliable Python module for Rescale API file transfers, published on the [PyPI index](https://pypi.org/), would be a good idea, get in touch with your Rescale contact.
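As a starting point for such recovery logic, a generic retry wrapper with exponential backoff and random jitter could look like the sketch below. `with_retries` is a hypothetical helper, not part of the Rescale API or `requests`. Which exceptions are worth retrying (for example timeouts and 5xx responses, but not a 401 caused by an invalid API key) is a judgment call left to the caller.

```python
import random
import time


def with_retries(func, attempts=5, base_delay=1.0, max_delay=30.0, retry_on=(Exception,)):
    """Call func() until it succeeds or `attempts` calls have failed.

    Between tries, waits base_delay * 2**n seconds (capped at max_delay)
    plus up to 50% random jitter; the last failure is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            time.sleep(delay + random.uniform(0, delay / 2))
```

For example, `with_retries(lambda: file_upload(api_url, headers, part), retry_on=(requests.exceptions.RequestException,))` retries an upload on network errors and on the `HTTPError` raised by `raise_for_status()`, which is a subclass of `RequestException`.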


## Storage devices

> TODO

## Snapshots, Runs, Tasks and DOE Jobs

> TODO