In [None]:
import time
import yaml
import os
from collections import deque

from apwf.tools import fx_comp_helper, fx_daq_helper, executor_helper 
from apwf.flows import automate_helper

from funcx.sdk.client import FuncXClient
from globus_automate_client import create_flows_client


configfile = f"{os.getcwd()}/ptycho_wf_config.yml"
wf_config= yaml.safe_load(open(configfile))

apwfLogger = executor_helper.setup_logger(wf_config)

In [None]:
# Globus Online Endpoints
src_endpoint = wf_config['globus_endpoints']['aps_mona4_gcp']
dest_endpoint = wf_config['globus_endpoints']['alcf_theta']

# FuncX endpoint at ThetaGPU (ALCF) and Prisma (APS)
compute_fx_endpoint = wf_config['funcx_endpoints']['alcf_polaris_gpu']
daq_fx_endpoint = wf_config['funcx_endpoints']['aps_mona4_daq']

print(compute_fx_endpoint)

In [None]:
fxc = FuncXClient()

In [None]:
# Register setup functions to funcx service
daq_wf_fxid = fxc.register_function(fx_daq_helper.daq_wf_prepare)
comp_wf_fxid = fxc.register_function(fx_comp_helper.comp_wf_prepare)

## Prepare workflow environment ##
# Executor
res = executor_helper.executor_wf_prepare(wf_config['executor']['activated_iids_file_path'],
                                          wf_config['executor']['log_folder_path'],
                                          create_activated_iid_file=True, create_log_folder=True, 
                                          cleanup_activated_iid_file=True, cleanup_log_folder=False)
if res != 0: apwfLogger.error(f"Unable to setup executor folders/files: {res}")
else: apwfLogger.info("Executor folders/files are ready.")

# Data acquisition
rid = fxc.run(f"{wf_config['flow']['source']['root_folder_path']}/{wf_config['flow']['source']['input_folder_prefix']}",
              f"{wf_config['flow']['source']['root_folder_path']}/{wf_config['flow']['source']['input_position_folder_prefix']}",
              f"{wf_config['flow']['source']['root_folder_path']}/{wf_config['flow']['source']['output_folder_prefix']}",
              probe_file_path=None, create_output_folder=True, cleanup_output_folder=False,
              endpoint_id=daq_fx_endpoint, function_id=daq_wf_fxid)
res = automate_helper.fx_get_result(rid, fxc)
if res != 0: apwfLogger.error(f"Unable to setup data acquisition machine folders/files: {res}")
else: apwfLogger.info("Data acquisition machine's folders/files are ready.")

# Compute machine
rid = fxc.run(f"{wf_config['flow']['destination']['root_folder_path']}/{wf_config['flow']['destination']['input_folder_prefix']}",
              f"{wf_config['flow']['destination']['root_folder_path']}/{wf_config['flow']['destination']['output_folder_prefix']}",
              f"{wf_config['flow']['task']['executable']['params']['log_folder_path']}",
              create_input_folder=True, create_output_folder=True, create_log_folder=True, 
              cleanup_input_folder=True, cleanup_output_folder=True, cleanup_log_folder=True,
              endpoint_id=compute_fx_endpoint, function_id=comp_wf_fxid)
res = automate_helper.fx_get_result(rid, fxc)
if res != 0: apwfLogger.error(f"Unable to setup compute machine folders/files: {res}")
else: apwfLogger.info("Compute machines' folders/files are ready.")


In [None]:
# FuncX function definition for remote Polaris GPU reconstruction call
func_ptycho_uuid = fxc.register_function(fx_comp_helper.ptycho_func)
#func_ptycho_uuid = fxc.register_function(fx_comp_helper.ptycho_funcx_gpu_pin)
print(func_ptycho_uuid)

In [None]:
# FuncX function definition for remote DAQ machine directory calls
fx_func_get_file_paths_uuid = fxc.register_function(fx_daq_helper.get_file_paths)
fx_func_get_folder_paths_uuid = fxc.register_function(fx_daq_helper.get_folder_paths)
print(fx_func_get_file_paths_uuid)
print(fx_func_get_folder_paths_uuid)

In [None]:
# Globus Automate flow definition
flow_definition = automate_helper.ptycho_flow_definition

# Create a flow with the above flow definition
flows_client = create_flows_client()
flow = flows_client.deploy_flow(flow_definition, input_schema={}, title=wf_config['flow']['title'], keywords=wf_config['flow']['keywords'])
flow_id = flow['id']
flow_scope = flow['globus_auth_scope']

In [None]:
src_config = wf_config['flow']['source']
dest_config = wf_config['flow']['destination']
task_config = wf_config['flow']['task']
exec_params_config = task_config['executable']['params']

wfc = executor_helper.EConfigs(src_config, dest_config, wf_config['flow']['only_larger_than_scanid'])

while True:
    # Get the remote folders at source input folder
    src_input_folder_paths_regex = f"{wfc.src_wf_root_path}/{wfc.src_input_folder_prefix}/*"
    rid = fxc.run(src_input_folder_paths_regex,
                  endpoint_id=daq_fx_endpoint,
                  function_id=fx_func_get_folder_paths_uuid)
    src_input_folder_paths = automate_helper.fx_get_result(rid, fxc, delay=wf_config['funcx_req_delay'])

    # Get the remote folders at source output folder
    src_output_folder_paths_regex = f"{wfc.src_wf_root_path}/{wfc.src_output_folder_prefix}/*"
    rid = fxc.run(src_output_folder_paths_regex,
                  endpoint_id=daq_fx_endpoint,
                  function_id=fx_func_get_folder_paths_uuid)
    ex_src_output_folder_paths = automate_helper.fx_get_result(rid, fxc, delay=wf_config['funcx_req_delay'])

    # Get the activated iids and find the iids that need to be processed
    activated_iids = executor_helper.read_activated_iids(wf_config['executor']['activated_iids_file_path'])
    diff_iids = executor_helper.get_unprocessed_iids(activated_iids, src_input_folder_paths, ex_src_output_folder_paths, wfc.add_larger_than)
    
    # If there is no iid/scan to be processed sleep, and then re-check 
    if(len(diff_iids)==0):
        apwfLogger.debug(f"There is no folder to be processed, "
                      f"sleeping {wf_config['data_generation_delay']} secs.")
        time.sleep(wf_config['data_generation_delay'])
        continue
    
    # There are scans to be processed, generate paths
    paths = executor_helper.WFPaths(wfc, src_input_folder_paths, diff_iids).plist
     
    # Generate flow inputs according to the paths and other configuration info.
    flow_inputs = executor_helper.WFFlowsInputs(src_endpoint, dest_endpoint, compute_fx_endpoint, 
                                            func_ptycho_uuid, task_config, exec_params_config, 
                                            wf_config['flow']['sample_name'], paths).filist

    ### Initiate/dispatch Flows ###
    q1 = deque()
    for i in range(len(flow_inputs)):
        label = str(flow_inputs[i]['iid']) + '-' + "-".join(wf_config['flow']['keywords'])
        flow_action = flows_client.run_flow(flow_id, flow_scope, flow_inputs[i], label=label)
        q1.append(flow_action)
        executor_helper.append_activated_iids([flow_inputs[i]['iid']], wf_config['executor']['activated_iids_file_path'])
        apwfLogger.info(f"{i}: Flow {flow_inputs[i]['iid']} initiated and added to q1: {flow_action['action_id']}")

In [None]:
# Cancel all flows
for flow_action in q1:
    flows_client.flow_action_cancel(flow_id, flow_scope, flow_action['action_id'])