# HLS Monthly Median Collection

This notebook demonstrates the collection of monthly median HLS tiles given a catalog generated in the first [notebook](01-hls-catalog.ipynb), or any catalog generated with the `utils.hls.catalog.HLSCatalog` class methods. Dask is used to work with files that are too large to fit in RAM. While local computing will work in the given Anaconda environment, scaling with Microsoft's Planetary Computer or (even better) an equivalent environment in the East US 2 region (where HLS data is stored) is recommended. It is also recommended to monitor both the log output and blob storage during collection to ensure that tiles are being continuously collected and stored without error.

In [1]:
import os
import sys
sys.path.append('..')

# pip/conda installed
import fsspec
# dask_gateway is optional: when it cannot be found, fall back to a local
# cluster and tell the user; otherwise mark the environment as distributed.
try:
    from dask_gateway import GatewayCluster
except ModuleNotFoundError:
    print('Using a local cluster...')
    clustenv = 'local'
else:
    clustenv = 'distributed'

from utils import get_logger
from utils.hls.catalog import HLSCatalog
from utils.hls.compute import process_jobs
from utils.hls.compute import jobs_from_catalog
from utils.hls.compute import calculate_job_median

In [2]:
# fill with your desired blob container for tile data collection
%store -r envdict
envdict['TILE_BLOB_CONTAINER'] = ''
envdict['COL_ENV'] = clustenv
%store envdict

Stored 'envdict' (dict)


In [3]:
# Logger passed to the job runner further down.
logger = get_logger('hls-test')

# Cluster settings. If a local cluster is being used, only the `workers` and
# `clust_type` entries are used and resources are allocated dynamically.
cluster_args = {
    'workers': 128,  # int(.8*os.cpu_count())
    'clust_type': envdict['COL_ENV'],
    'worker_threads': 1,
    'worker_memory': 8,
    'scheduler_threads': 4,
    'scheduler_memory': 8,
}

# Location of the project's utils package, handed to process_jobs as code_path.
code_path = '../utils'

In [None]:
# Open the scene catalog (a zarr store in Azure blob storage) through fsspec
# and load it into an HLSCatalog.
catalog_url = fsspec.get_mapper(
    f"az://{envdict['CATALOG_BLOB_CONTAINER']}/catalogs/hls_test_tiles.zarr",
    # Take the account name from the shared settings instead of hard-coding
    # it: the account key below is valid for exactly one storage account, and
    # the collection loop pairs that same key with AZURE_STRG_ACCOUNT_NAME.
    account_name=envdict['AZURE_STRG_ACCOUNT_NAME'],
    account_key=envdict['AZURE_STRG_ACCOUNT_KEY'],
)
catalog = HLSCatalog.from_zarr(catalog_url)

In [None]:
# Keyword arguments forwarded to calculate_job_median by process_jobs.

# Band list as recorded in the catalog dataset's attributes.
bands = catalog.xr_ds.attrs['bands']

# Grouping key used for the median calculation.
job_groupby = "time.month"

# One band of one full tile per chunk, so an entire tile is read at once
# (each tile is 3660x3660).
chunks = dict(band=1, x=3660, y=3660)

In [None]:
# Key used to split each yearly catalog into jobs.
catalog_groupby = "tile"

# Keep only catalog entries whose year is 2015 or later (dropping the rest),
# then iterate over them one year at a time.
yr_catalogs = (
    catalog.xr_ds
    .where(catalog.xr_ds['year'] >= 2015, drop=True)
    .groupby('year')
)

In [None]:
# Collect monthly medians one catalog year at a time. Each year's catalog is
# split into jobs (grouped by `catalog_groupby`) and handed to process_jobs,
# with results written under "<TILE_BLOB_CONTAINER>/<year>".

# Fail fast if the container placeholder was never filled in, rather than
# launching a cluster and writing to an unintended location.
if not envdict['TILE_BLOB_CONTAINER']:
    raise ValueError(
        "envdict['TILE_BLOB_CONTAINER'] is empty; set it to the blob "
        "container that should receive tile data before starting collection."
    )

# The checkpoint directory is the same for every year, so create it once.
# exist_ok=True replaces the separate exists() check.
os.makedirs('checkpoints', exist_ok=True)

for yr, ca in yr_catalogs:
    # Dataset.where() can promote integer variables to float, in which case
    # the group label would arrive as e.g. 2015.0. Normalise it so the storage
    # prefix and checkpoint file are named "2015" rather than "2015.0".
    yr = int(yr)
    logger.info(f"Starting process for {yr}")
    ca.info()
    storage_prefix = f"{envdict['TILE_BLOB_CONTAINER']}/{yr}"
    # One checkpoint file per year.
    checkpoint_path = f"checkpoints/{yr}.txt"
    jobs = jobs_from_catalog(ca, catalog_groupby)
    process_jobs(
        jobs=jobs,
        job_fn=calculate_job_median,
        checkpoint_path=checkpoint_path,
        logger=logger,
        cluster_args=cluster_args,
        code_path=code_path,
        concurrency=2,  # run 2 jobs at once
        cluster_restart_freq=16,  # restart after 16 jobs
        # kwargs for calculate_job_median
        job_groupby=job_groupby,
        bands=bands,
        chunks=chunks,
        account_name=envdict['AZURE_STRG_ACCOUNT_NAME'],
        storage_container=storage_prefix,
        account_key=envdict['AZURE_STRG_ACCOUNT_KEY'],
    )