## Stage Out Example

Steps
- Install Data Service (DS) Client Library
- Set Log Level
- Set Environment variables for Stage-Out
- Try Stage-Out (Dry-Run Only)
- Try Stage-Out
- Check the results

In [3]:
# NOTE(review): this import is not referenced by any code visible in this notebook and
# looks like an accidental IDE auto-import -- confirm and consider removing it.
from anyio.streams import file
# Install the Data Service (DS) client library; `%pip` targets the running kernel's environment.
%pip install mdps-ds-lib

Note: you may need to restart the kernel to use updated packages.


### Setting the Log Level
- Log level mappings:
- 10 = debug
- 20 = info
- 30 = warning
- 40 = error

In [4]:
import logging

# Numeric levels: 10 = debug, 20 = info, 30 = warning, 40 = error.
log_level = logging.WARNING  # named constant for the numeric level 30
log_format = "%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s"
logging.basicConfig(level=log_level, format=log_format)

### Stage-out Only Environment Variables

Follow this link for more information: https://app.gitbook.com/o/xZRqGQeQXJ0RP4VMj7Lq/s/UMIRhLdbRQTvMWop8Il9/developer-docs/data/docs/users-guide/stage-out

In [23]:
import os
from mdps_ds_lib.lib.utils.file_utils import FileUtils

# All settings are applied in a single update so the whole configuration can be read at a glance.
os.environ.update({
    # AWS credentials for accessing S3. They are also needed when the "EDL" and
    # "STAC_AUTH_TYPE" settings come from the parameter store.
    # They may not be needed if the Jupyter environment already takes care of the access.
    'AWS_ACCESS_KEY_ID': 'xxx',
    'AWS_SECRET_ACCESS_KEY': 'xxx',
    'AWS_SESSION_TOKEN': 'xxx',

    'PROJECT': 'DEMO',
    'VENUE': 'DEV43',

    'VERIFY_SSL': 'FALSE',
    'RESULT_PATH_PREFIX': '',  # We can usually ignore this
    'STAGING_BUCKET': 'uds-sbx-cumulus-staging',
    'GRANULES_SEARCH_DOMAIN': 'UNITY',
    'OUTPUT_FILE': 'normal-stage-out/some_output/output.json',
    'OUTPUT_DIRECTORY': 'normal-stage-out/output_dir',
    'CATALOG_FILE': 'normal-stage-out/catalog.json',
})

# Make sure the directory that will receive the result files exists.
FileUtils.mk_dir_p(os.environ['OUTPUT_DIRECTORY'])

#### Sample catalog.json used as a guide for stage-out

In [None]:
{
    "type": "Catalog",
    "id": "NA",
    "stac_version": "1.0.0",
    "description": "NA",
    "links": [
        {
            "rel": "root",
            "href": "/tmp/normal-stage-out/catalog.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file01.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file02.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file03.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file04.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file05.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file06.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file07.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file08.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file09.nc.stac.json",
            "type": "application/json"
        },
        {
            "rel": "item",
            "href": "some_granules/test_file10.nc.stac.json",
            "type": "application/json"
        }
    ]
}

- Create some mock files for stage-out, including a STAC metadata file for each granule and a catalog.
- Note that this is usually not done here, as the application will take care of it.
- How each STAC metadata file is created can still be used as an example, though.

In [14]:
import json
from mdps_ds_lib.lib.utils.time_utils import TimeUtils
from pystac import Item, Asset, Catalog, Link

# Builds mock granules for the stage-out example. For every granule it writes
#   - a data file              (<name>.nc)
#   - a native metadata file   (<name>.nc.cas)
#   - a STAC item file         (<name>.nc.stac.json)
# and finally a STAC catalog (at CATALOG_FILE) that links to every STAC item file.

granules_dir = os.path.join('normal-stage-out', 'some_granules')
FileUtils.mk_dir_p(granules_dir)  # base directory
total_files = 3  # Uploading 3 mock granules

# Native (CAS) metadata content. It is identical for every mock granule, so it is defined once
# here instead of being rebuilt on each loop iteration.
cas_metadata_xml = '''<?xml version="1.0" encoding="UTF-8" ?>
            <cas:metadata xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas">
                <keyval type="scalar">
                    <key>AggregateDir</key>
                    <val>snppatmsl1a</val>
                </keyval>
                <keyval type="vector">
                    <key>AutomaticQualityFlag</key>
                    <val>Passed</val>
                </keyval>
                <keyval type="vector">
                    <key>BuildId</key>
                    <val>v01.43.00</val>
                </keyval>
                <keyval type="vector">
                    <key>CollectionLabel</key>
                    <val>L1AMw_nominal2</val>
                </keyval>
                <keyval type="scalar">
                    <key>DataGroup</key>
                    <val>sndr</val>
                </keyval>
                <keyval type="scalar">
                    <key>EndDateTime</key>
                    <val>2016-01-14T10:06:00.000Z</val>
                </keyval>
                <keyval type="scalar">
                    <key>EndTAI93</key>
                    <val>726919569.000</val>
                </keyval>
                <keyval type="scalar">
                    <key>FileFormat</key>
                    <val>nc4</val>
                </keyval>
                <keyval type="scalar">
                    <key>FileLocation</key>
                    <val>/pge/out</val>
                </keyval>
                <keyval type="scalar">
                    <key>Filename</key>
                    <val>SNDR.SNPP.ATMS.L1A.nominal2.02.nc</val>
                </keyval>
                <keyval type="vector">
                    <key>GranuleNumber</key>
                    <val>101</val>
                </keyval>
                <keyval type="scalar">
                    <key>JobId</key>
                    <val>f163835c-9945-472f-bee2-2bc12673569f</val>
                </keyval>
                <keyval type="scalar">
                    <key>ModelId</key>
                    <val>urn:npp:SnppAtmsL1a</val>
                </keyval>
                <keyval type="scalar">
                    <key>NominalDate</key>
                    <val>2016-01-14</val>
                </keyval>
                <keyval type="vector">
                    <key>ProductName</key>
                    <val>SNDR.SNPP.ATMS.20160114T1000.m06.g101.L1A.L1AMw_nominal2.v03_15_00.D.201214135000.nc</val>
                </keyval>
                <keyval type="scalar">
                    <key>ProductType</key>
                    <val>SNDR_SNPP_ATMS_L1A</val>
                </keyval>
                <keyval type="scalar">
                    <key>ProductionDateTime</key>
                    <val>2020-12-14T13:50:00.000Z</val>
                </keyval>
                <keyval type="vector">
                    <key>ProductionLocation</key>
                    <val>Sounder SIPS: JPL/Caltech (Dev)</val>
                </keyval>
                <keyval type="vector">
                    <key>ProductionLocationCode</key>
                    <val>D</val>
                </keyval>
                <keyval type="scalar">
                    <key>RequestId</key>
                    <val>1215</val>
                </keyval>
                <keyval type="scalar">
                    <key>StartDateTime</key>
                    <val>2016-01-14T10:00:00.000Z</val>
                </keyval>
                <keyval type="scalar">
                    <key>StartTAI93</key>
                    <val>726919209.000</val>
                </keyval>
                <keyval type="scalar">
                    <key>TaskId</key>
                    <val>8c3ae101-8f7c-46c8-b5c6-63e7b6d3c8cd</val>
                </keyval>
            </cas:metadata>'''

catalog = Catalog(  # Creating a STAC catalog (an example from previous cell)
    id='NA',  # we don't need to know the ID
    description='NA')
catalog.set_self_href(os.environ['CATALOG_FILE'])

for i in range(1, total_files + 1):
    filename = f'test_file{i:02d}'
    # File names of the three files that make up one granule.
    data_name = f'{filename}.nc'
    cas_name = f'{filename}.nc.cas'
    stac_name = f'{filename}.nc.stac.json'

    with open(os.path.join(granules_dir, data_name), 'w') as ff:  # Creating Data File
        ff.write('sample_file')
    with open(os.path.join(granules_dir, cas_name), 'w') as ff:  # Creating native metadata file
        ff.write(cas_metadata_xml)

    # Creating STAC metadata file object which is used by DS during stage-out and cataloging.
    # pystac library is used to create a stac item file to have a standardized stac.
    stac_item = Item(id=filename,
                     geometry={  # Set them if the algorithm knows what type of geometry is needed
                         "type": "Point",
                         "coordinates": [0.0, 0.0]
                     },
                     bbox=[-180, -90, 180, 90],  # Set them if the algorithm knows what type of geometry is needed
                     # Placeholder built from Unix timestamp 0, NOT the real creation time of this
                     # file; replace it with a meaningful datetime in a real application.
                     datetime=TimeUtils().parse_from_unix(0, True).get_datetime_obj(),
                     properties={  # These 4 fields are mandatory
                         "start_datetime": "2016-01-31T18:00:00.009057Z",
                         "end_datetime": "2016-01-31T19:59:59.991043Z",
                         "created": "2016-02-01T02:45:59.639000Z",
                         "updated": "2022-03-23T15:48:21.578000Z",
                     },
                     href=os.path.join('some_granules', stac_name),
                     collection='NA',  # No need to find out what collection it belongs to DS will take care of that.
                     assets={  # Point to all relevant files including itself
                         # Each title follows the current granule's file name (previously every
                         # granule was titled 'test_file01.*' regardless of its own name).
                         data_name: Asset(os.path.join('.', data_name), title=data_name, roles=['data']),
                         cas_name: Asset(os.path.join('.', cas_name), title=cas_name, roles=['metadata']),
                         stac_name: Asset(os.path.join('.', stac_name), title=stac_name, roles=['metadata']),
                     })
    with open(os.path.join(granules_dir, stac_name), 'w') as ff:  # Creating STAC metadata file
        json.dump(stac_item.to_dict(False, False), ff)
    # Adding to the Catalog so that DS can find out the STAC file and finds out other related files from STAC
    catalog.add_link(Link('item', os.path.join('some_granules', stac_name), 'application/json'))

# Writing the main catalog file
with open(os.environ['CATALOG_FILE'], 'w') as ff:
    json.dump(catalog.to_dict(False, False), ff)


### Performing Stage-out as a dry-run to verify that things are set up correctly

In [25]:
# Enable dry-run mode before the uploader is created.
os.environ['DRY_RUN'] = 'TRUE'
from mdps_ds_lib.stage_in_out.upoad_granules_factory import UploadGranulesFactory

# Ask the factory for the uploader driven by the STAC catalog, then run it.
factory = UploadGranulesFactory()
uploader = factory.get_class(UploadGranulesFactory.UPLOAD_S3_BY_STAC_CATALOG)
upload_result = uploader.upload()
print(upload_result)

Result of dry-run
{
    "granule_file": "normal-stage-out/some_granules/test_file02.nc.stac.json",
    "s3_url": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:DEMO:DEV43:NA___001/URN:NASA:UNITY:DEMO:DEV43:NA___001:test_file02/test_file02.nc"
}
{
    "granule_file": "normal-stage-out/some_granules/test_file02.nc.stac.json",
    "s3_url": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:DEMO:DEV43:NA___001/URN:NASA:UNITY:DEMO:DEV43:NA___001:test_file02/test_file02.nc.cas"
}
{
    "granule_file": "normal-stage-out/some_granules/test_file02.nc.stac.json",
    "s3_url": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:DEMO:DEV43:NA___001/URN:NASA:UNITY:DEMO:DEV43:NA___001:test_file02/test_file02.nc.stac.json"
}
{
    "granule_file": "normal-stage-out/some_granules/test_file03.nc.stac.json",
    "s3_url": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:DEMO:DEV43:NA___001/URN:NASA:UNITY:DEMO:DEV43:NA___001:test_file03/test_file03.nc"
}
{
    "granule_file": "normal-stage-out/some_granules/test_file03.nc

### Performing Stage-out

In [32]:
from glob import glob

# Remove every entry left in the output directory by a previous run.
result_pattern = os.path.join(os.environ['OUTPUT_DIRECTORY'], '*')
old_result_files = glob(result_pattern)
for stale_path in old_result_files:
    FileUtils.remove_if_exists(stale_path)

In [33]:

# Switch from dry-run mode to a real upload.
os.environ['DRY_RUN'] = 'FALSE'

# Remove the previous output location, if present, before uploading.
output_directory = os.environ['OUTPUT_DIRECTORY']
FileUtils.remove_if_exists(output_directory)
from mdps_ds_lib.stage_in_out.upoad_granules_factory import UploadGranulesFactory

# Obtain the uploader driven by the STAC catalog and run it.
uploader = UploadGranulesFactory().get_class(UploadGranulesFactory.UPLOAD_S3_BY_STAC_CATALOG)
upload_result = uploader.upload()
print(upload_result)

2025-02-10 13:29:23,648 [AUDIT] [mdps_ds_lib.stage_in_out.upload_granules_by_complete_catalog_s3::11] uploading type=data, name=test_file01.nc, href=normal-stage-out/some_granules/./test_file01.nc
2025-02-10 13:29:23,648 [AUDIT] [mdps_ds_lib.stage_in_out.upload_granules_by_complete_catalog_s3::11] uploading type=data, name=test_file02.nc, href=normal-stage-out/some_granules/./test_file02.nc
2025-02-10 13:29:23,650 [AUDIT] [mdps_ds_lib.stage_in_out.upload_granules_by_complete_catalog_s3::11] uploading type=data, name=test_file03.nc, href=normal-stage-out/some_granules/./test_file03.nc
2025-02-10 13:29:24,041 [AUDIT] [mdps_ds_lib.stage_in_out.upload_granules_by_complete_catalog_s3::11] uploading type=metadata, name=test_file02.nc.cas, href=normal-stage-out/some_granules/./test_file02.nc.cas
2025-02-10 13:29:24,043 [AUDIT] [mdps_ds_lib.stage_in_out.upload_granules_by_complete_catalog_s3::11] uploading type=metadata, name=test_file03.nc.cas, href=normal-stage-out/some_granules/./test_file0

{"type": "Catalog", "id": "NA", "stac_version": "1.0.0", "description": "NA", "links": [{"rel": "root", "href": "/Users/wphyo/Projects/unity/uds_lib/examples/normal-stage-out/catalog.json", "type": "application/json"}, {"rel": "item", "href": "normal-stage-out/output_dir/successful_features.json", "type": "application/json"}, {"rel": "item", "href": "normal-stage-out/output_dir/failed_features.json", "type": "application/json"}]}


- Successful and failed items are written to two files in the directory set in `OUTPUT_DIRECTORY`