In [1]:
import time
import json
import datetime
import globus_sdk

from globus_sdk import TimerJob
from globus_compute_sdk import Executor
from globus_sdk.experimental.globus_app import UserApp

from globus_sdk.utils import slash_join

In [2]:
CLIENT_ID = "c781864e-a9c9-482e-8db8-d58ac5962a86"
my_app = UserApp("crocus-user-app", client_id=CLIENT_ID)

flows_client = globus_sdk.FlowsClient(app=my_app)

In [3]:
compute_endpoint = "04551444-f53f-4629-ab58-fdfcad88ce93"

In [4]:
wxt_function = "b4930dc7-6c35-4cdc-910d-82765073b706"
aqt_function = "1c4aea51-52a0-4235-a5d2-cff872abc001"

In [5]:
gce = Executor(endpoint_id=compute_endpoint)

## Test the WXT and AQT Plugins

In [6]:
# Prepare payload for ESGF ingest-wxt
wxt_data = {
    "ndays": 1,
    "y": 2024,
    "m": 8,
    "d": 2,
    "site": 'NEIU',
    "hours": 1,
    "odir": "/Users/mgrover/git_repos/esgf-crocus-globus-flows/data/NEIU/wxt"
}

# Start the task
future = gce.submit_to_registered_function(wxt_function, kwargs=wxt_data)

In [7]:
# Wait and print the result
result = future.result()
print(result)

['/Users/mgrover/git_repos/esgf-crocus-globus-flows/data/NEIU/wxt/crocus-NEIU-wxt-a1_20240802_000000.nc']


In [8]:
# Prepare payload for ESGF ingest-aqt
aqt_data = {
    "ndays": 1,
    "y": 2024,
    "m": 8,
    "d": 2,
    "site": 'NEIU',
    "hours": 1,
    "odir": "/Users/mgrover/git_repos/esgf-crocus-globus-flows/data/NEIU/aqt"
}

# Start the task
future = gce.submit_to_registered_function(aqt_function, kwargs=aqt_data)

# Wait and print the result
result = future.result()
print(result)

['/Users/mgrover/git_repos/esgf-crocus-globus-flows/data/NEIU/aqt/crocus-NEIU-aqt-a1-20240802-000000.nc']


In [9]:
flow_definition = {
    "Comment": "A Node-Level Processing Workflow for WXT and AQT",
    "StartAt": "TransferInput",
    "States": {
        "TransferInput": {
            "Comment": "Transfer input data",
            "Type": "Action",
            "ActionUrl": "https://transfer.actions.globus.org/transfer",
            "Parameters": {
                "source_endpoint.$": "$.input.source.id",
                "destination_endpoint.$": "$.input.destination.id",
                "DATA": [
                    {
                        "source_path.$": "$.input.source.path",
                        "destination_path.$": "$.input.destination.path",
                        "recursive": True,
                    }
                ]
            },
            "ResultPath": "$.TransferFiles",
            "WaitTime": 300,
            "Next": "ProcessWXT"
        },
        "ProcessWXT": {
            "Comment": "Collect WXT data from Sage",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/",
            "Parameters": {
                "endpoint.$": "$.input.compute_endpoint",
                "function.$": "$.input.wxt_function",
                "kwargs.$": "$.input.wxt_kwargs"
            },
            "ResultPath": "$.WXT_output",
            "WaitTime": 600,
            "Next": "ProcessAQT"
        },
        "ProcessAQT": {
            "Comment": "Collect WXT data from Sage",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/",
            "Parameters": {
                "endpoint.$": "$.input.compute_endpoint",
                "function.$": "$.input.aqt_function",
                "kwargs.$": "$.input.aqt_kwargs"
            },
            "ResultPath": "$.AQT_output",
            "WaitTime": 600,
            "End": True
        },
    }
}

In [10]:
flow = flows_client.create_flow(title="CROCUS Flow", definition=flow_definition, input_schema={})
# flow = flows_client.update_flow(flow_id=flow_id, title="CROCUS Flow", definition=flow_definition, input_schema={})

In [11]:
flow_input = {
    "input": {
        "source": {
            "id": "03e6a23b-fb93-11ef-985b-0207be7ee3a1",
            "path": "/Users/mgrover/git_repos/esgf-crocus-globus-flows/data"
        },
        "destination": {
            "id": "7917c286-d1e5-40be-b33b-edd29f3a1448",
            "path": "/projects/crocus/test-crocus-flows/"
        },
        "compute_endpoint": compute_endpoint,
        "wxt_kwargs": wxt_data,
        "wxt_function": wxt_function,
        "aqt_kwargs": aqt_data,
        "aqt_function": aqt_function,
    }
}

In [12]:
flow_id = flow['id']
flow_id

'874b34ae-2472-4792-96f9-00b7c0f7c858'

In [13]:
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    app=my_app,
)

In [14]:
run = specific_flow_client.run_flow(
  body=flow_input,
  label="CROCUS Example",
  tags=['CROCUS', 'example']
)


Please authenticate with Globus here:
-------------------------------------
https://auth.globus.org/v2/oauth2/authorize?client_id=c781864e-a9c9-482e-8db8-d58ac5962a86&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2Feec9b274-0c81-4334-bdc2-54e90e689b9a%2Fall+openid+https%3A%2F%2Fauth.globus.org%2Fscopes%2F874b34ae-2472-4792-96f9-00b7c0f7c858%2Fflow_874b34ae_2472_4792_96f9_00b7c0f7c858_user&state=_default&response_type=code&code_challenge=Ug0x4YDFB-JJUTxWfSq2biiowxXI4kuztMwa9H-FkiM&code_challenge_method=S256&access_type=online&prefill_named_grant=crocus-user-app+on+anlvpn090.evs.anl.gov
-------------------------------------



Enter the resulting Authorization Code here:  0iF0RSxSReiidPsMpYGyfORbv5MXf5


In [15]:
# Get run details
# run = flows_client.get_run(run_id)

run_id = run['run_id']
run_status = run['status']
print("This flow can be monitored in the Web App:")
print(f"https://app.globus.org/runs/{run_id}")
print(f"Flow run started with ID: {run_id} - Status: {run_status}")

# Poll the Flow ser/vice to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')
    
# Run completed
print(json.dumps(run.data, indent=2))

This flow can be monitored in the Web App:
https://app.globus.org/runs/9cb1fab0-3744-4fa6-88f2-ae582c4e7468
Flow run started with ID: 9cb1fab0-3744-4fa6-88f2-ae582c4e7468 - Status: ACTIVE
Run status: INACTIVE
{
  "run_id": "9cb1fab0-3744-4fa6-88f2-ae582c4e7468",
  "action_id": "9cb1fab0-3744-4fa6-88f2-ae582c4e7468",
  "flow_id": "874b34ae-2472-4792-96f9-00b7c0f7c858",
  "flow_title": "CROCUS Flow",
  "flow_last_updated": "2025-03-12T19:43:20.728549+00:00",
  "start_time": "2025-03-12T19:44:48.030353+00:00",
  "status": "INACTIVE",
  "display_status": "INACTIVE",
  "details": {
    "code": "ConsentRequired",
    "state_name": "TransferInput",
    "description": "Flow run requires user intervention to proceed. For state 'TransferInput': Missing required data_access consent",
    "required_scope": "https://auth.globus.org/scopes/874b34ae-2472-4792-96f9-00b7c0f7c858/flow_874b34ae_2472_4792_96f9_00b7c0f7c858_user[*urn:globus:auth:scope:transfer.api.globus.org:all[*https://auth.globus.org/sc

# Add an input schema

This gives us the "Start" button on the flows GUI.

In [16]:
input_schema = {
    "required": [
        "input"
    ],
    "properties": {
        "input": {
            "type": "object",
            "required": [
                "source",
                "destination",
                "compute_endpoint",
                "wxt_function",
                "wxt_kwargs",
                "aqt_function",
                "aqt_kwargs"
            ],
            "properties": {
                "source": {
                    "type": "object",
                    "title": "Select source collection and path",
                    "description": "The source collection and path (path MUST end with a slash)",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "destination": {
                    "type": "object",
                    "title": "Select destination collection and path",
                    "description": "The destination collection and path (path MUST end with a slash); default collection is 'Globus Tutorials on ALCF Eagle'",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "compute_endpoint": {
                    "type": "string",
                    "format": "uuid",
                    "default": compute_endpoint,
                    "title": "Globus Compute Endpoint ID",
                    "description": "The UUID of the Globus Compute endpoint where the function will run"
                },
                "wxt_function": {
                    "type": "string",
                    "format": "uuid",
                    "default": wxt_function,
                    "title": "Globus Compute Function ID",
                    "description": "The UUID of the function to invoke; must be registered with the Globus Compute service"
                },
                "wxt_kwargs": {
                    "type": "object",
                    "title": "Function Inputs",
                    "description": "Inputs to pass to the function",
                    "properties":  {
                        "ndays": {
                            "type": "integer",
                            "default": 1
                        },
                        "y": {
                            "type": "integer",
                            "default": 2024
                        },
                        "m": {
                            "type": "integer",
                            "default": 8
                        },
                        "d": {
                            "type": "integer",
                            "default": 1
                        },
                        "site": {
                            "type": "string",
                            "default": "NU"
                        },
                        "hours": {
                            "type": "integer",
                            "default": 1
                        },
                        "odir": {
                            "type": "string",
                            "default": "/home/rchard/src/CROCUS/output/"
                        },
                    },
                    "additionalProperties": False
                },
                "aqt_function": {
                    "type": "string",
                    "format": "uuid",
                    "default": aqt_function,
                    "title": "Globus Compute Function ID",
                    "description": "The UUID of the function to invoke; must be registered with the Globus Compute service"
                },
                "aqt_kwargs": {
                    "type": "object",
                    "title": "Function Inputs",
                    "description": "Inputs to pass to the function",
                    "properties":  {
                        "ndays": {
                            "type": "integer",
                            "default": 1
                        },
                        "y": {
                            "type": "integer",
                            "default": 2024
                        },
                        "m": {
                            "type": "integer",
                            "default": 8
                        },
                        "d": {
                            "type": "integer",
                            "default": 1
                        },
                        "site": {
                            "type": "string",
                            "default": "NU"
                        },
                        "hours": {
                            "type": "integer",
                            "default": 1
                        },
                        "odir": {
                            "type": "string",
                            "default": "/home/rchard/src/CROCUS/output/"
                        },
                    },
                    "additionalProperties": False
                }
            },
            "additionalProperties": False
        }
    },
    "additionalProperties": False
}

In [17]:
flow = flows_client.update_flow(flow_id=flow_id, title="CROCUS Flow", definition=flow_definition, input_schema=input_schema)

# Configure a Timer

This will automate the invocation of the flow each day.

In [29]:
from globus_sdk.scopes import TimerScopes, FlowsScopes
from globus_sdk import TimerClient

In [30]:
flow_scope = specific_flow_client.scopes.user
end_scope = f"{TimerScopes.timer}[{flow_scope}]"

timer_client = TimerClient(app=my_app, app_scopes=end_scope)

In [31]:
callback_url = slash_join(specific_flow_client.base_url, f"/flows/{flow_id}/run")

Remove the Y/M/D fields so the flow runs for the current date.

In [25]:
timer_input = {
    'input': {
        'source': 
        {
            'id': '6c54cade-bde5-45c1-bdea-f4bd71dba2cc',
            'path': '/home/share/godata/'
        },
        'destination': {
            'id': '31ce9ba0-176d-45a5-add3-f37d233ba47d',
            'path': '/~/test/'
        },
        'compute_endpoint': compute_endpoint,
        'wxt_kwargs': {
            'ndays': 1,
            'site': 'NU',
            'hours': 1,
            'odir': '/home/rchard/src/CROCUS/output/'
        },
        'wxt_function': wxt_function,
        'aqt_kwargs': {
            'ndays': 1,
            'site': 'NU',
            'hours': 1,
            'odir': '/home/rchard/src/CROCUS/output/'
        },
        'aqt_function': aqt_function,
    }
}

In [None]:
timer = TimerJob(
    callback_url=callback_url,
    callback_body={"body": timer_input, "label": "CROCUS Timer Flow"},
    start=datetime.datetime.utcnow(),
    interval=datetime.timedelta(seconds=300),
    scope=flow_scope,
    name="CROCUS Flow Timer",
)

In [None]:
response = timer_client.create_job(timer)

In [None]:
response

In [None]:
timer_client.get_job(response.get('job_id')).data

## Delete the timer

In [None]:
timer_client.delete_job(response.get('job_id'))

## Run the Timer for All Sites

In [32]:
def create_timer_for_site(site):
    timer_input = {
    'input': {
        'source': 
        {
            'id': '6c54cade-bde5-45c1-bdea-f4bd71dba2cc',
            'path': '/home/share/godata/'
        },
        'destination': {
            'id': '31ce9ba0-176d-45a5-add3-f37d233ba47d',
            'path': '/~/test/'
        },
        'compute_endpoint': compute_endpoint,
        'wxt_kwargs': {
            'ndays': 1,
            'site': site,
            'hours': 1,
            'odir': '/home/rchard/src/CROCUS/output/'
        },
        'wxt_function': wxt_function,
        'aqt_kwargs': {
            'ndays': 1,
            'site': site,
            'hours': 1,
            'odir': '/home/rchard/src/CROCUS/output/'
        },
        'aqt_function': aqt_function,
    }
    }
    return timer_input

In [33]:
global_sites = ['NU',
                'CSU',
                'NEIU',
                'ATMOS',
                'UIC',
                'NEIU_CCICS',
                "BIG",
                'HUM',
                "DOWN",
                "SHEDD"]

In [34]:
jobs = []
for site in global_sites:
    flow_input = create_timer_for_site(site)
    timer = TimerJob(
        callback_url=callback_url,
        callback_body={"body": flow_input, "label": f"CROCUS Timer Flow {site}"},
        start=datetime.datetime.utcnow(),
        interval=datetime.timedelta(seconds=300),
        scope=flow_scope,
        name=f"CROCUS Flow Timer {site}",
    )
    response = timer_client.create_job(timer)
    job_id = timer_client.get_job(response.get('job_id')).data
    print(job_id)
    jobs.append(job_id)


Please authenticate with Globus here:
-------------------------------------
https://auth.globus.org/v2/oauth2/authorize?client_id=c781864e-a9c9-482e-8db8-d58ac5962a86&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2F524230d7-ea86-4a52-8312-86065a9e0417%2Ftimer%5Bhttps%3A%2F%2Fauth.globus.org%2Fscopes%2F874b34ae-2472-4792-96f9-00b7c0f7c858%2Fflow_874b34ae_2472_4792_96f9_00b7c0f7c858_user%5D+https%3A%2F%2Fauth.globus.org%2Fscopes%2Feec9b274-0c81-4334-bdc2-54e90e689b9a%2Fall+openid+https%3A%2F%2Fauth.globus.org%2Fscopes%2F874b34ae-2472-4792-96f9-00b7c0f7c858%2Fflow_874b34ae_2472_4792_96f9_00b7c0f7c858_user&state=_default&response_type=code&code_challenge=Q1SwO1r7Wpyw9XnkSpfJxnsZmjJIlYfGpfWCY_M1kes&code_challenge_method=S256&access_type=online&prefill_named_grant=crocus-user-app+on+anlvpn090.evs.anl.gov
-------------------------------------



Enter the resulting Authorization Code here:  vKh5q8Qnw0FmZk7NtpEUEkgsMeDoKR


{'name': 'CROCUS Flow Timer NU', 'stop_after': None, 'interval': 300.0, 'scope': 'https://auth.globus.org/scopes/874b34ae-2472-4792-96f9-00b7c0f7c858/flow_874b34ae_2472_4792_96f9_00b7c0f7c858_user', 'callback_url': 'https://flows.automate.globus.org/flows/874b34ae-2472-4792-96f9-00b7c0f7c858/run', 'callback_body': {'body': {'input': {'source': {'id': '6c54cade-bde5-45c1-bdea-f4bd71dba2cc', 'path': '/home/share/godata/'}, 'destination': {'id': '31ce9ba0-176d-45a5-add3-f37d233ba47d', 'path': '/~/test/'}, 'compute_endpoint': '04551444-f53f-4629-ab58-fdfcad88ce93', 'wxt_kwargs': {'ndays': 1, 'site': 'NU', 'hours': 1, 'odir': '/home/rchard/src/CROCUS/output/'}, 'wxt_function': 'b4930dc7-6c35-4cdc-910d-82765073b706', 'aqt_kwargs': {'ndays': 1, 'site': 'NU', 'hours': 1, 'odir': '/home/rchard/src/CROCUS/output/'}, 'aqt_function': '1c4aea51-52a0-4235-a5d2-cff872abc001'}}, 'label': 'CROCUS Timer Flow NU'}, 'start': '2025-03-12T20:04:07+00:00', 'inactive_reason': None, 'job_id': 'd6f1f5c9-18c9-4c

TimersAPIError: ('POST', 'https://timer.automate.globus.org/jobs/', 'Bearer', 400, 'JOB_RATE_LIMIT', 'new job would exceed limit of 1 average job execution(s) per minute; remove jobs with shorter intervals/more frequent execution to free up allocation')