In [172]:
from funcx.sdk.client import FuncXClient
import time


# Create the funcX client object that the rest of the notebook talks through,
# and set its throttling flag to False.
fxc = FuncXClient()
fxc.throttling_enabled = False
# Show the service URL the client points at and confirm the flag value.
print(fxc.base_url, fxc.throttling_enabled)

https://funcx.org/api/v1 False


# Functions to work with endpoints

In [188]:
# funcX endpoint UUIDs that the workload is spread across.
endpoints = [
    '709118de-1103-463f-8425-281eb93b55ff',  # Theta covid19
    '67e95158-8bda-4b1f-a0ef-31a1626eba00',  # Ryan ep
    'a59434ad-9a25-4378-9682-b5110e4eaa48',  # Ryan ep Theta queue (5 nodes)
]

In [189]:
def get_endpoints_status(endpoints):
    """Query the status of each endpoint and extract its idle-worker count.

    Args:
        endpoints: iterable of endpoint UUID strings.

    Returns:
        A ``(endpoint_status, idle_workers)`` tuple of dicts, both keyed by
        endpoint UUID. ``endpoint_status`` holds the first element of what
        ``fxc.get_endpoint_status`` returns for that endpoint;
        ``idle_workers`` holds that element's ``'idle_workers'`` value.
    """
    statuses, idle = {}, {}
    for endpoint_id in endpoints:
        # Only the first entry of the returned sequence is used.
        first_report = fxc.get_endpoint_status(endpoint_id)[0]
        statuses[endpoint_id] = first_report
        idle[endpoint_id] = first_report['idle_workers']
    return statuses, idle

# funcX functions

In [214]:
def warm_endpoint(sleep_time=10):
    """funcX warming task: hold a worker for ``sleep_time`` seconds.

    Submitting this function to an endpoint is how the notebook warms the
    endpoint and gets workers created.

    Args:
        sleep_time: seconds to sleep before returning.

    Returns:
        The integer 1.
    """
    # Imported inside the body so the function does not depend on
    # notebook-level imports.
    from time import sleep

    sleep(sleep_time)
    return 1

In [215]:
# Register warm_endpoint with funcX. The returned id is what warm_ep passes as
# function_id when it launches warming tasks.
funcx_warm = fxc.register_function(warm_endpoint,
                                   description="A funcx warming function for covid19")

print(funcx_warm)

337b60b3-1ea5-4fec-b261-fa75487faa08


In [204]:
def funcx_runner(index, filename, batchsize, index_start, index_end, workdir=None, out_file=None):
    """funcX task that processes one batch by delegating to ``funcx_node_local``.

    Note the argument order handed on: ``filename`` comes first, then
    ``index``, ``batchsize``, ``index_start`` and ``index_end``; ``workdir``
    and ``out_file`` are passed by keyword.

    Returns:
        Whatever ``funcx_node_local`` returns.
    """
    # Imported inside the body rather than at notebook level -- presumably so
    # the import resolves wherever funcX executes the function.
    from candle_apps.candle_node_local import funcx_node_local

    positional = (filename, index, batchsize, index_start, index_end)
    return funcx_node_local(*positional, workdir=workdir, out_file=out_file)

In [205]:
# Register funcx_runner with funcX. submit_job passes the returned id as
# function_id for every work task.
func_uuid2 = fxc.register_function(funcx_runner,
                                   description="A funcx function for covid19")

print(func_uuid2)

0d3379e3-3bd4-4876-8311-970fd0223bf1


# Generate workloads

In [206]:
def generate_batch(filename, start=0, batchsize=10, max_batches=10):
    """Return file offsets that split ``filename`` into batches of lines.

    The file is read in text mode, ``batchsize`` lines at a time, and the
    position reported by ``tell()`` is recorded at every batch boundary.

    Args:
        filename: path of the text file to index.
        start: offset to begin reading from. It should be 0 or a value
            previously returned by this function, because text-mode offsets
            are opaque.
        batchsize: number of lines per batch; must be at least 1.
        max_batches: maximum number of offsets to return; 0 means no limit.

    Returns:
        A list of offsets beginning with ``start``. Every offset except
        possibly the last marks the beginning of a non-empty batch; the last
        may be the end-of-file position. The end-of-file position is never
        repeated, so an empty file yields ``[start]``.

    Raises:
        ValueError: if ``batchsize`` is smaller than 1.
    """
    if batchsize < 1:
        raise ValueError(f'batchsize must be >= 1, got {batchsize}')

    # 0 is the "unlimited" sentinel; every other value caps the list length.
    limit = float('inf') if max_batches == 0 else max_batches

    batch_index = []
    with open(filename) as current:
        current.seek(start)
        batch_index.append(current.tell())

        while len(batch_index) < limit:
            lines_read = 0
            for _ in range(batchsize):
                # readline() returns '' only at end of file; a blank line
                # still comes back as a newline.
                if not current.readline():
                    break
                lines_read += 1

            if lines_read == 0:
                # Already at end of file: no new boundary to record.
                break
            batch_index.append(current.tell())
            if lines_read < batchsize:
                # Short batch means the file is exhausted.
                break
    return batch_index

In [207]:
# Register generate_batch with funcX so the batch offsets can be computed on
# an endpoint; the returned id is used as function_id in the run call below.
func_uuid1 = fxc.register_function(generate_batch,
                                   description="A funcx function for generating batch")

print(func_uuid1)

d76e4283-d42d-4570-ac79-761ef1ea5c17


In [264]:
# Run the registered generate_batch function on one endpoint, pointing it at
# the input file path on that endpoint. The value returned here is a task id;
# the actual offsets are fetched with fxc.get_result in the next cell.
res1 = fxc.run(filename='/home/rchard/src/covid19/ScreenPilot/ena+db.can',
               endpoint_id='67e95158-8bda-4b1f-a0ef-31a1626eba00',
               function_id=func_uuid1)

print(res1)

d6ce0787-91c9-4cb0-8ddf-e64f10a9ba8f


In [265]:
# Retrieve the list of batch offsets produced by the generate_batch task.
# NOTE(review): this assumes the task has already finished -- confirm how
# get_result behaves for a still-pending task.
batch_index = fxc.get_result(res1)
print(batch_index)

[0, 2321, 3153, 3480, 4006, 4429, 4950, 5357, 5994, 6402]


# Distribute work

Define the per-endpoint configs (input file, working directory and output path) for each of the three endpoints

In [266]:
# Per-endpoint execution settings, keyed by endpoint UUID:
#   filename - input file path on that endpoint
#   workdir  - working directory handed to the work function
#   out_path - prefix that submit_job prepends to 'data-<idx>' for the output file
configs = {
    # Theta covid19
    '709118de-1103-463f-8425-281eb93b55ff': dict(
        filename='/home/zzli/candle/ScreenPilot/ena+db.can',
        workdir='/tmp/zzli/',
        out_path='/home/zzli/candle/ScreenPilot/',
    ),
    # Ryan ep
    '67e95158-8bda-4b1f-a0ef-31a1626eba00': dict(
        filename='/home/rchard/src/covid19/ScreenPilot/ena+db.can',
        workdir='/tmp/rchard/',
        out_path='/home/rchard/src/covid19/ScreenPilot/',
    ),
    # Ryan ep Theta queue (5 nodes)
    'a59434ad-9a25-4378-9682-b5110e4eaa48': dict(
        filename='/home/rchard/src/covid19/ScreenPilot/ena+db.can',
        workdir='/tmp/rchard/',
        out_path='/home/rchard/src/covid19/ScreenPilot/',
    ),
}

Submit workloads

In [267]:
def submit_job(endpoint_uuid, idle_workers, idx, batchsize=10, index_start=0, index_end=1000):
    """Submit one batch to an endpoint using the registered work function.

    Args:
        endpoint_uuid: endpoint to run on; must be a key of ``configs``.
        idle_workers: accepted for call compatibility; not used here.
        idx: batch offset. It is forwarded as ``index`` and also used to name
            the output file ``<out_path>data-<idx>``.
        batchsize: forwarded to the work function (default 10). Keep it equal
            to the batch size used when the offsets were generated.
        index_start: forwarded to the work function (default 0).
        index_end: forwarded to the work function (default 1000).

    Returns:
        Whatever ``fxc.run`` returns for the submission (a task id in this
        notebook).

    Raises:
        KeyError: if ``endpoint_uuid`` has no entry in ``configs``.
    """
    exec_config = configs[endpoint_uuid]
    return fxc.run(filename=exec_config['filename'],
                   index=idx,
                   batchsize=batchsize,
                   index_start=index_start,
                   index_end=index_end,
                   workdir=exec_config['workdir'],
                   out_file=f'{exec_config["out_path"]}data-{idx}',
                   endpoint_id=endpoint_uuid,
                   function_id=func_uuid2)

In [268]:
# stats, idle = get_endpoints_status(endpoints)

In [None]:
def warm_ep(endpoint_uuid, stats):
    """Launch warming tasks if the endpoint is below its maximum node count.

    The number of warming tasks assumed to be already queued is the number of
    outstanding tasks beyond the current node count, never less than zero.
    New warming tasks are sent only for the remaining gap up to ``max_blocks``.

    Note: this assumes one worker per block.

    Args:
        endpoint_uuid: endpoint to warm; must be a key of ``stats``.
        stats: mapping of endpoint UUID to a status dict providing
            ``'managers'``, ``'max_blocks'`` and ``'outstanding_tasks'['RAW']``.

    Returns:
        None.
    """
    ep_stats = stats[endpoint_uuid]
    cur_nodes = ep_stats['managers']
    max_nodes = ep_stats['max_blocks']
    outstanding_tasks = ep_stats['outstanding_tasks']['RAW']

    # Clamp at zero: with fewer outstanding tasks than nodes the difference
    # is negative, which would otherwise inflate to_warm and over-submit.
    warming_jobs = max(outstanding_tasks - cur_nodes, 0)
    to_warm = max_nodes - (cur_nodes + warming_jobs)

    print(f'max: {max_nodes}, cur: {cur_nodes}, warming: {warming_jobs}, to_warm {to_warm}')
    # range() of a non-positive count is empty, so nothing is sent when the
    # endpoint is already at or above capacity.
    for _ in range(max(to_warm, 0)):
        print(f"Sending warming function to {endpoint_uuid}")
        fxc.run(sleep_time=10,
                endpoint_id=endpoint_uuid,
                function_id=funcx_warm)

In [None]:
def do_work(batch_index, poll_interval=60):
    """Hand out every batch to idle workers, polling until all are submitted.

    On each round the endpoint statuses are refreshed. Each endpoint receives
    one job per idle worker, taken from the end of the batch list, and is then
    asked to warm more nodes while work remains. The function returns as soon
    as the last batch has been submitted.

    Args:
        batch_index: sequence of batch offsets to submit. It is copied, so
            the caller's list is left untouched and the cell can be re-run.
        poll_interval: seconds to wait between polling rounds (default 60).

    Returns:
        List of the values returned by ``submit_job``, in submission order.
    """
    pending = list(batch_index)
    task_ids = []

    while pending:
        stats, idle = get_endpoints_status(endpoints)
        for ep, idle_workers in idle.items():
            # One job per idle worker, but never more than is left to do.
            for _ in range(min(idle_workers, len(pending))):
                idx = pending.pop()
                print(f'submitting {idx} to {ep}')
                task_ids.append(submit_job(ep, idle_workers, idx))

            if not pending:
                # Nothing left: skip further warming and the trailing sleep.
                print('Finished!')
                return task_ids

            # Work remains, so try warming the rest of this endpoint's nodes.
            warm_ep(ep, stats)
        time.sleep(poll_interval)
    return task_ids

In [None]:
# Distribute every batch offset across the endpoints and keep the returned
# task ids so their status can be checked afterwards.
task_ids = do_work(batch_index)

submitting 4950 to 709118de-1103-463f-8425-281eb93b55ff
0
max: 1, cur: 1, warming: 0, to_warm 0
submitting 4429 to 67e95158-8bda-4b1f-a0ef-31a1626eba00
submitting 4006 to 67e95158-8bda-4b1f-a0ef-31a1626eba00
0
max: 1, cur: 1, warming: 0, to_warm 0
30
max: 5, cur: 0, warming: 30, to_warm -25
submitting 3480 to 709118de-1103-463f-8425-281eb93b55ff
0
max: 1, cur: 1, warming: 0, to_warm 0
submitting 3153 to 67e95158-8bda-4b1f-a0ef-31a1626eba00
submitting 2321 to 67e95158-8bda-4b1f-a0ef-31a1626eba00
0
max: 1, cur: 1, warming: 0, to_warm 0
30
max: 5, cur: 0, warming: 30, to_warm -25


In [243]:
# Show the ids of all submitted work tasks.
print(task_ids)

['57a87c06-208c-4782-b975-c03b3c665340', '2d53ab85-591a-40d4-b4ee-d4664c3db858', '28333b44-6393-421c-bac7-f876e46f8277', '1e9c2ed3-235f-4c34-96a5-c49656e7bd4c', '049198ee-e268-473c-8b60-81895c9f45c0', '262bca59-bb0d-4270-bcb8-e66f71f7b752', '3da41d12-3532-4aee-9f31-55b49d625157', '0ea8b80d-5653-4da2-88d0-5fedd4d549db', '4c7e8860-8046-485b-a494-5a1b60a083f2', 'ecc72e85-7d19-4cef-a73a-cca8426bd875']


In [187]:
# Fetch the status of every submitted task in a single call.
x = fxc.get_batch_status(task_ids)
print(x)

# Report each remote failure. Failed tasks carry an 'exception' entry whose
# reraise() raises the remote error locally; it is caught per task so that one
# failure does not hide the others.
for task_id, status in x.items():
    if 'exception' not in status:
        continue
    try:
        status['exception'].reraise()
    except Exception as exc:
        print(f'{task_id} failed: {exc!r}')

{'3f2debb3-9337-475b-b312-c3afbc2d2ca7': {'pending': 'False', 'exception': <parsl.app.errors.RemoteExceptionWrapper object at 0x7f4d693175f8>}, 'e6ee18a4-3752-45e9-a42d-9260aa067141': {'pending': 'False', 'result': '/home/rchard/src/covid19/ScreenPilot/data-5994'}, 'd8d30d94-ac6f-4222-83bf-5e3a84db63ef': {'pending': 'False', 'result': '/home/rchard/src/covid19/ScreenPilot/data-5357'}, '1807abd5-db2a-4ab8-a62c-a71f087a840a': {'pending': 'False', 'exception': <parsl.app.errors.RemoteExceptionWrapper object at 0x7f4d69317898>}, 'ed4846b4-e2a7-467b-8a1f-03f05c39e618': {'pending': 'False', 'result': '/home/rchard/src/covid19/ScreenPilot/data-4429'}, '3510869a-9b6a-4271-8137-223b87dffb3c': {'pending': 'False', 'result': '/home/rchard/src/covid19/ScreenPilot/data-4006'}, '71ce51ec-d183-4ae7-b3fe-508de001acd9': {'pending': 'False', 'exception': <parsl.app.errors.RemoteExceptionWrapper object at 0x7f4d69322ac8>}, '81926816-0f4e-4b3e-9b4d-fa76d670f8d0': {'pending': 'False', 'result': '/home/rcha