fair_facts_v2_total:

- This is a minimal prototype of a total module for summing sea-level rise projections generated from different sources and modules. `facts-total` is a CLI tool that accepts the path to each netCDF file you would like summed, as well as an output path where the summed result will be written. Each input netCDF file represents output from a FACTS sea-level component module. It is the user's responsibility to ensure that the correct files are specified: check that the file paths are correct and that every file belongs to the same scale ('global' or 'local').

- It is possible to run multiple FACTS sea-level components with different default values for common parameters such as pyear-start and pyear-end. If that happens, `facts-total` will not fail, but it will show a message similar to the following:



In [1]:
!pip install radical-asyncflow

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import asyncio
import time
import os
import shlex

from radical.asyncflow import WorkflowEngine
from radical.asyncflow import ConcurrentExecutionBackend
from concurrent.futures import ThreadPoolExecutor
from radical.asyncflow.logging import init_default_logger


In [3]:
import logging
import sys

# Name of the log file, resolved relative to the current working directory
# at the moment the file handler is created.
LOG_FILENAME = 'fair_facts_v2_total.log'

# Create the logger; DEBUG on the logger itself lets each handler decide
# which levels it keeps.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # Capture all levels

# Attach handlers only once so re-running this cell does not duplicate
# every log line.
if not logger.handlers:
    # One format shared by both destinations
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # File handler: records everything (DEBUG and above)
    file_handler = logging.FileHandler(LOG_FILENAME)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # Console handler: only INFO+ reaches the notebook output
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

# Smoke-test the logger at every level
logger.debug("logger.debug() - This only goes to file")
logger.info("logger.info() - This goes to both file and console")
logger.warning("logger.warning() - Warning message")
logger.error("logger.error() - Test Error message")

# Report the path of the file handler that is actually attached.  Looking it
# up on the logger (instead of using the `file_handler` local) keeps this
# working when the handlers already existed and the block above was skipped.
log_path = next(
    (
        os.path.abspath(handler.baseFilename)
        for handler in logger.handlers
        if isinstance(handler, logging.FileHandler)
    ),
    os.path.abspath(LOG_FILENAME),
)
# Check the same path that is reported, not a CWD-relative name.
print(f"Log file exists {os.path.exists(log_path)} at location: {log_path}")

2026-02-11 13:28:36 - __main__ - INFO - logger.info() - This goes to both file and console
2026-02-11 13:28:36 - __main__ - ERROR - logger.error() - Test Error message
Log file exists True at location: /gpfsm/dnb06/projects/p151/gtamkin/facts2.0/notebooks/fair_facts_v2_total.log


In [4]:
import asyncio
import subprocess

from pathlib import Path

def file_exists_and_has_content(filepath):
    """Return True when *filepath* is a regular file with a non-zero size."""
    candidate = Path(filepath)
    if not candidate.is_file():
        # Missing paths and directories never count as content.
        return False
    return candidate.stat().st_size > 0
    
def _build_total_command(component, name):
    """Build the singularity ``facts-total`` command for one total task.

    Args:
        component: Sub-directory of ``./data/output`` whose ``<name>.nc`` file
            is passed as the single ``--item``, or ``'all'`` to pass the
            ``lws`` and ``sterodynamics`` files together.
        name: Base name (without ``.nc``) of the input files; also used in
            the output file name.

    Returns:
        A ``(cmd, host_output)`` tuple: the argument list to execute and the
        host-side ``Path`` where the totaled file is expected to appear
        (``./data/output`` is bound to ``/mnt/total_out`` in the container).
    """
    # ADD ICE COMPONENT IF AVAILABLE: append 'ice' to the 'all' list below.
    sources = ['lws', 'sterodynamics'] if component == 'all' else [component]
    output_name = f'totaled_output_{component}_{name}.nc'

    cmd = [
        '/usr/local/other/singularity/4.0.3/bin/singularity', 'exec',
        '--bind', '/discover/nobackup/projects/sealevel/facts2.0/data/input:/mnt/total_in',
        '--bind', './data/output:/mnt/total_out',
        '/discover/nobackup/projects/sealevel/facts2.0/containers/sealevel-facts-total_latest-sandbox',
        'facts-total',
        *[f'--item=/mnt/total_out/{source}/{name}.nc' for source in sources],
        '--pyear-start=2020',
        '--pyear-end=2150',
        '--pyear-step=10',
        f'--output-path=/mnt/total_out/{output_name}',
    ]
    return cmd, Path('./data/output') / output_name


async def main(timeout=60):
    """Run the six ``facts-total`` tasks concurrently and log the outcome.

    Creates a thread-pool execution backend and a workflow engine, launches
    one total task per (component, name) pair, waits for all of them with a
    timeout, logs a summary, and shuts the workflow engine down.

    Args:
        timeout: Seconds to wait for all tasks before giving up
            (default 60, the value previously hard-coded).
    """
    init_default_logger(logging.DEBUG)

    # Create backend and workflow
    engine = await ConcurrentExecutionBackend(ThreadPoolExecutor())
    flow = await WorkflowEngine.create(engine)

    # Ensure output directories exist
    def setup_directories():
        os.makedirs('./data/output/total', exist_ok=True)

    # NOTE(review): this task runs the command itself and returns a dict.
    # If asyncflow's `executable_task` expects the function to return a
    # command string, this should be `function_task` instead - confirm
    # against the radical.asyncflow documentation.
    @flow.executable_task
    async def total_task(component, name):
        """Facts total task - executes singularity command.

        Returns:
            dict with the executed command string, the component, the name
            and the process return code.

        Raises:
            RuntimeError: if the command exits non-zero, or if the expected
                output file is missing or empty afterwards.
        """
        cmd, host_output = _build_total_command(component, name)

        # Log the command
        cmd_str = shlex.join(cmd)
        logger.info(f"Executing: {cmd_str}")

        # RUN THE COMMAND ASYNCHRONOUSLY
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()

        # Log the process output before any check so that it is still
        # available for diagnosis when the task fails.
        logger.info(f"Command output: {stdout.decode(errors='replace')}")
        if stderr:
            logger.warning(f"Command stderr: {stderr.decode(errors='replace')}")

        # A stale output file from an earlier run would satisfy the file
        # check below, so the exit status has to be checked first.
        if proc.returncode != 0:
            logger.error(f"Command failed with return code: {proc.returncode}")
            raise RuntimeError(
                f"Task failed: return code {proc.returncode} from: {cmd_str}"
            )

        # Verify that file was created
        if not (host_output.is_file() and host_output.stat().st_size > 0):
            error_msg = f"Output unsuccessfully created: {host_output}"
            logger.error(error_msg)
            raise RuntimeError(f"Task failed: {error_msg}")
        logger.info(f"Output successfully created: {host_output}")

        return {
            'command': cmd_str,
            'component': component,
            'name': name,
            'returncode': proc.returncode
        }

    async def run_climate_workflow(pipeline_id):
        """Run the complete climate workflow.

        Returns:
            The list of per-task results (a result dict, or the exception a
            task raised), or ``None`` when the timeout expired first.
        """
        logger.info(f'Starting climate workflow {pipeline_id} at {time.time()}')

        # Setup directories
        setup_directories()

        # Start ALL tasks in parallel (don't await yet)
        jobs = [
            (component, name)
            for component in ('lws', 'sterodynamics', 'all')
            for name in ('lslr', 'gslr')
        ]
        futures = [total_task(component, name) for component, name in jobs]

        try:
            results = await asyncio.wait_for(
                asyncio.gather(*futures, return_exceptions=True),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            logger.warning(
                f"Tasks did not finish within {timeout} seconds; no results "
                "were collected (output files may still have been written "
                "under ./data/output)"
            )
            return None

        # gather(return_exceptions=True) hands failures back as values, so
        # they must be reported explicitly or they pass unnoticed.
        for (component, name), outcome in zip(jobs, results):
            if isinstance(outcome, BaseException):
                logger.error(f"TOTAL task ({component}, {name}) failed: {outcome!r}")

        logger.info(f'ALL TOTAL tasks completed for pipeline {pipeline_id}')
        logger.info(f'Climate workflow {pipeline_id} finished at {time.time()}')
        return results

    # Run workflow(s); always release the engine and its thread pool, even
    # when the workflow raises.
    try:
        results = await run_climate_workflow(1)
    finally:
        await flow.shutdown()

    if results is None:
        logger.error("=========Workflow timed out before results were collected=========")
    elif any(isinstance(outcome, BaseException) for outcome in results):
        logger.error("=========Workflow finished with failed tasks=========")
    else:
        logger.info("=========All workflows completed successfully=========")
    logger.info(results)

# Notebook entry point: Jupyter supports top-level `await`, so the coroutine
# is awaited directly in the cell. (Top-level `await` is not valid in a plain
# Python script.)
await main()

[90m2026-02-11 13:28:36.576[0m │ [94mINFO[0m │ [38;5;165m[root][0m │ Logger configured successfully - Console: DEBUG, File: disabled (N/A), Structured: disabled, Style: modern
[90m2026-02-11 13:28:36.576[0m │ [94mINFO[0m │ [38;5;165m[execution.backend(concurrent)][0m │ ThreadPoolExecutor execution backend started successfully
[90m2026-02-11 13:28:36.577[0m │ [96mDEBUG[0m │ [38;5;165m[workflow_manager][0m │ Registered signal handler for SIGHUP
[90m2026-02-11 13:28:36.577[0m │ [96mDEBUG[0m │ [38;5;165m[workflow_manager][0m │ Registered signal handler for SIGTERM
[90m2026-02-11 13:28:36.578[0m │ [96mDEBUG[0m │ [38;5;165m[workflow_manager][0m │ Registered signal handler for SIGINT
[90m2026-02-11 13:28:36.578[0m │ [96mDEBUG[0m │ [38;5;165m[workflow_manager][0m │ Started run component
2026-02-11 13:28:36 - __main__ - INFO - Starting climate workflow 1 at 1770834516.5787423
[90m2026-02-11 13:28:36.578[0m │ [94mINFO[0m │ [38;5;165m[main][0m │ Starting 