In [None]:
import os
import json
import shutil

from typing import List, Dict
from datetime import datetime
from hashlib import md5

## Defining your job inputs

The following cell is tagged with "parameters", which allows papermill to identify the cell containing per-run parameters.
Cell tags may be accessed using the double-gear icon in JupyterLab's left-hand gutter.

All variables defined in the following cell are treated as job input parameters, and will be accessible through the `_context.json` file at runtime.

For more information, visit https://papermill.readthedocs.io/en/latest/

In [None]:
# Job input parameters
# Each assignment declares one parameter together with an example default value.
# As the examples show, the type may be given as a Python 3 annotation, as a
# trailing `# type:` comment, or omitted and left to be inferred.
str_arg: str = "this is a string parameter"
py3_list_arg: List[str] = ["this", "is", "a", "list", "parameter"]
py2_list_arg = ["python2", "hinting", "is", "also", "supported"]  # type: List
int_arg: int = 100
float_arg: float = 10.2553
# NOTE(review): the "enum" string annotation presumably marks the list as the set of allowed choices - confirm against the tooling
enumerated_arg: "enum" = ["yes", "no", "maybe so"]
dict_arg: Dict = {"a": 1, "b": 2, "c": 3}
inferred_str_arg = "it is also possible for papermill to infer parameter type"

# PCM-System Parameters
# These use reserved-prefix parameter names (_*) and are also parsed during `notebook-pge-wrapper specs` to generate the hysds-io and job-spec
# NOTE(review): the two time limits are presumably expressed in seconds - confirm against the job-spec documentation
_time_limit = 57389
_soft_time_limit = 4738
_disk_usage = "10GB"
_submission_type = "individual"
_required_queue = "factotum-job_worker-small"
# Placeholder value; expected to be replaced with the real PGE name.
_label = "PGE_NAME_PLACEHOLDER"

## Defining your process

The following cell contains trivial stubbed function examples as might be used in a job execution flow.

Generally, a job consists of retrieving some data based on the job's arguments, processing it somehow, and writing the output to one or more files.

In [None]:
def retrieve_data() -> str:
    """Stub for the data-retrieval step of the job.

    :return: a fixed placeholder string standing in for the retrieved input data.
    """
    placeholder_payload: str = (
        'This is generally an existing product or stack from either the HySDS catalog (via pele) or a 3rd-party DAAC like ASF.'
    )
    return placeholder_payload

def process_and_store_data(data) -> None:
    """Stub for the processing step: upper-cases the input and stores it on disk.

    The result is written to 'my_sample_product.txt', resolved relative to the
    current working directory; any existing file of that name is replaced.

    :param data: the input to process; must provide an ``upper()`` method (e.g. a str).
    """
    # Transform first, so that a bad input fails before the output file is touched.
    uppercased = data.upper()

    output_filename = 'my_sample_product.txt'
    with open(output_filename, 'w+') as product_file:
        product_file.write(uppercased)


## Defining your job outputs and metadata files

The following cell contains the functions necessary to create a trivial data product for ingestion into the PCM data product catalog.

These functions should be augmented to include your desired dataset definition data, metadata and job output files.

It is also typical to include important fields (e.g. track number, orbit direction and temporal bound timestamps) in the dataset id.

In [None]:
# Absolute path of the current directory, resolved once when this cell is executed.
# The functions below use it as the base for the context file and the data product
# directory, and generate_data_product() uses it as the default for its working_dir argument.
working_dir = os.path.abspath(os.curdir)

def generate_dummy_context_file() -> None:
    """Write a placeholder _context.json into the working directory for local development.

    When run in HySDS, a _context.json file will already be present in the working
    directory. An existing file is therefore left untouched, so that calling this
    function unconditionally can never overwrite the real job context.

    The placeholder contains a single 'run_timestamp' key holding the current
    local time in ISO 8601 format.
    """
    filepath: str = os.path.join(working_dir, '_context.json')

    # Guard against clobbering a context file that was provided by the runtime
    # (or left over from a previous local run).
    if os.path.exists(filepath):
        print(f'Found existing context at {filepath}, leaving it unchanged')
        return

    print(f'Writing dummy context to {filepath}')
    # Plain write mode is sufficient: the handle is never read from.
    with open(filepath, 'w') as context_file:
        json.dump({'run_timestamp': datetime.now().isoformat()}, context_file)

def generate_dataset_id(id_prefix: str, context: str) -> str:
    """Build an identifier for the data product being produced.

    The identifier has the form ``<id_prefix>-<timestamp>-<hash>``, where
    ``<timestamp>`` is the current local time in ISO 8601 format and ``<hash>``
    is the first five hex characters of the MD5 digest of ``context``.
    The generated identifier is also printed.

    :param id_prefix: leading component of the identifier.
    :param context: text whose digest provides the trailing component.
    :return: the assembled identifier.
    """
    context_digest = md5(context.encode()).hexdigest()
    timestamp = datetime.now().isoformat()

    # Only a short slice of the digest is kept, to keep the identifier compact.
    job_id = '{}-{}-{}'.format(id_prefix, timestamp, context_digest[:5])

    print(f'Generated job ID: {job_id}')
    return job_id


def generate_dataset_file(dataset_id: str, working_dir: str = working_dir, **kwargs) -> None:
    """Stores standardized metadata used for indexing products in HySDS GRQ.

    Writes ``<working_dir>/<dataset_id>/<dataset_id>.dataset.json``. The dataset
    directory must already exist.

    :param dataset_id: identifier of the data product; names both the directory and the file.
    :param working_dir: base directory containing the dataset directory. Defaults to the
        module-level working directory.
    :param kwargs: ``version`` (defaults to 'v1.0') plus any of the optional fields
        ``label``, ``location``, ``starttime`` and ``endtime``. Other keys are ignored.
    """
    dataset_definition_filepath: str = os.path.join(working_dir, dataset_id, f'{dataset_id}.dataset.json')
    metadata: dict = {
        'version': kwargs.get('version', 'v1.0'),
    }

    optional_fields = [
        'label',
        'location',  # Must adhere to geoJSON "geometry" format
        'starttime',
        'endtime'
    ]
    # Copy across only the optional fields the caller actually supplied.
    for field in optional_fields:
        if field in kwargs:
            metadata[field] = kwargs[field]

    with open(dataset_definition_filepath, 'w') as dataset_file:
        print(f'Writing to {dataset_definition_filepath}')
        json.dump(metadata, dataset_file)


def generate_metadata_file(dataset_id: str, metadata: Dict, working_dir: str = working_dir) -> None:
    """Stores custom metadata keys/values used for indexing products in HySDS GRQ.

    Writes ``<working_dir>/<dataset_id>/<dataset_id>.met.json``. The dataset
    directory must already exist.

    :param dataset_id: identifier of the data product; names both the directory and the file.
    :param metadata: JSON-serializable mapping to store.
    :param working_dir: base directory containing the dataset directory. Defaults to the
        module-level working directory.
    """
    metadata_filepath: str = os.path.join(working_dir, dataset_id, f'{dataset_id}.met.json')
    with open(metadata_filepath, 'w') as metadata_file:
        print(f'Writing to {metadata_filepath}')
        json.dump(metadata, metadata_file)


def generate_data_product(working_dir: str = working_dir, id_prefix: str = 'ON_DEMAND-MY_JOB_TYPE') -> None:
    """Generates metadata/dataset files and packages them in a specially-named directory
    with the desired job output files, for ingestion into the data product catalog.

    Steps: read ``_context.json`` from ``working_dir``, derive a dataset id from it,
    create ``<working_dir>/<dataset_id>``, write the .met.json and .dataset.json files
    into it, then move ``my_sample_product.txt`` from ``working_dir`` into it.

    :param working_dir: directory holding ``_context.json`` and the job output file, and
        in which the data product directory is created.
    :param id_prefix: prefix of the generated dataset id.
    :raises FileNotFoundError: if ``_context.json`` or the job output file is missing.
    :raises FileExistsError: if the data product directory already exists.
    """
    context_filepath: str = os.path.join(working_dir, '_context.json')
    with open(context_filepath) as context_file:
        context: str = context_file.read()

    dataset_id: str = generate_dataset_id(id_prefix, context)

    data_product_dir = os.path.join(working_dir, dataset_id)
    print(f'Generating data product at {data_product_dir}')

    os.mkdir(data_product_dir)
    # Pass working_dir through explicitly so the helper files land in the directory
    # created above, even when the caller supplied a non-default working_dir.
    generate_metadata_file(dataset_id, {'my_metadata_field': 'metadata_value'}, working_dir=working_dir)
    generate_dataset_file(dataset_id, working_dir=working_dir)

    print('Moving PGE output...')
    shutil.move(os.path.join(working_dir, 'my_sample_product.txt'), os.path.join(data_product_dir, 'my_sample_product.txt'))
        
    
    


## Defining your job's high-level execution flow

The following cell contains a trivial set of procedural calls, which will be run in order when the notebook is executed.

In [None]:
# Create a placeholder _context.json in the working directory (needed for local development).
generate_dummy_context_file()

# Fetch the input, then transform it and write the result to my_sample_product.txt.
data = retrieve_data()
process_and_store_data(data)

# Build the data product directory, write its metadata files and move the output file into it.
generate_data_product()

print('PGE execution complete!')
