# Sounder SIPS L1A PGE Interface

In [None]:
import os, sys, shutil
import re
from glob import glob
import logging
import subprocess
from pprint import pformat

In [None]:
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, tostring
from xml.dom import minidom

## Execution Parameters

In [None]:
# Location of input L0 files
input_path = "/pge/in"

# Where PGE output files and log files get written
output_path = "/pge/out"

# Location of dem and mcf static files
data_static_path = "/tmp/static"

# Enable verbose logging (DEBUG level instead of INFO)
verbose = True

# Start/end time strings written into the StartDateTime/EndDateTime scalars
# of the config. Both use the same layout: YYYY-MM-DDTHH:MM:SS.mmmZ
start_datetime = "2016-01-14T09:54:00.000Z"
end_datetime = "2016-01-14T11:54:00.000Z"

## Constants

In [None]:
# Directory holding the PGE static config files
config_static_path = "/pge/static"

# XML template that gets filled in with the execution parameters
config_template_filename = "/pge/static/pge_config_template.xml"

# Executables: the L1A PGE itself and the MetExtractor run afterwards
pge_executable = "/pge/bin/L1AMw_main"
met_extractor_executable = "/pge/bin/MetExtractor"

# Files produced under output_path: the generated config and the PGE log
config_output_filename = os.path.join(output_path, "l1a_config.xml")
log_filename = os.path.join(output_path, "L1AMw_main.log")

## Set up Logging

In [None]:
# The verbose execution parameter selects DEBUG output, otherwise INFO
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)

# Logger shared by the rest of the notebook
logger = logging.getLogger("PGE Wrapper")

## Output Path Creation

In [None]:
# Make sure the output directory exists
if not os.path.exists(output_path):
    # Use the notebook's named logger, like every other message here
    logger.info(f"Creating missing output directory: {output_path}")
    # exist_ok tolerates the directory appearing between the check and the call
    os.makedirs(output_path, exist_ok=True)

## Set Up Static Data 

In [None]:
# Create symbolic links to expected location of static files

dem_src_path = os.path.join(data_static_path, "dem")
mcf_src_path = os.path.join(data_static_path, "mcf")

# Fail early if either source directory is missing
for path in (dem_src_path, mcf_src_path):
    if not os.path.exists(path):
        raise Exception(f"Source static data path not found: {path}")

dem_dst_path = "/tmp/static/dem"
mcf_dst_path = "/tmp/static/mcf"

for src_path, dst_path in [(dem_src_path, dem_dst_path), (mcf_src_path, mcf_dst_path)]:
    # lexists() is also true for a dangling symbolic link, which exists()
    # reports as missing; without it os.symlink() below would fail with a
    # bare FileExistsError instead of the explicit message
    dst_present = os.path.lexists(dst_path)

    # Only error if the destination path exists and is not the same as the source
    if dst_present and not os.path.realpath(src_path) == os.path.realpath(dst_path):
        raise Exception(f"Destination static data path already exists: {dst_path}")

    # Destination path could exist if it is the same as the source path
    if not dst_present:
        # Create containing directory if it does not exist, for instance if it is a temp directory
        dst_base = os.path.dirname(dst_path)
        if not os.path.exists(dst_base):
            os.makedirs(dst_base)

        logger.info(f"Creating symbolic link to static data: {src_path} -> {dst_path}")
        os.symlink(src_path, dst_path)

## Sort Input Files

In [None]:
def extract_l0_file_info(filename):
    """Split an L0 PDS file name into its component fields.

    Only the base name of ``filename`` is examined. It must start with
    ``P``, followed by 3 digits, 4 digits, 13 capital letters, ``T``,
    12 digits, 2 digits and the literal extension ``.PDS``.

    Returns:
        dict of strings with the keys ``scid``, ``apid``, ``product_name``,
        ``creation_time`` and ``numeric_id``, in the order the fields
        appear in the name.

    Raises:
        Exception: if the base name does not follow that layout.
    """
    # Raw string so the \d escapes reach the regex engine untouched; the dot
    # is escaped so that only a literal ".PDS" extension is accepted
    match = re.match(r"P(\d{3})(\d{4})([A-Z]{13})T(\d{12})(\d{2})\.PDS", os.path.basename(filename))

    if not match:
        raise Exception(f"Could not parse L0 filename: {filename}")

    return {
        'scid': match.group(1),
        'apid': match.group(2),
        'product_name': match.group(3),
        'creation_time': match.group(4),
        'numeric_id': match.group(5),
    }

In [None]:
# Recursively collect every *.PDS file below the input directory
pds_pattern = os.path.join(input_path, "**", "*.PDS")
input_filenames = glob(pds_pattern, recursive=True)

# Group the file names by the product name parsed out of each one
files_by_product = {}
for fn in input_filenames:
    file_info = extract_l0_file_info(fn)
    files_by_product.setdefault(file_info['product_name'], []).append(fn)

In [None]:
# Split each product's sorted file list down the middle: the first half is
# stored as the previous files, the second half as the current files
for prod_name, unsorted_files in files_by_product.items():
    prod_file_list = sorted(unsorted_files)
    split_at = len(prod_file_list) // 2

    # Rebinding an existing key is safe while iterating over the items
    files_by_product[prod_name] = {
        'prev': prod_file_list[:split_at],
        'curr': prod_file_list[split_at:],
    }

logger.debug("Sorted input files:\n" + pformat(files_by_product))

## Create XML Configuration

In [None]:
# Load the XML config template and keep its root element for editing
config_tree = ElementTree.parse(config_template_filename)
config_root = config_tree.getroot()

In [None]:
# Lookup from the product name found in an L0 file name to the name used
# for that product's input vectors in the config
prod_type_to_vector = dict(
    AAAAAAAAAAAAA="SNPP_EphAtt",
    ATMSSCIENCEAA="ATMS_SCIENCE",
)

In [None]:
# Modify input filenames
inp_file_elem = config_root.find("./group[@name='InputProductFiles']")

if inp_file_elem is None:
    raise Exception(f"Could not find InputProductFiles group in XML config template: {config_template_filename}")

# For each product type and occurrence type (prev/curr) assign filenames to the relevant vectors
for prod_name, files_by_occurance in files_by_product.items():
    prod_elem_name = prod_type_to_vector[prod_name]

    for occurance_type, occurance_filenames in files_by_occurance.items():
        vector_name = f"{occurance_type}_{prod_elem_name}"
        vector_elem = inp_file_elem.find(f"./vector[@name='{vector_name}']")

        # Report a missing vector explicitly rather than letting zip() fail on None
        if vector_elem is None:
            raise Exception(f"Could not find {vector_name} vector in XML config template: {config_template_filename}")

        # zip() stops at the shorter sequence, so surplus input files would
        # otherwise be left out of the config without any notice
        if len(occurance_filenames) > len(vector_elem):
            logger.warning(
                f"{vector_name} has {len(vector_elem)} entries in the template but "
                f"{len(occurance_filenames)} input files; extra files are not used")

        # Overwrite the template entries, in order, with the actual file names
        for fn_elem, inp_filename in zip(vector_elem, occurance_filenames):
            fn_elem.text = inp_filename

In [None]:
# Re-root every output product file name under output_path, keeping only
# the base name from the template, and remember the resulting paths
out_file_elem = config_root.find("./group[@name='OutputProductFiles']/vector")

if out_file_elem is None:
    raise Exception(f"Could not find OutputProductFiles group in XML config template: {config_template_filename}")

output_filenames = []
for fn_elem in out_file_elem:
    relocated_fn = os.path.join(output_path, os.path.basename(fn_elem.text))
    fn_elem.text = relocated_fn
    output_filenames.append(relocated_fn)

In [None]:
# Modify SFIF filename path so it points into config_static_path
sfif_elem = config_root.find("./group[@name='StaticFileIdentificationFiles']/scalar")

# Same explicit failure as the other template lookups, instead of an
# AttributeError on None
if sfif_elem is None:
    raise Exception(f"Could not find StaticFileIdentificationFiles scalar in XML config template: {config_template_filename}")

sfif_elem.text = os.path.join(config_static_path, os.path.basename(sfif_elem.text))

In [None]:
# Modify MonitorPath so it points at the output directory
mon_path_elem = config_root.find(".//scalar[@name='MonitorPath']")

# Same explicit failure as the other template lookups, instead of an
# AttributeError on None
if mon_path_elem is None:
    raise Exception(f"Could not find MonitorPath scalar in XML config template: {config_template_filename}")

mon_path_elem.text = output_path

In [None]:
# Modify start/end time
start_dt_elem = config_root.find("./group[@name='GranuleIdentification']/scalar[@name='StartDateTime']")
end_dt_elem = config_root.find("./group[@name='GranuleIdentification']/scalar[@name='EndDateTime']")

# Check both lookups before modifying anything, so a template without
# these scalars is reported explicitly instead of as an AttributeError
for dt_name, dt_elem in (("StartDateTime", start_dt_elem), ("EndDateTime", end_dt_elem)):
    if dt_elem is None:
        raise Exception(f"Could not find GranuleIdentification {dt_name} scalar in XML config template: {config_template_filename}")

start_dt_elem.text = start_datetime
end_dt_elem.text = end_datetime

In [None]:
# Rewrite the root element's schema location attribute so it refers to the
# file of the same base name inside config_static_path
schema_attr_name = '{http://www.w3.org/2001/XMLSchema-instance}noNamespaceSchemaLocation'

schema_fn = config_root.attrib[schema_attr_name]
config_root.set(schema_attr_name, os.path.join(config_static_path, os.path.basename(schema_fn)))

In [None]:
# Save the modified configuration to config_output_filename
logger.info(f"Writing config file: {config_output_filename}")

with open(config_output_filename, mode='w', encoding='utf-8') as output:
    # Serialize the tree, then re-parse it with minidom to get its
    # pretty-printer (two-space indent, no added newline string)
    config_dom = minidom.parseString(tostring(config_root, 'utf-8'))
    output.write(config_dom.toprettyxml(indent='  ', newl=''))

## Create L1A Template Files

In [None]:
# Parse the SFIF file named in the config; its root is searched below for
# the L1A template file name
sfif_tree = ElementTree.parse(sfif_elem.text)
sfif_root = sfif_tree.getroot()

In [None]:
# Look up the L1A template file named in the SFIF file
tmpl_elem = sfif_root.find("./group[@name='StaticAuxiliaryInputFiles']/scalar[@name='L1aMwTemplate']")

# Report a missing entry explicitly instead of failing with an AttributeError on None
if tmpl_elem is None:
    raise Exception(f"Could not find L1aMwTemplate scalar in SFIF file: {sfif_elem.text}")

l1a_template_fn = tmpl_elem.text

# Seed every output file with a copy of the template
for out_fn in output_filenames:
    logger.info(f"Creating template L1A output file: {out_fn}")
    shutil.copyfile(l1a_template_fn, out_fn)

## Run L1AMw_main PGE executable

In [None]:
# Run L1AMw_main PGE executable

# Change to out path so that any PGS temporary files are written there
os.chdir(output_path)

# Pass the arguments as a list, without a shell, so that paths containing
# spaces or shell metacharacters reach the executable unchanged
l1a_args = [pge_executable, config_output_filename, log_filename]

# Space-joined form used only for the log message
l1a_cmd = ' '.join(l1a_args)

logger.info(f"Running PGE executable: {l1a_cmd}")

l1a_status = subprocess.run(l1a_args)

if (l1a_status.returncode != 0):
    raise Exception(f"Execution of PGE resulting in non zero exit status: {l1a_status}, check log file for details: {log_filename}")

## Run Met Extractor

In [None]:
# Extract from the SFIF file the paths to the MetExtractor constants and mappings files
met_const_elem = sfif_root.find("./group[@name='OutputProductConfiguration']//scalar[@name='MetFileConstants']")
met_mapping_elem = sfif_root.find("./group[@name='OutputProductConfiguration']//scalar[@name='MetFileMappings']")

# Report a missing entry explicitly instead of failing with an AttributeError on None
for met_name, met_elem in (("MetFileConstants", met_const_elem), ("MetFileMappings", met_mapping_elem)):
    if met_elem is None:
        raise Exception(f"Could not find {met_name} scalar in SFIF file: {sfif_elem.text}")

met_const_filename = met_const_elem.text
met_mapping_filename = met_mapping_elem.text

In [None]:
# Settings for the pev file that carries config parameters to MetExtractor

# The pev file is written into the output directory
pev_filename = os.path.join(output_path, 'spdc.pev')

# Config groups whose scalar fields are copied into the pev file
group_path_list = ['JobIdentification', 'SCFIdentification']

# Scalar names left out of the pev file
omit_list = [
    'ProductionDateTime',
    'ProductionLocation',
    'ProductionLocationCode',
    'CollectionLabel',
    'NodeInfo',
]

In [None]:
def extract_config_group_to_pev(config_root, group_path, pev_file, omit_list):
    """Write the scalar fields of one config group as ``name=value`` lines.

    Every ``scalar`` child of any ``group`` element named ``group_path``
    below ``config_root`` is written to ``pev_file``, one per line, using
    the scalar's ``name`` attribute and its text. Scalars whose name is in
    ``omit_list`` are skipped.
    """
    for scalar in config_root.findall(f".//group[@name='{group_path}']/scalar"):
        scalar_name = scalar.attrib['name']
        scalar_value = scalar.text

        if scalar_name in omit_list:
            continue

        pev_file.write(f"{scalar_name}={scalar_value}\n")

In [None]:
# Write the selected config groups to the pev file; the with statement
# closes the file, so no explicit close() is needed
with open(pev_filename, 'w') as pev_file:
    for group_path in group_path_list:
        extract_config_group_to_pev(config_root, group_path, pev_file, omit_list)

In [None]:
# Run MetExtractor once per output file, writing <output file>.cas
for out_file in output_filenames:

    # Arguments are passed as a list, without a shell, so that file names
    # containing spaces or shell metacharacters are handed over unchanged
    met_args = [
        met_extractor_executable,
        '-Ddata.file.reader.hdf5.data.types.map.file=' + met_mapping_filename,
        # make sure to write abspath into met file
        '--dataFile', '-file', os.path.abspath(out_file), '-reader', 'SipsNcHDF5FileReader',
        '--metFile', '-toFile', out_file + '.cas', '-writer', 'XmlCasWriter',
        '--supportFile', '-file', met_const_filename, '-reader', 'PropEqValFileReader',
        '--supportFile', '-file', pev_filename, '-reader', 'PropEqValFileReader',
        '-Ddebug=true',
    ]

    # Space-joined form used only for the error message
    met_cmd = ' '.join(met_args)

    met_status = subprocess.run(met_args)
    if (met_status.returncode != 0):
        raise Exception(f"Error executing MetExtractor command: {met_cmd}")