SBG - Spectral Resample Process - Application Notebook

In [2]:
import datetime as dt
import glob
import json
import logging
import os
from pathlib import Path

import shutil
import sys
import numpy as np
import hytools as ht
from hytools.io import parse_envi_header, write_envi_header
from hytools.io.envi import WriteENVI
from hytools.brdf import calc_flex_single,set_solar_zn
from hytools.topo import calc_scsc_coeffs
from hytools.masks import mask_create
from hytools.misc import set_brdf
from PIL import Image
import pystac
import spectral.io.envi as envi

from unity_sds_client.resources.dataset import Dataset
from unity_sds_client.resources.data_file import DataFile

# stage_in packages
from unity_sds_client.resources.collection import Collection

2024-02-22 18:45:47,625	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [3]:

anc_names = ['path_length','sensor_az','sensor_zn',
                'solar_az', 'solar_zn','phase','slope',
                'aspect', 'cosine_i','utc_time','']


# Hard-code configuration dictionary
#######################################################
config_dict = {}
config_dict["topo"] =  {}
config_dict["topo"]['type'] =  'scs+c'
config_dict["topo"]['calc_mask'] = [["ndi", {'band_1': 850,'band_2': 660,
                                             'min': 0.1,'max': 1.0}],
                                    ['ancillary',{'name':'slope',
                                                  'min': np.radians(5),'max':'+inf' }],
                                    ['ancillary',{'name':'cosine_i',
                                                  'min': 0.12,'max':'+inf' }]]
config_dict["topo"]['apply_mask'] = [["ndi", {'band_1': 850,'band_2': 660,
                                             'min': 0.1,'max': 1.0}],
                                    ['ancillary',{'name':'slope',
                                                  'min': np.radians(5),'max':'+inf' }],
                                    ['ancillary',{'name':'cosine_i',
                                                  'min': 0.12,'max':'+inf' }]]
config_dict["topo"]['c_fit_type'] = 'nnls'

config_dict["brdf"] = {}
config_dict["brdf"]['type'] =  'flex'
config_dict["brdf"]['grouped'] =  False
config_dict["brdf"]['geometric'] = 'li_dense_r'
config_dict["brdf"]['volume'] = 'ross_thick'
config_dict["brdf"]["b/r"] = 2.5
config_dict["brdf"]["h/b"] = 2
config_dict["brdf"]['sample_perc'] = 0.1
config_dict["brdf"]['interp_kind'] = 'linear'
config_dict["brdf"]['calc_mask'] = [["ndi", {'band_1': 850,'band_2': 660,
                                              'min': 0.1,'max': 1.0}]]
config_dict["brdf"]['apply_mask'] = [["ndi", {'band_1': 850,'band_2': 660,
                                              'min': 0.1,'max': 1.0}]]
config_dict["brdf"]['bin_type'] = 'dynamic'
config_dict["brdf"]['num_bins'] = 18
config_dict["brdf"]['ndvi_bin_min'] = 0.1
config_dict["brdf"]['ndvi_bin_max'] = 1.0
config_dict["brdf"]['ndvi_perc_min'] = 10
config_dict["brdf"]['ndvi_perc_max'] = 95

config_dict["glint"]  = {}
config_dict['glint']['type'] = 'gao'
config_dict['glint']['correction_wave'] = 860
config_dict['glint']['apply_mask'] =  [["ndi", {'band_1': 850,'band_2': 660,
                                              'min': -1,'max': 0.}],
                                       ["band", {'band': 560,
                                              'min': 0,'max': 0.2}]]
config_dict['glint']['truncate'] = True
#######################################################

In [4]:
def generate_quicklook(input_file):

    img = ht.HyTools()
    img.read_file(input_file)
    image_file = input_file.replace('.bin','.png')

    if 'DESIS' in img.base_name:
        band3 = img.get_wave(560)
        band2 = img.get_wave(850)
        band1 = img.get_wave(660)
    else:
        band3 = img.get_wave(560)
        band2 = img.get_wave(850)
        band1 = img.get_wave(1660)

    rgb=  np.stack([band1,band2,band3])
    rgb[rgb == img.no_data] = np.nan

    rgb = np.moveaxis(rgb,0,-1).astype(float)
    bottom = np.nanpercentile(rgb,5,axis = (0,1))
    top = np.nanpercentile(rgb,95,axis = (0,1))
    rgb = np.clip(rgb,bottom,top)
    rgb = (rgb-np.nanmin(rgb,axis=(0,1)))/(np.nanmax(rgb,axis= (0,1))-np.nanmin(rgb,axis= (0,1)))
    rgb = (rgb*255).astype(np.uint8)

    im = Image.fromarray(rgb)
    im.save(image_file)


def generate_stac_metadata(header_file):

    header = envi.read_envi_header(header_file)
    base_name = os.path.basename(header_file)[:-4]

    metadata = {}
    metadata['id'] = base_name
    metadata['start_datetime'] = dt.datetime.strptime(header['start acquisition time'], "%Y-%m-%dt%H:%M:%Sz")
    metadata['end_datetime'] = dt.datetime.strptime(header['end acquisition time'], "%Y-%m-%dt%H:%M:%Sz")
    # Split corner coordinates string into list
    coords = [float(x) for x in header['bounding box'].replace(']', '').replace('[', '').split(',')]
    geometry = [list(x) for x in zip(coords[::2], coords[1::2])]
    # Add first coord to the end of the list to close the polygon
    geometry.append(geometry[0])
    metadata['geometry'] = {
        "type": "Polygon",
        "coordinates": geometry
    }
    base_tokens = base_name.split('_')
    metadata['collection'] = f"SISTER_{base_tokens[1]}_{base_tokens[2]}_{base_tokens[3]}_{base_tokens[5]}"
    metadata['properties'] = {
        'sensor': base_tokens[1],
        'description': header['description'],
        'product': base_tokens[3],
        'processing_level': base_tokens[2]
    }
    return metadata


def create_item(metadata, assets):
    item = pystac.Item(
        id=metadata['id'],
        datetime=metadata['start_datetime'],
        start_datetime=metadata['start_datetime'],
        end_datetime=metadata['end_datetime'],
        geometry=metadata['geometry'],
        collection=metadata['collection'],
        bbox=None,
        properties=metadata['properties']
    )
    # Add assets
    for key, href in assets.items():
        item.add_asset(key=key, asset=pystac.Asset(href=href))
    return item

Reflectance Correction Code

Inputs and Configurations

In the original pre-process, inputs are supplied by a run_config file. This consists of 2 entries (a reflectance file, uncertainty file, and a CRID).

In the Unity system, the data files required will be staged in for the application, and the crid is a config item that is passed in. To make this work in Unity, we will also pass in an "output collection" which is needed if we want to "persist" the output products in the data catalog.


In [5]:
#Retrieve Reflectance Dataset
input_dataset     = '/unity/ads/input_collections/REFLECT_CORRECT_MERGE/catalog.json' # type: stage-in
#input_dataset    = '/unity/ads/input_collections/SBG-L2-RSRFL/catalog.json' # type: stage-in
#input_L1B_dataset = '/unity/ads/input_collections/SBG-L1B-PRE/catalog.json' # type: stage-in
output_stac_catalog_dir   = '/unity/ads/outputs/SBG-L2A_CORFL/' # type: stage-out

output_collection_name    = 'urn:nasa:unity:unity:dev:SBG-L2A_CORFL___1'

#Pre-process variables
#From the config.json, retrieve the following information:
crid = "000" #hardcoded but will be passed in
experimental = False

Import Files from STAC Item Collection

In [8]:

inp_collection = Collection.from_stac(input_dataset)

data_filenames = inp_collection.data_files(['data'])
data_filenames 

if2
if2


[unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L2-RSRFL/./SISTER_EMIT_L2A_RSRFL_20240103T131936_001.bin),
 unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L2-RSRFL/./SISTER_EMIT_L2A_RSRFL_20240103T131936_001.hdr),
 unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L2-RSRFL/./SISTER_EMIT_L2A_RSRFL_20240103T131936_001_UNC.bin),
 unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L2-RSRFL/./SISTER_EMIT_L2A_RSRFL_20240103T131936_001_UNC.hdr),
 unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L1B-PRE/./SISTER_EMIT_L1B_RDN_20240103T131936_001.bin),
 unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L1B-PRE/./SISTER_EMIT_L1B_RDN_20240103T131936_001_LOC.bin),
 unity_sds_client.resources.DataFile(location=/unity/ads/input_collections/SBG-L1B-PRE/./SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS.bin)]

Get the data files from the STAC files

In [13]:
for datafile in data_filenames:
    f = datafile.location
    if 'L1B_RDN' in f:
        if "_OBS.bin" in f:
           print(f)
           obs_base_name = Path(f).stem
           obs_file = f
        else:
            continue    
    elif "L2A_RSRFL" in f:
        if "_UNC.bin" in f:
            continue
            # print(f)
            # obs_base_name = Path(f).stem
            # obs_file = f
        elif ".bin" in f:
            print(f)
            rfl_base_name = Path(f).stem
            rfl_file = f
    else:
        continue
   
print(rfl_base_name)
print(obs_base_name)


/unity/ads/input_collections/SBG-L2-RSRFL/./SISTER_EMIT_L2A_RSRFL_20240103T131936_001.bin
/unity/ads/input_collections/SBG-L1B-PRE/./SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS.bin
SISTER_EMIT_L2A_RSRFL_20240103T131936_001
SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS


In [14]:
# Set up console logging using root logger
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO)
logger = logging.getLogger("sister-reflect_correct")
# Set up file handler logging
handler = logging.FileHandler("pge_run.log")
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(levelname)s [%(module)s]: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("Starting reflect_correct.py")

if experimental:
    disclaimer = "(DISCLAIMER: THIS DATA IS EXPERIMENTAL AND NOT INTENDED FOR SCIENTIFIC USE) "
else:
    disclaimer = ""

2024-02-22 18:51:59,625 INFO: Starting reflect_correct.py


In [15]:

if not os.path.exists(output_stac_catalog_dir):
    os.mkdir(output_stac_catalog_dir)

# rfl_base_name = os.path.basename(run_config['inputs']['reflectance_dataset'])
sister,sensor,level,product,datetime,in_crid = rfl_base_name.split('_')

# rfl_file = f'{run_config["inputs"]["reflectance_dataset"]}/{rfl_base_name}.bin'

out_rfl_file =  f'{output_stac_catalog_dir}/SISTER_{sensor}_L2A_CORFL_{datetime}_{crid}.bin'

#obs_base_name = os.path.basename(run_config['inputs']['observation_dataset'])
#obs_file = f'{run_config["inputs"]["observation_dataset"]}/{obs_base_name}.bin'

# Load input file
anc_files = dict(zip(anc_names,[[obs_file,a] for a in range(len(anc_names))]))

In [16]:
rfl = ht.HyTools()
rfl.read_file(rfl_file,'envi',anc_files)
rfl.create_bad_bands([[300,400],[1337,1430],[1800,1960],[2450,2600]])

if sensor in ['PRISMA','DESIS','EMIT']:
    corrections = ['Topographic','Glint']
else:
    corrections = ['Topographic','BRDF','Glint']

#Run corrections
if 'Topographic' in corrections:
    logger.info('Calculating topo coefficients')
    rfl.mask['calc_topo'] =  mask_create(rfl,config_dict['topo']['calc_mask'])
    rfl.mask['apply_topo'] =  mask_create(rfl,config_dict['topo']['apply_mask'])
    try: 
        calc_scsc_coeffs(rfl,config_dict['topo'])
        rfl.corrections.append('topo')
    except:
        logger.error("Errror calculating topo corrections.")
        pass
    
if 'BRDF' in corrections:
    logger.info('Calculating BRDF coefficients')
    set_brdf(rfl,config_dict['brdf'])
    set_solar_zn(rfl)
    rfl.mask['calc_brdf'] =  mask_create(rfl,config_dict['brdf']['calc_mask'])
    calc_flex_single(rfl,config_dict['brdf'])
    rfl.corrections.append('brdf')
    
if 'Glint' in corrections:
    try:
        logger.info('Setting glint coefficients')
        rfl.glint = config_dict['glint']
        rfl.corrections.append('glint')
    except:
        logger.error("Errror calculating glint corrections.")
        pass

#Export corrected reflectance
header_dict = rfl.get_header()
header_dict['description'] =f'{" ".join(corrections)} corrected reflectance'

logger.info('Exporting corrected image')
writer = WriteENVI(out_rfl_file,header_dict)
iterator = rfl.iterate(by='line', corrections=rfl.corrections)
while not iterator.complete:
    line = iterator.read_next()
    writer.write_line(line,iterator.current_line)
writer.close()

generate_quicklook(out_rfl_file)

# Take care of disclaimer in ENVI header and rename files
if experimental:
    out_hdr_file = out_rfl_file.replace(".bin", ".hdr")
    hdr = parse_envi_header(out_hdr_file)
    hdr["description"] = disclaimer + hdr["description"].capitalize()
    write_envi_header(out_hdr_file, hdr)
    for file in glob.glob(f"{output_stac_catalog_dir}/SISTER*"):
        shutil.move(file, f"{output_stac_catalog_dir}/EXPERIMENTAL-{os.path.basename(file)}")

corfl_file = glob.glob(output_stac_catalog_dir+"/*%s.bin" % crid)[0]
corfl_basename = os.path.basename(corfl_file)[:-4]

#output_runconfig_path = f'{output_stac_catalog_dir}/{corfl_basename}.runconfig.json'
#shutil.copyfile(run_config_json, output_runconfig_path)

output_log_path = f'{output_stac_catalog_dir}/{corfl_basename}.log'
if os.path.exists("pge_run.log"):
    shutil.copyfile('pge_run.log', output_log_path)

2024-02-22 18:52:22,277 INFO: Calculating topo coefficients


Reflectance and ancillary no data extents do not match, combining no data masks.


2024-02-22 18:52:23,492 ERROR: Errror calculating topo corrections.
2024-02-22 18:52:23,495 INFO: Setting glint coefficients
2024-02-22 18:52:23,499 INFO: Exporting corrected image


In [17]:


# Add items for data products
hdr_files = glob.glob(output_stac_catalog_dir + "/*SISTER*.hdr")
hdr_files.sort()
for hdr_file in hdr_files:
    # TODO: Use incoming item.json to get properties and geometry and use hdr_file for description (?)
    metadata = generate_stac_metadata(hdr_file)

In [18]:
#Uncertain about this part. What is supposed to get sent out to STAC? 
from datetime import datetime, timezone

# Create a collection
out_collection = Collection(output_collection_name)

# Add output file(s) to the dataset
file = glob.glob(f"{output_stac_catalog_dir}/*{crid}*.hdr")

if file:
    header = envi.read_envi_header(file[0])    
    start_time = metadata['start_datetime']
    end_time = metadata['end_datetime']
    # Create a Dataset for the collection
    name = os.path.splitext(os.path.basename(file[0]))[0]
    dataset = Dataset(
        name=name,
        collection_id=out_collection.collection_id, 
        start_time=start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        end_time=end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        creation_time=datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
        )
    
    for file in glob.glob(f"{output_stac_catalog_dir}/*{crid}*"):  

        if file.endswith(".bin"):
            dataset.add_data_file(DataFile("binary", file, ["data"]))
        elif file.endswith(".png"):
            dataset.add_data_file(DataFile("image/png", file, ["browse"]))
        elif file.endswith(".hdr"):
            dataset.add_data_file(DataFile("header", file, ["data"]))
        else:
            dataset.add_data_file(DataFile(None, file, ["metadata"]))

    dataset.add_data_file(DataFile("text/json", output_stac_catalog_dir + '/' +  name +'.json', ["metadata"]))


# Add the dataset to the collection
out_collection._datasets.append(dataset)



In [19]:
Collection.to_stac(out_collection, output_stac_catalog_dir)