In [1]:
from maap.maap import MAAP

# API host the MAAP client talks to
MAAP_HOST = 'api.maap-project.org'
maap = MAAP(maap_host=MAAP_HOST)

# Launch DPS for `tile_forestage.py`

In [1]:
import os
import geopandas as gpd
import pandas as pd
import glob
import datetime

import sys
sys.path.append('/projects/code/icesat2_boreal/lib')
import ExtractUtils

NASA MAAP


## Steps to commit, build DPS registration yaml, register DPS algorithm from yaml
### Commit with Tag for running
1) Tag the version of the repo that works to run your alg. Use a *tag* of `build_stack_v2024_1` or whatever is appropriate (eg, for AGB runs maybe do `boreal_agb_2023_v1` for consistency?)
  - to maintain sanity, use this GitHub *tag* also as the `algorithm_version` that you need to supply to your algorithm config yaml  
2) think now about how you want your output organized:  
  - remember, the output will be like: `dps_output/<algorithm name>/<algorithm_version>/<IDENTIFIER>`  
  - note: identifier for biomass runs should be `AGB_2020` , for height `HT_2020`  
  - if you have different types of `AGB_2020` runs (e.g., you are testing different sets of parameters associated with model dev/application), you can keep the results separate from one run to the next, delivering the output into different subdirs, by modifying this IDENTIFIER like this: `AGB_2020/run_param_set01`     
    
  
3) follow git instructions (every time!!):  
 - git add changes  
 - git commit -m 'message'  
 - git tag -f `tile_forestage_v1`    
 - git push  
 - git push origin -f `tile_forestage_v1`  

4) if it looks weird, check `git log` to make sure the tag is at the same place as origin and DPS

### Use MAAP Registration call in notebook chunk to register DPS algorithm
 - We need to register a DPS algorithm called `run_tile_forestage` before proceeding to the chunks below...

In [None]:
# Algorithm config yaml to register with DPS
ALGO_YAML_FN = "/projects/code/icesat2_boreal/dps/registered/run_tile_forestage.yml"
register_response = maap.register_algorithm_from_yaml_file(ALGO_YAML_FN)
# Show the text of the registration response as the cell output
register_response.text

In [11]:
# Boreal tiles: read the tile index GeoPackage (v004) from its URL
boreal_tiles_model_ready_fn = 'https://maap-ops-workspace.s3.amazonaws.com/shared/montesano/databank/boreal_tiles_v004.gpkg'
boreal_tiles = gpd.read_file(boreal_tiles_model_ready_fn)
# Map of the tiles drawn in red; it is only assigned to `m` here, so evaluate `m` in a cell to render it
m = boreal_tiles.explore(color='red')

Unnamed: 0,tile_num,tile_version,tile_group,map_version,geometry
0,1,version 1,eurasia west,none,"POLYGON ((-2151478.000 9423304.000, -2061478.0..."
1,2,version 1,eurasia west,none,"POLYGON ((-2061478.000 9423304.000, -1971478.0..."
2,3,version 1,eurasia west,none,"POLYGON ((-1971478.000 9423304.000, -1881478.0..."
3,4,version 1,eurasia west,none,"POLYGON ((-2241478.000 9333304.000, -2151478.0..."
4,5,version 1,eurasia west,none,"POLYGON ((-2151478.000 9333304.000, -2061478.0..."


# Build a DPS list

In [246]:
# Start from every tile number in the tile index
DPS_INPUT_TILE_NUM_LIST = boreal_tiles.tile_num.tolist()
print(len(DPS_INPUT_TILE_NUM_LIST))

5197


In [8]:
# Switch: when True, the DPS input list is replaced by the tiles listed below
RUN_MISSING_TILES = True

# Three runs of 20 consecutive tile numbers, identified by their first tile
MISSING_TILE_RANGE_STARTS = (3448, 3355, 3543)
MISSING_TILES = [
    tile_num
    for first_tile in MISSING_TILE_RANGE_STARTS
    for tile_num in range(first_tile, first_tile + 20)
]

if RUN_MISSING_TILES:
    DPS_INPUT_TILE_NUM_LIST = MISSING_TILES

In [9]:
# Number of tiles currently in the DPS input list
len(DPS_INPUT_TILE_NUM_LIST)

60

#### Note: make sure the `in_params_dict` coincides with the args of `tile_forestage.py`

In [12]:
# Parameters handed to the DPS job; see the note above about matching the args of tile_forestage.py
in_params_dict = dict(
    in_url='https://datapub.gfz-potsdam.de/download/10.5880.GFZ.1.4.2023.006-VEnuo/GAMIv2-1_2010-2020_100m.nc',
    in_vector_fn='https://maap-ops-workspace.s3.amazonaws.com/shared/montesano/databank/boreal_tiles_v004.gpkg',
    in_id_col='tile_num',
    # left empty here; NOTE(review): presumably meant to carry the tile id of each job - confirm against the submission loop
    in_id_num='',
    year='2020',
)

In [13]:
# Display the parameter dict so it can be checked before submitting
in_params_dict

{'in_url': 'https://datapub.gfz-potsdam.de/download/10.5880.GFZ.1.4.2023.006-VEnuo/GAMIv2-1_2010-2020_100m.nc',
 'in_vector_fn': 'https://maap-ops-workspace.s3.amazonaws.com/shared/montesano/databank/boreal_tiles_v004.gpkg',
 'in_id_num': '',
 'in_id_col': 'tile_num',
 'year': '2020'}

## Run a DPS job across the list

In [17]:
# --- What to run ---
# Registered algorithm name and the version (tag) to run
ALGO_ID = "run_tile_forestage"
MAAP_VERSION = 'tile_forestage_v1'
# Identifier passed with each job submission
IDENTIFIER = 'forestage_2020'

# --- Who runs it, and where ---
# Username the jobs are submitted under
USER = 'montesano'
# Queue the jobs are sent to
WORKER_TYPE = 'maap-dps-gedi_boreal_worker-16gb'

In [78]:
# The run name labels the saved submission table and its filename; it is the same as the DPS identifier
RUN_NAME = IDENTIFIER
# Echo the algorithm, version and run name as a final check before submitting
print(f"{ALGO_ID}, {MAAP_VERSION}, {RUN_NAME}")

run_build_stack, build_stack_v2023_2, TCC_TP_2020


In [80]:
%%time

import json

submit_results_df_list = []
len_input_list = len(DPS_INPUT_TILE_NUM_LIST)
print(f"# of input tiles for DPS: {len_input_list}")

for i, INPUT_TILE_NUM in enumerate(DPS_INPUT_TILE_NUM_LIST):
    
    DPS_num = i+1
    
    # Update the in_params_dict with th current INPUT_TILE_NUM
    in_params_dict['in_tile_num'] = INPUT_TILE_NUM
    
    submit_result = maap.submitJob(
            identifier=IDENTIFIER,
            algo_id=ALGO_ID,
            version=MAAP_VERSION,
            username=USER, # username needs to be the same as whoever created the workspace
            queue=WORKER_TYPE,
            **in_params_dict
        )
    
    # Build a dataframe of submission details
    submit_result_df = pd.DataFrame( 
        {
                'dps_num':[DPS_num],
                'tile_num':[INPUT_TILE_NUM],
                'submit_time':[datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%s')],
                'dbs_job_hour': [datetime.datetime.now().hour],
                'algo_id': [ALGO_ID],
                'user': [USER],
                'worker_type': [WORKER_TYPE],
                'job_id': [submit_result.id],
                'submit_status': [submit_result.status],
            
        } 
    )
    
    # Append to a list of data frames of submission results
    submit_results_df_list.append(submit_result_df)
    
    if DPS_num in [1, 5, 10, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 19000, 21000, 24000, len_input_list]:
        print(f"DPS run #: {DPS_num}\t| tile num: {INPUT_TILE_NUM}\t| submit status: {submit_result.status}\t| job id: {submit_result.id}") 
        
# Build a final submission results df and save
submit_results_df = pd.concat(submit_results_df_list)
submit_results_df['run_name'] = RUN_NAME
nowtime = pd.Timestamp.now().strftime('%Y%m%d%H%M')
print(f"Current time:\t{nowtime}")
submit_results_df.to_csv(f'/projects/my-public-bucket/dps_submission_results/DPS_{ALGO_ID}_{RUN_NAME}_submission_results_{len_input_list}_{nowtime}.csv')
submit_results_df.info()

# of input tiles for DPS: 287
DPS run #: 1	| tile num: 39425	| submit status: success	| job id: 5513828d-4b53-4300-bba4-7137b28de73a
DPS run #: 5	| tile num: 1031	| submit status: success	| job id: 32ebcb9f-bb60-4a9d-b955-27f026480c24
DPS run #: 10	| tile num: 1548	| submit status: success	| job id: 70b592ee-e8b9-45e9-b19a-1248024b06c5
DPS run #: 50	| tile num: 4392	| submit status: success	| job id: 0fde5ca7-45da-41c0-a28c-8e8445618739
DPS run #: 100	| tile num: 1197	| submit status: success	| job id: a125208e-d55c-4daf-8ad3-6a3fb76b7219
DPS run #: 250	| tile num: 428	| submit status: success	| job id: 9cff7ae8-82f9-4ba3-b139-a5907048ebce
DPS run #: 287	| tile num: 504	| submit status: success	| job id: c2953a09-1535-4835-9f85-0ae95174884f
Current time:	202501170803
<class 'pandas.core.frame.DataFrame'>
Index: 287 entries, 0 to 0
Data columns (total 10 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   dps_num        287 non-null    in

After almost any DPS job, you have to assess what was marked as `success` and `fail`.  

This involves:
1. building a table of job status based on the job ids captured in `submit_results_df` from the DPS run chunk (this takes 40 mins for ~47k jobs). This tells you how many jobs failed.
2. merging the `job status table` with the submission results table (`submit_results_df`). This tells you which specific granules (or tile nums) failed.
3. building another input list of granules/tiles for a follow-up DPS run.
## Assess DPS results
Build a table of job status based on job id - how many jobs failed?

In [81]:
# Reload ExtractUtils so that the current version of the module file is used in this session
import importlib
import ExtractUtils
importlib.reload(ExtractUtils)

NASA MAAP


<module 'ExtractUtils' from '/projects/code/icesat2_boreal/lib/ExtractUtils.py'>

In [82]:
# All saved submission tables for this algorithm, whatever the run name
submission_csv_pattern = f'/projects/my-public-bucket/dps_submission_results/DPS_{ALGO_ID}_*_submission_results_*.csv'
LIST_SUBMISSIONS = glob.glob(submission_csv_pattern)
# Descending order by the key from ExtractUtils.func
# NOTE(review): presumably this puts the most recent submission first - confirm in ExtractUtils
LIST_SUBMISSIONS.sort(key=ExtractUtils.func, reverse=True)
LIST_SUBMISSIONS[0:1]

['/projects/my-public-bucket/dps_submission_results/DPS_run_build_stack_TCC_TP_2020_submission_results_287_202501170803.csv']

In [None]:
%%time

running_list = []
fails_list = []
success_list = []
offline_list = []

#for DPS_DATETIME in [nowtime]:
for fn in LIST_SUBMISSIONS[0:3]:
    #if DPS_DATETIME in fn and not 'job_status' in fn:

    DPS_alg_id = os.path.basename(fn.split('_submission_results_')[0].replace('DPS_',''))
    thentime = fn.split('_')[-1].replace('.csv','')
    print(f'DPS alg:\t\t{DPS_alg_id}')
    print(f'DPS run name:\t\t{RUN_NAME}')
    print(f'DPS launch time:\t{thentime}')

    # Build job status table
    df_jstatus = ExtractUtils.BUILD_TABLE_JOBSTATUS(pd.read_csv(fn))

    # Save job status table
    df_jstatus.to_csv(f'/projects/my-public-bucket/dps_submission_results/DPS_{RUN_NAME}_submission_results_job_status_{len(df_jstatus)}_{thentime}.csv')

    # Get current fails df and append to list
    df_jstatus['run_type'] = RUN_NAME

    running_list.append(df_jstatus[ (df_jstatus['status'] == 'Running') ] )
    fails_list.append(  df_jstatus[ (df_jstatus['status'] == 'Failed') ] )
    success_list.append(df_jstatus[ (df_jstatus['status'] == 'Succeeded') ] )
    offline_list.append(df_jstatus[ (df_jstatus['status'] == 'Offline') ] )
    print(f"Count offline jobs: {df_jstatus[ (df_jstatus['status'] == 'Offline') ].shape[0]}\n")
            
df_all_running = pd.concat(running_list)          
df_all_fails =   pd.concat(fails_list)
df_all_success = pd.concat(success_list)
df_all_offline = pd.concat(offline_list)

In [27]:
#!aws s3 rm --recursive s3://maap-ops-workspace/montesano/dps_output/run_build_stack_topo/...../CopernicusGLO30

In [28]:
# New list = fails + still running + offline + failed to even submit
# NOTE(review): df_jstatus is the status table of the LAST submission file read
# by the status loop, so submit failures recorded in the other files are not
# picked up here - confirm that this is intended.
resubmit_candidates = (
    df_all_fails.tile_num.to_list()
    + df_all_running.tile_num.to_list()
    + df_all_offline.tile_num.to_list()
    + df_jstatus[df_jstatus.submit_status == 'failed'].tile_num.to_list()
)

# A tile can appear several times (in more than one submission file, or in more
# than one of the lists above). dict.fromkeys keeps one entry per tile, in
# first-seen order, so that no tile is resubmitted twice.
FAILS = list(dict.fromkeys(int(tile_num) for tile_num in resubmit_candidates))

print(f"# of tiles to re-run: {len(FAILS)}")
print(FAILS)
DPS_INPUT_TILE_NUM_LIST = FAILS

[]
