<div><img src="https://radar.community.uaf.edu/wp-content/uploads/sites/667/2021/03/HydroSARbanner.jpg" width="100%" /></div>

**NASA A.37 Project:** Integrating SAR Data for Improved Resilience and Response to Weather-Related Disasters
**PI:** Franz J. Meyer

# HydroSAR Transition workshop

## Archiving successful HyP3 jobs in an AWS S3 bucket

In order to automatically archive newly created HydroSAR products we:
1. Query HyP3 for all the successful HydroSAR jobs we've submitted
2. Query our archive for all the products we've archived
3. Deduplicate the products lists to determine the *new* products to archive
4. Transfer the new products to our archive

This notebook walks through the archiving process. In an operational setting, it would be run on a regular schedule (e.g., via cron).

Note: Here we *always* search for **all** possible products, *always* look up **all** previously archived products, and then *deduplicate* the two lists to determine the new products. You could instead record the last time the script ran *successfully* and only search for products created since then. While that would be more performant, our strategy does not depend on the state of previous runs, so it is generally more fault-tolerant and can be started and stopped at will.
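
For comparison, the alternative mentioned in the note could be sketched as follows. This is only an illustration of the bookkeeping it would require, not something this notebook does: the state file name, its JSON layout, and the helper functions are all hypothetical, and the recorded time is assumed to be usable as a start time for the job search.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def read_last_run(state_file: Path):
    """Return the time of the last successful run, or None if there is none."""
    if not state_file.exists():
        return None
    state = json.loads(state_file.read_text())
    return datetime.fromisoformat(state['last_successful_run'])


def write_last_run(state_file: Path, run_time: datetime) -> None:
    """Record a successful run; call this only after every transfer succeeded."""
    state_file.write_text(json.dumps({'last_successful_run': run_time.isoformat()}))


# Hypothetical state file location, used only for this demonstration
state_file = Path(tempfile.mkdtemp()) / 'last_run.json'

first_start = read_last_run(state_file)  # None: no previous run, so search everything
run_time = datetime.now(timezone.utc)
# ... search for jobs newer than `first_start` and archive them here ...
write_last_run(state_file, run_time)

next_start = read_last_run(state_file)  # the next run would search from this time
```

The extra state is what makes this approach more fragile: if the state file is lost, or is written after a partially failed run, products can be silently skipped.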

### 1. Query HyP3 for all the HydroSAR products

We use HyP3 as our workflow engine and can query it for all the scenes we've already processed. Importantly, when we submit jobs, we assign them all the same project name, which groups the jobs together and lets us search for them later.

First, we need to specify our project name so that we can find all the jobs associated with the Area of Interest (AOI) we're monitoring:

In [None]:
project_name = 'HKHwatermaps'

In this notebook, we'll prompt for an Earthdata Login username and password, but they can also be provided via the `username` and `password` keyword arguments, or automatically pulled from the user's `~/.netrc` file.

Note: Typically you'll want to use a shared "operational" Earthdata Login user as you can only search for jobs associated with your username.

In [None]:
import hyp3_sdk as sdk
hyp3 = sdk.HyP3('https://hyp3-watermap.asf.alaska.edu', prompt=True)

Now we'll search for all the jobs with our project name and filter to the succeeded jobs.

In [None]:
processed_jobs = hyp3.find_jobs(name=project_name)
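# Keep only succeeded jobs; pending and running jobs have no products yet
succeeded_jobs = processed_jobs.filter_jobs(succeeded=True, pending=False, running=False, failed=False)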

### 2. Query our archive for all the products we've archived

For our archive, we are storing all the products under a project prefix in an S3 bucket.

In [None]:
archive_bucket = 'hyp3-nasa-disasters'

We'll build our set of archived products by querying the bucket using [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html). In general, AWS and `boto3` require AWS credentials to interact with AWS resources. However, because ASF's product archive is a public bucket, we can disable signing so that we don't need credentials to read from it. So, we first set up `boto3`:

In [None]:
import boto3
from botocore.handlers import disable_signing

S3 = boto3.resource('s3')
S3.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)

And now we query the bucket and create a set of the file names of all objects under our project prefix:

In [None]:
from pathlib import Path

project_archive = set()
for obj in S3.Bucket(archive_bucket).objects.filter(Prefix=f'{project_name}/'):
    project_archive.add(Path(obj.key).name)

Importantly, this is a set of file names, not full object keys. For example, one item in the set looks like:

In [None]:
print(next(iter(project_archive)))

If you're archiving products outside of AWS, please check out the [HydroSAR_service_archiving_local.ipynb](./HydroSAR_service_archiving_local.ipynb) notebook.

### 3. Deduplicate the products lists to determine the *new* products to archive

Every successful HyP3 `WATER_MAP` job will have created a set of HydroSAR products, each of which can be identified by its file suffix. First, we define the list of products we'd like to archive:

In [None]:
product_suffixes = ['_VV.tif', '_VH.tif', '_dem.tif', '_rgb.tif', '_HAND.tif', '_WM.tif', '_WaterDepth.tif', '_FloodDepth.tif']

Then we'll loop through all the succeeded jobs and check whether each product we want to archive is already in the project archive.

Because the HydroSAR service, as currently implemented by ASF, stores its products in a public S3 bucket, we can use `boto3` to copy the products directly between buckets rather than downloading and re-uploading them. For every unarchived product, we'll add a dictionary to a list; each dictionary holds the source and target S3 bucket and key.

In [None]:
from tqdm import tqdm

products_to_archive = []
for job in tqdm(succeeded_jobs):
    hyp3_product_bucket = job.files[0]['s3']['bucket']
    hyp3_product_key = job.files[0]['s3']['key']
    for sfx in product_suffixes:
        source_key = hyp3_product_key.replace('.zip', sfx)
        target_key = source_key.replace(job.job_id, project_name)
        # project_archive holds file names only, so compare on the file name, not the full key
        if Path(target_key).name not in project_archive:
            products_to_archive.append(
                {
                    'source_bucket': hyp3_product_bucket,
                    'source_key': source_key,
                    'target_bucket': archive_bucket,
                    'target_key': target_key,
                }
            )
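
To make the deduplication logic easier to reason about (and to test without HyP3 or AWS), it can be pulled out into a small pure function. The sketch below is one possible way to do that; the function name `find_unarchived` and the example keys are made up for illustration. Because the archive set holds bare file names, the comparison uses the file name portion of each key.

```python
from pathlib import PurePosixPath


def find_unarchived(zip_keys, suffixes, archived_names, target_prefix):
    """Return (source_key, target_key) pairs for products not yet archived."""
    to_archive = []
    for zip_key in zip_keys:
        for suffix in suffixes:
            source_key = zip_key.replace('.zip', suffix)
            file_name = PurePosixPath(source_key).name
            if file_name not in archived_names:
                to_archive.append((source_key, f'{target_prefix}/{file_name}'))
    return to_archive


# Made-up keys and file names, for illustration only
zip_keys = ['job-1/sceneA.zip', 'job-2/sceneB.zip']
archived_names = {'sceneA_VV.tif', 'sceneA_WM.tif', 'sceneB_VV.tif'}

new_products = find_unarchived(zip_keys, ['_VV.tif', '_WM.tif'], archived_names, 'HKHwatermaps')
print(new_products)  # [('job-2/sceneB_WM.tif', 'HKHwatermaps/sceneB_WM.tif')]
```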

If, instead, we wanted to archive the products *outside* of AWS, we would build a list of download links, which we cover in the [HydroSAR_service_archiving_local.ipynb](./HydroSAR_service_archiving_local.ipynb) notebook.

### 4. Transfer the new products to our archive

Before we start transferring our new products, we need to provide AWS credentials that can write to our bucket:

In [None]:
import os
from getpass import getpass

aws_access_key_id = getpass('AWS_ACCESS_KEY_ID: ')
os.environ['AWS_ACCESS_KEY_ID'] = aws_access_key_id

aws_secret_access_key = getpass('AWS_SECRET_ACCESS_KEY: ')
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_secret_access_key

We can also set up a more performant AWS S3 transfer configuration, which raises both the multipart threshold and the multipart chunk size to 100 MiB:

In [None]:
from boto3.s3.transfer import TransferConfig

chunk_size = 100 * 1024 * 1024  # 100 MiB (104857600 bytes)
transfer_config = TransferConfig(multipart_threshold=chunk_size, multipart_chunksize=chunk_size)
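
To get a feel for what this configuration means in practice, the rough arithmetic can be sketched as below. This reflects our understanding of how `boto3` managed transfers behave (objects smaller than the threshold go in a single request, larger ones are split into parts of `multipart_chunksize`); the helper `part_count` is hypothetical and ignores any adjustments the library may make for very large objects.

```python
import math

MIB = 1024 * 1024
chunk_size = 100 * MIB  # 104857600 bytes, the same value used in the configuration


def part_count(size_bytes: int, threshold: int = chunk_size, part_size: int = chunk_size) -> int:
    """Approximate number of requests needed to transfer an object of this size."""
    if size_bytes < threshold:
        return 1  # below the threshold: a single, non-multipart request
    return math.ceil(size_bytes / part_size)


print(part_count(30 * MIB))    # 1
print(part_count(250 * MIB))   # 3
print(part_count(1024 * MIB))  # 11
```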

And then begin the transfer of products from HyP3 to our archive.

In [None]:
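# The S3 resource above has signing disabled, so create a new, signed resource
# that picks up the credentials we just set in the environment
archive_s3 = boto3.Session().resource('s3')

for product in tqdm(products_to_archive):
    bucket = archive_s3.Bucket(product['target_bucket'])
    copy_source = {'Bucket': product['source_bucket'], 'Key': product['source_key']}
    bucket.copy(CopySource=copy_source, Key=product['target_key'], Config=transfer_config)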
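
In a scheduled run, you may not want a single failed copy to abort the whole transfer. Because every run re-derives the list of unarchived products, anything that fails is simply picked up again next time. A minimal sketch of that idea follows; `transfer_all` is a hypothetical helper, and the copy step is passed in as a function (here a stand-in that simulates a failure) so the example runs without AWS access.

```python
def transfer_all(products, copy_one):
    """Try to copy every product; return the ones that failed."""
    failed = []
    for product in products:
        try:
            copy_one(product)
        except Exception as error:  # in practice, catch botocore.exceptions.ClientError
            print(f"Failed to copy {product['source_key']}: {error}")
            failed.append(product)
    return failed


def fake_copy(product):
    """Stand-in for the real S3 copy; fails for one made-up product."""
    if product['source_key'].endswith('_WM.tif'):
        raise RuntimeError('simulated failure')


# Made-up products, for illustration only
products = [
    {'source_key': 'job-1/sceneA_VV.tif'},
    {'source_key': 'job-1/sceneA_WM.tif'},
]
failed = transfer_all(products, fake_copy)
```

In a real script, `copy_one` would wrap the `bucket.copy(...)` call, and a non-empty `failed` list could be logged or used to set a non-zero exit code.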

## Operations

To do all this operationally, you'll typically want to extract the configuration above into a separate configuration file, move the Python code into an easily executable script, and then run that script regularly via a cron job (or similar) *somewhere*.
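
As a rough illustration of separating configuration from code, the sketch below writes and then loads a small JSON configuration file. The key names and the `load_config` helper are assumptions made for this example; they are not the layout of ASF's actual configuration file.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical configuration layout; the keys are chosen for this example only
example_config = {
    'project_name': 'HKHwatermaps',
    'archive_bucket': 'hyp3-nasa-disasters',
    'product_suffixes': ['_VV.tif', '_WM.tif'],
}

REQUIRED_KEYS = {'project_name', 'archive_bucket', 'product_suffixes'}


def load_config(path):
    """Load the JSON configuration and fail early if a required key is missing."""
    config = json.loads(Path(path).read_text())
    missing = REQUIRED_KEYS - set(config)
    if missing:
        raise ValueError(f'Missing configuration keys: {sorted(missing)}')
    return config


# Write the example to a temporary file, then read it back as a script would
config_file = Path(tempfile.mkdtemp()) / 'archive_config.json'
config_file.write_text(json.dumps(example_config, indent=2))

config = load_config(config_file)
print(config['project_name'], config['archive_bucket'])
```

Failing early on a missing key is a deliberate choice for unattended runs: a misconfigured cron job should stop loudly rather than archive to the wrong place.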

In practice, ASF:
* Defines all the configuration in this file: <https://github.com/ASFHyP3/hyp3-nasa-disasters/blob/main/data_management/hkh_watermaps.json>
* Maintains a scripted version of this notebook: <https://github.com/ASFHyP3/hyp3-nasa-disasters/blob/main/data_management/hyp3_transfer_script.py>
* Executes the script every 6 hours using GitHub Actions "scheduled" (cron) workflows: <https://github.com/ASFHyP3/hyp3-nasa-disasters/blob/main/.github/workflows/process-and-transfer.yml>
