# funcX Tutorial

funcX is a Function-as-a-Service (FaaS) platform for science that enables you to convert almost any computing resource into a high-performance function serving device. Deploying a funcX endpoint integrates your resource into the function serving fabric, allowing you to dynamically send, monitor, and receive results from function invocations. funcX is built on top of Parsl, which lets you connect your endpoint to large compute resources via traditional batch queues. funcX then dynamically provisions, uses, and releases those resources on demand to fulfill function requests.

Here we provide an example of using funcX to register a function and run it on a publicly available tutorial endpoint.

We start by creating a funcX client to interact with the service.

In [1]:
from funcx.sdk.client import FuncXClient

fxc = FuncXClient()

Next we define the function to run and register it with funcX. It will execute on the tutorial endpoint used later in this demonstration. Because that endpoint is Kubernetes-based, execution takes place in a simple python3.6 container.

In [2]:
def funcx_sum(items):
    return sum(items)

In [3]:
func_uuid = fxc.register_function(funcx_sum,
                                  description="A sum function")
print(func_uuid)

b62ef266-e8fa-4c9c-8983-1bffe59fb8c7


In [19]:
payload = [1, 2, 3, 4, 100]

endpoint_uuid = '84457364-560e-40f8-8161-1c7321733eec'

res = fxc.run(payload, endpoint_id=endpoint_uuid, function_id=func_uuid)
print(res)

86caa222-92fe-4130-9b71-91f292d1f90c


In [None]:
fxc.get_task_status(res)

In [20]:
fxc.get_result(res)

Exception: Task pending
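
The `Task pending` exception above means the result was requested before the task finished. One way to handle this is to retry until the result is ready. The following is a minimal sketch, not part of the funcX SDK: `wait_for_result` and `StubClient` are hypothetical names, and the sketch assumes that `get_result` raises a generic `Exception` whose message contains "pending" while the task is still running, as the output above suggests. `StubClient` only stands in for `fxc` so the example runs without the service.

```python
import time


def wait_for_result(client, task_id, timeout=60.0, interval=1.0):
    """Call client.get_result(task_id) until it stops reporting 'pending'.

    Assumption: a pending task raises an Exception whose message contains
    the word 'pending'. Any other exception is re-raised immediately.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return client.get_result(task_id)
        except Exception as exc:
            if "pending" not in str(exc).lower():
                raise
            if time.monotonic() >= deadline:
                raise TimeoutError(f"task {task_id} still pending") from exc
        time.sleep(interval)


class StubClient:
    """Hypothetical stand-in for FuncXClient, used only for this demo."""

    def __init__(self, pending_calls):
        self.pending_calls = pending_calls

    def get_result(self, task_id):
        # Report 'pending' a fixed number of times, then return a result.
        if self.pending_calls > 0:
            self.pending_calls -= 1
            raise Exception("Task pending")
        return sum([1, 2, 3, 4, 100])


stub = StubClient(pending_calls=2)
print(wait_for_result(stub, "demo-task", interval=0.01))
```

With a real client, the call would look like `wait_for_result(fxc, res)`, provided the assumption about the exception message holds for your funcX version.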

In [None]:
def dlhub_test(data):
    import sys, time, os
    sys.path.append("/app")
    os.chdir("/app")
    sys.stdout = open(os.devnull, "w")
    sys.stderr = open(os.devnull, "w")
    os.environ["PYTHONWARNINGS"]= "ignore::DeprecationWarning"
    start = time.time()
    global shim
    if 'shim' not in globals():
        import dlhub_shim
        shim = dlhub_shim
    res = []
    for d in data:
        x = shim.run(d)
        res.append(x)
    end = time.time()
    return (res, (end - start) * 1000)

In [None]:
func_uuid = fxc.register_function(dlhub_test,
                                  description="A DLHub test function")
print(func_uuid)

In [None]:
data = [{'composition_object': 'gANjcHltYXRnZW4uY29yZS5jb21wb3NpdGlvbgpDb21wb3NpdGlvbgpxACmBcQF9cQIoWA4AAABh\nbGxvd19uZWdhdGl2ZXEDiVgHAAAAX25hdG9tc3EER0AUAAAAAAAAWAUAAABfZGF0YXEFfXEGKGNw\neW1hdGdlbi5jb3JlLnBlcmlvZGljX3RhYmxlCkVsZW1lbnQKcQdYAgAAAEFscQiFcQlScQpHQAAA\nAAAAAABoB1gBAAAAT3ELhXEMUnENR0AIAAAAAAAAdXViLg==\n'}]
payload = [data for i in range(10)]

endpoint_uuid = '18db8ad9-ab50-4ab6-b01d-655b4b5d97f3'

res = fxc.run(payload, endpoint_id=endpoint_uuid, function_id=func_uuid)
print(res)

In [None]:
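import time
from funcx.serialize import FuncXSerializer

# Poll the task for up to 300 seconds.
start = time.time()
failed = None  # stays None if the task does not finish in time
while time.time() - start <= 300:
    a = fxc.get_task_status(res)
    if 'result' in a:
        res = a['result']
        failed = False
        break
    elif 'exception' in a:
        res = a['exception']
        failed = True
        break
    # Only a status is available so far: wait before polling again.
    time.sleep(1)

if failed is None:
    raise TimeoutError("Task did not complete within 300 seconds")

fxs = FuncXSerializer()
fxs.deserialize(res)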

In [None]:
fxs.deserialize(res)[0]

In [None]:
from funcx.serialize import FuncXSerializer
fxs = FuncXSerializer()
a = b'105\n04\ngANYCQAAAGZ1bmN4X3N1bXEAWCwAAABkZWYgZnVuY3hfc3VtKGl0ZW1zKToKICAgIHJldHVybiBz\ndW0oaXRlbXMpCnEBhnECLg==\n32\n01\ngANdcQAoSwFLAksDSwRLCmWFcQEu\n12\n01\ngAN9cQAu\n'
a = a.decode()
fxs.unpack_and_deserialize(a)
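
To see what the packed buffer above contains, it can be taken apart with the standard library alone. The sketch below rests on a layout inferred from this one sample, not on funcX documentation: each chunk appears to be a character count on its own line, followed by a two-digit code line and a base64-encoded pickle. `split_buffers` and `decode_chunk` are hypothetical helper names, and `FuncXSerializer` remains the authoritative way to decode such data. Only unpickle data you trust.

```python
import base64
import pickle

# The same buffer as in the cell above, already decoded to str.
packed = (
    "105\n04\ngANYCQAAAGZ1bmN4X3N1bXEAWCwAAABkZWYgZnVuY3hfc3VtKGl0ZW1zKToKICAgIHJldHVybiBz\n"
    "dW0oaXRlbXMpCnEBhnECLg==\n"
    "32\n01\ngANdcQAoSwFLAksDSwRLCmWFcQEu\n"
    "12\n01\ngAN9cQAu\n"
)


def split_buffers(text):
    """Split on the inferred 'length\\n' prefixes (lengths in characters)."""
    chunks = []
    pos = 0
    while pos < len(text):
        newline = text.index("\n", pos)
        length = int(text[pos:newline])
        start = newline + 1
        chunks.append(text[start:start + length])
        pos = start + length
    return chunks


def decode_chunk(chunk):
    """Return (code, object) for one chunk: a code line plus base64 pickle."""
    code, _, body = chunk.partition("\n")
    # b64decode ignores the embedded newlines by default.
    return code, pickle.loads(base64.b64decode(body))


decoded = [decode_chunk(c) for c in split_buffers(packed)]
for code, obj in decoded:
    print(code, repr(obj))
```

Under these assumptions, the three chunks decode to the function's name and source text, the positional arguments, and an empty dictionary that presumably holds keyword arguments.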