In [1]:
# import the MAAP package to handle queries
from maap.maap import MAAP


# Choose bounding box and other parameters

Each bounding box is a tuple of four coordinates in decimal degrees, in the order:
(left, bottom, right, top), i.e. (min longitude, min latitude, max longitude, max latitude).

In [42]:
# Candidate bounding boxes, keyed by a short location name.
# Each value is (left, bottom, right, top) = (min lon, min lat, max lon, max lat) in degrees.
bboxes = {
    # Mt Wilson (near JPL) (small, ~5 sec)
    "MtWilson": (-118.06817, 34.22169, -118.05801, 34.22822),
    # uses ~5-6 GB RAM, takes a few minutes to run
    "WesternUS-CA-NV": (-124.81360, 32.44506, -113.75989, 42.24498),
    "Italy": (6.26868, 36.00380, 18.57179, 47.28139),  # requires more than 8 GB RAM
    # very large bbox options -- might run out of memory
    "WesternUS-CA-CO": (-125.41615, 31.03621, -101.80916, 49.17324),
    "WesternUS-CA-WA-Chicago-NewOrleans": (-125.53514, 25.42150, -87.06960, 49.22105),
}

# Run benchmarking for one location at a time. Select the name here.
bbox_name = "MtWilson"

# Number of DPS jobs to launch per benchmarking run
num_DPS_jobs = 2

# True to perform compute-intensive computations
do_compute = True

# Fail fast on configuration mistakes, before any DPS jobs are submitted.
assert bbox_name in bboxes, f"Unknown bbox_name {bbox_name!r}; choose one of {sorted(bboxes)}"
assert all(left < right and bottom < top for left, bottom, right, top in bboxes.values()), \
    "every bbox must be ordered (left, bottom, right, top)"
# Results are averaged over the jobs, so at least one is required.
assert num_DPS_jobs > 0, "num_DPS_jobs must be positive"

In [None]:
# Helper function
def bbox_arg_for(name: str, boxes=None) -> str:
    """Return bounding box ``name`` as a space-separated CLI argument string.

    Args:
        name: key of the bounding box to look up (e.g. ``"MtWilson"``).
        boxes: optional mapping of name -> (left, bottom, right, top).
            Defaults to the notebook's ``bboxes`` config dict.

    Returns:
        ``"left bottom right top"`` -- the same argument order used by ``run.sh``.

    Raises:
        KeyError: if ``name`` is not a key of ``boxes``.
    """
    if boxes is None:
        boxes = bboxes  # config dict defined in the parameters cell
    left, bottom, right, top = boxes[name]
    return f"{left} {bottom} {right} {top}"


## Launch jobs in DPS

If you're still in development, skip this section
and jump to the "Test Run in the ADE" section below.

It's faster to test the algorithm in the ADE first, and
then run it on DPS.

In [5]:
# Register the algorithm described by the YAML spec with MAAP and
# display the raw body of the API response.
algorithm_spec_path = "/projects/sam_get_dem_test.yml"
maap = MAAP()
registration_response = maap.register_algorithm_from_yaml_file(algorithm_spec_path)
registration_response.text

'{"code": 200, "message": {"id": "5272dc689849cf23a7abe760f4e83f209a6b29cc", "short_id": "5272dc68", "created_at": "2024-02-14T21:37:02.000+00:00", "parent_ids": ["05a4ee051b5fc397ad492747ddad377f0718057c"], "title": "Registering algorithm: sam_get_dem_test", "message": "Registering algorithm: sam_get_dem_test", "author_name": "root", "author_email": "root@156a1941fa17", "authored_date": "2024-02-14T21:37:02.000+00:00", "committer_name": "root", "committer_email": "root@156a1941fa17", "committed_date": "2024-02-14T21:37:02.000+00:00", "trailers": {}, "web_url": "https://repo.maap-project.org/root/register-job-hysds-v4/-/commit/5272dc689849cf23a7abe760f4e83f209a6b29cc", "stats": {"additions": 7, "deletions": 7, "total": 14}, "status": "pending", "project_id": 3, "last_pipeline": {"id": 12112, "iid": 792, "project_id": 3, "sha": "5272dc689849cf23a7abe760f4e83f209a6b29cc", "ref": "main", "status": "pending", "source": "push", "created_at": "2024-02-14T21:37:03.882Z", "updated_at": "2024-0

In [3]:
# Submit DPS Job
maap = MAAP(maap_host='api.maap-project.org')
username = maap.profile.account_info()["username"]

from collections import namedtuple
Queue = namedtuple('Queue', ['name', 'ncpu', 'memory_GB'])
# NOTE(review): the queue name says 32 vCPU while ncpu is 8 -- confirm which core
# count QueueBenchmark.node_ncpu is meant to record.
queues = [Queue("maap-dps-worker-32vcpu-64gb", 8, 64)]

# Structure: { <name of queue> : [<maap.py DPS job object>, ...] }
# The element type is left as a plain `list`: `maap` was rebound to the MAAP client
# above, so `maap.dps.dps_job.DPSJob` in this module-level annotation would be
# looked up on the client object rather than on the `maap` package.
jobs: dict[str, list] = {}

# Structure: { <name of queue> : <job tag used for that queue's jobs> }
job_identifiers: dict[str, str] = {}

for q in queues:
    # One tag per queue, so jobs on different queues can be told apart.
    # `job_identifier` keeps the last queue's tag for the metrics cell.
    job_identifier = f"{bbox_name}__{'' if do_compute else 'no-'}compute__{q.name}"
    job_identifiers[q.name] = job_identifier
    jobs[q.name] = []
    for _ in range(num_DPS_jobs):
        jobs[q.name].append(
            maap.submitJob(
                identifier=job_identifier,  # tag shared by this queue's benchmark jobs
                algo_id="GET-DEM-v0.2.0",  # Name of the registered algorithm
                version="develop",
                username=username,
                queue=q.name,
                # for benchmarking consistency, use same bbox for all jobs
                bbox=bbox_arg_for(bbox_name),
                # TODO: Is there a cleaner design for specifying CLI flags for DPS jobs?
                compute="--compute" if do_compute else ""
            )
        )


In [7]:
# Monitor the job status via JN.
# For a GUI, go to "View and Submit Jobs" via the Launcher
from collections import Counter

# Tally of job states; "except" counts jobs whose attributes could not be retrieved.
status_counts = Counter()

# `jobs` maps queue name -> list of DPS jobs, so walk the jobs inside each list.
for queue_name, queue_jobs in jobs.items():
    for j in queue_jobs:
        try:
            j.retrieve_attributes()
        except Exception as exc:
            # Keep polling the remaining jobs, but surface what went wrong.
            status_counts["except"] += 1
            print(f"[{queue_name}] could not retrieve job attributes: {exc!r}")
            continue
        status = (j.status or "").lower()
        if status in ("failed", "succeeded", "running", "accepted"):
            status_counts[status] += 1
        else:
            print(j.status)

n_failed = status_counts["failed"]
n_succeeded = status_counts["succeeded"]
n_running = status_counts["running"]
n_except = status_counts["except"]
n_accepted = status_counts["accepted"]

print("failed: ", n_failed)
print("accepted: ", n_accepted)
print("running: ", n_running)
print("succeeded: ", n_succeeded)
print("except: ", n_except)


failed:  0
accepted:  0
running:  0
succeeded:  2
except:  0


In [8]:
# Inspect one job object. `jobs` maps queue name -> list of jobs, so take the first
# job of the first non-empty queue (indexing the dict with 0 would raise KeyError).
first_job = next((job for queue_jobs in jobs.values() for job in queue_jobs), None)
if first_job is None:
    print("No DPS jobs have been submitted yet.")
else:
    print(type(first_job))
    print(first_job)

<class 'maap.dps.dps_job.DPSJob'>
{'job_id': '35045e0e-d5a5-4f7a-bd05-205ff2331b69', 'status': 'Succeeded', 'machine_type': None, 'architecture': None, 'machine_memory_size': None, 'directory_size': None, 'operating_system': None, 'job_start_time': None, 'job_end_time': None, 'job_duration_seconds': None, 'cpu_usage': None, 'cache_usage': None, 'mem_usage': None, 'max_mem_usage': None, 'swap_usage': None, 'read_io_stats': None, 'write_io_stats': None, 'sync_io_stats': None, 'async_io_stats': None, 'total_io_stats': None, 'error_details': None, 'response_code': 200, 'outputs': ['http://maap-ops-workspace.s3-website-us-west-2.amazonaws.com/niemoell/dps_output/GET-DEM-v0.2.0/develop/test7/2024/04/03/08/04/32/785838', 's3://s3.us-west-2.amazonaws.com:80/maap-ops-workspace/niemoell/dps_output/GET-DEM-v0.2.0/develop/test7/2024/04/03/08/04/32/785838', 'https://s3.console.aws.amazon.com/s3/buckets/maap-ops-workspace/niemoell/dps_output/GET-DEM-v0.2.0/develop/test7/2024/04/03/08/04/32/785838/?r

In [12]:
# get the json file names
# Structure: { <name of queue> : [<path to first json file>, <path to second json file>, ...] }
json_files: dict[str, list[str]] = {q: [] for q in jobs.keys()}

# Parse the json file paths from the maap.dps.dps_job.DPSJob objects.
# Each job's public S3-website output URL is rewritten to the same relative path under
# /my-private-bucket/ (presumably the ADE mount of this user's bucket prefix -- confirm).
prefix = f"http://maap-ops-workspace.s3-website-us-west-2.amazonaws.com/{username}/"
for q, queue_jobs in jobs.items():
    for job in queue_jobs:
        # `outputs` may be empty/None for jobs that did not finish.
        for output in job.outputs or []:
            if output.startswith(prefix):
                path = "/my-private-bucket/" + output[len(prefix):].rstrip("/")
                path += "/profile.json"  # Scalene outputs a file named "profile.json"
                json_files[q].append(path)

print(json_files)

['/my-private-bucket/dps_output/GET-DEM-v0.2.0/develop/test7/2024/04/03/08/04/32/785838profile.json', '/my-private-bucket/dps_output/GET-DEM-v0.2.0/develop/test7/2024/04/03/08/04/30/988300profile.json']


In [50]:
### Delete section after development -- these variables should be constructed in previous cells
from collections import namedtuple
Queue = namedtuple('Queue', ['name', 'ncpu', 'memory_GB'])
queues = [Queue("maap-dps-worker-32vcpu-64gb", 8, 64)]

# Stand-in profile, repeated once per configured job so the
# `n_jobs_in_queue == num_DPS_jobs` check in the metrics cell still holds
# if num_DPS_jobs is changed.
example_profile = "/projects/get-dem-nemo/json_example/profile.json"
json_files = {q.name: [example_profile] * num_DPS_jobs for q in queues}
print(json_files)

job_identifier = "TMP TMP"

####


{'maap-dps-worker-32vcpu-64gb': ['/projects/get-dem-nemo/json_example/profile.json', '/projects/get-dem-nemo/json_example/profile.json']}


In [52]:
# extract metrics
import json
from dataclasses import dataclass
from typing import Optional

@dataclass
class QueueBenchmark:
    """Benchmark summary for one DPS queue (instance type) and one bounding box.

    The metrics loop fills the runtime fields from Scalene's "elapsed_time_sec"
    (seconds), averaged over the jobs run on the queue.
    """
    instance_type: str  # this is the queue name on NASA MAAP
    node_ncpu: int  # number of physical CPU cores per machine (not hyperthreads)
    node_memory_GB: int  # Example: `8` for 8GB nodes
    bbox_name: str
    max_memory_usage_MB: float  # max over jobs of Scalene's "max_footprint_mb"
    mean_alg_runtime: float
    mean_runtime_fetch_and_stitch: float
    mean_runtime_compute: float
    job_identifier: Optional[str] = None  # use for looking up a job type

# Place to store the Benchmark results
results = []

# Script inside the DPS container whose per-function Scalene entries are read.
profiled_script = "/app/get-dem/get_dem.py"

for q in queues:
    max_mem_usage = 0
    cumulative_time_alg = 0
    cumulative_fetch_stitch = 0
    cumulative_compute = 0

    # Accumulate over every profile.json produced by this queue's jobs.
    for json_f in json_files[q.name]:
        with open(json_f) as f:
            data = json.load(f)

        # Peak memory is a max across jobs; runtimes are summed here and averaged below.
        max_mem_usage = max(max_mem_usage, data["max_footprint_mb"])

        total_job_time = data["elapsed_time_sec"]
        cumulative_time_alg += total_job_time

        for func in data["files"][profiled_script]["functions"]:
            # Runtime for this function in seconds.
            # NOTE(review): Scalene's "n_usage_fraction" may be a memory-usage fraction
            # rather than a share of runtime -- confirm it is the intended weight here.
            func_time = total_job_time * func["n_usage_fraction"]

            # Function entries are matched by name via their "line" field.
            if func["line"] == "get_dem":
                cumulative_fetch_stitch += func_time
            elif func["line"] in ("read_dem_as_array", "do_computations"):
                cumulative_compute += func_time

    n_jobs_in_queue = len(json_files[q.name])
    # Guards the averages below against a partial or empty set of profiles.
    assert 0 < n_jobs_in_queue == num_DPS_jobs, (
        f"{q.name}: found {n_jobs_in_queue} profile(s), expected {num_DPS_jobs}"
    )

    results.append(QueueBenchmark(instance_type=q.name,
                                  node_ncpu=q.ncpu,
                                  node_memory_GB=q.memory_GB,
                                  bbox_name=bbox_name,
                                  max_memory_usage_MB=max_mem_usage,
                                  mean_alg_runtime=cumulative_time_alg / n_jobs_in_queue,
                                  mean_runtime_fetch_and_stitch=cumulative_fetch_stitch / n_jobs_in_queue,
                                  mean_runtime_compute=cumulative_compute / n_jobs_in_queue,
                                  job_identifier=job_identifier,
                                  )
                   )


In [53]:
# Show the benchmark summaries as a table, one row per queue.
import pandas as pd

results_df = pd.DataFrame(results)
results_df



Unnamed: 0,instance_type,node_ncpu,node_memory_GB,bbox_name,max_memory_usage_MB,mean_alg_runtime,mean_runtime_fetch_and_stitch,mean_runtime_compute,job_identifier
0,maap-dps-worker-32vcpu-64gb,8,64,MtWilson,52148.41012,2072.02719,370.49245,1701.323608,TMP TMP


## Test Run in the ADE

In [3]:
# To run from the ADE, update the conda environment:
# (For DPS, this step is handled during algorithm registration.)
# The `!` prefix runs the script through the shell, relative to the notebook's working directory.
!./build-env.sh

Retrieving notices: ...working... done
Channels:
 - conda-forge
Platform: linux-64
Collecting package metadata (repodata.json): done
Solving environment: done

Downloading and Extracting Packages:
gdal-3.8.3           | 1.6 MB    |                                       |   0% 
ld_impl_linux-64-2.4 | 688 KB    |                                       |   0% [A

keyutils-1.6.1       | 115 KB    |                                       |   0% [A[A


numpy-1.26.4         | 7.1 MB    |                                       |   0% [A[A[A



readline-8.2         | 275 KB    |                                       |   0% [A[A[A[A




azure-storage-blobs- | 505 KB    |                                       |   0% [A[A[A[A[A





poppler-data-0.4.12  | 2.2 MB    |                                       |   0% [A[A[A[A[A[A






libpq-16.2           | 2.4 MB    |                                       |   0% [A[A[A[A[A[A[A







libspatialite-5.1.0  | 3.9 MB    |         

In [4]:
# WARNING - only use for testing! Not DPS!
import os
from time import time

# Build a DEM
start = time()

# Do a test run via CLI to ensure input arguments are passed correctly to the algorithm.
# !./run.sh -118.06817 34.22169 -118.05801 34.22822  # small bbox; should take 7-10 seconds.

# Hint: Make sure to initialize `bbox` above. Otherwise will error.
!./run.sh {bbox} --compute
# !./run.sh {bbox}


print(f"Time to make DEM: {time()-start} seconds")


[02/14 13:32:10] [INFO dem.py] Bounds: -118.06817 34.22169 -118.05801 34.22822
[02/14 13:32:10] [INFO cop_dem.py] Creating /projects/get-dem-nemo/nasa/output/dem.tif
[02/14 13:32:10] [INFO cop_dem.py] Fetching remote tiles...
[02/14 13:32:10] [INFO cop_dem.py] Running GDAL command:
[02/14 13:32:10] [INFO cop_dem.py] gdalwarp /vsicurl/https://raw.githubusercontent.com/scottstanie/sardem/master/sardem/data/cop_global.vrt /projects/get-dem-nemo/nasa/output/dem.tif -of GTiff -ot Int16 -te -118.068169999999995 34.2216900000000024 -118.058009999999996 34.2282200000000003 -tr 0.000277777777777777778 0.000277777777777777778 -s_srs "epsg:4326+3855" -t_srs "epsg:4326" -wo NUM_THREADS=4 -r nearest -wm 5000 -multi
Creating output file that is 37P x 24L.
0...10...20...30...40...50...60...70...80...90...100 - done.
Time to fetch and create dem.tif: 6.497464895248413 seconds
Number of CPU cores available on instance:  32
Time to perform multicore computations: 0.00019598007202148438 seconds
Time to m

In [13]:
# Report the GDAL version visible to this kernel.
# NOTE(review): build-env.sh downloaded gdal-3.8.3, but this cell reported 3.7.0 --
# confirm the kernel is running in the environment that build-env.sh updates.
from osgeo import gdal
gdal.__version__

'3.7.0'