In [52]:
import argparse
import pathlib
import os
import json
import subprocess
import uuid
import requests
from funcx.sdk.client import (FuncXClient, TaskPending)
from globus_sdk import (TransferClient, TransferAPIError, NativeAppAuthClient, TransferClient, 
                        RefreshTokenAuthorizer)
import globus_sdk

In [8]:
# UUID of Petrel
petrel_uuid = f"4f99675c-ac1f-11ea-bee8-0e716405a293"
auth_code = f"1afsU4wp0Opkkm3rL4rVTMqMVuIPp8"

In [9]:
# DO NOT RUN THIS CELL!!!

# As in the Platform_Introduction_Native_App_Auth notebook, do the Native App Grant Flow
CLIENT_ID = "3b1925c0-a87b-452b-a492-2c9921d3bd14"
SCOPES = "openid profile email urn:globus:auth:scope:transfer.api.globus.org:all urn:globus:auth:scope:auth.globus.org:view_identities"

# Create an auth client for this app
auth_client = NativeAppAuthClient(CLIENT_ID)
auth_client.oauth2_start_flow(refresh_tokens=True, requested_scopes=SCOPES)

# Authenticate the auth client with auth code
authorize_url = auth_client.oauth2_get_authorize_url()
print("Please go to this URL and login: {0}".format(authorize_url))

# Acquire tokens after providing auth code
auth_code = input("Please enter the code you get after login here: ").strip()

Please go to this URL and login: https://auth.globus.org/v2/oauth2/authorize?client_id=3b1925c0-a87b-452b-a492-2c9921d3bd14&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=openid+profile+email+urn%3Aglobus%3Aauth%3Ascope%3Atransfer.api.globus.org%3Aall+urn%3Aglobus%3Aauth%3Ascope%3Aauth.globus.org%3Aview_identities&state=_default&response_type=code&code_challenge=0hBslQ5lBwNkPH6wmvxT5F-mwHBhDmu6mKwHrWz0Pu0&code_challenge_method=S256&access_type=offline
Please enter the code you get after login here: TC3UeDJgqcC3YwgOoQoMHRO5TgTO26


In [10]:
# Token Response
token_response = auth_client.oauth2_exchange_code_for_tokens(auth_code)
print(token_response)

{
  "auth.globus.org": {
    "scope": "email urn:globus:auth:scope:auth.globus.org:view_identities openid profile",
    "access_token": "AgM8OkGV3mvw9vj7PyKeg94VVd1oja8wX1rj23Np99X7mkrg6EcOCz2qqVVQ5W85qm06wOzNJ0NvQJfvydP66HyzmPUQ4kohzd8kc4wN3",
    "refresh_token": "AgwOONDVOy04YJ7VNy9Egl96vvw4rlJoEVy0qz99jEJOxNQpjDsJU3eN4MJemPp5rrdEynqXeQJY32BoWX01Qe14l2Or",
    "token_type": "Bearer",
    "expires_at_seconds": 1641579693,
    "resource_server": "auth.globus.org"
  },
  "transfer.api.globus.org": {
    "scope": "urn:globus:auth:scope:transfer.api.globus.org:all",
    "access_token": "AgjPbGPPdj5oxmkrml6Woa4m6Ep8226azzYJ1eEMYNJaVjOKG3u9C49Q88d9dOo6EWk4p7P64NJQ1mfpdrG00IJxk6",
    "refresh_token": "AgQNOBp00aNqm1Bvm49rW8zYpn4G69N9KMya5k430jV9eQG97zCwUMQb61WWNJvqgO3z3pzBzD7p8alXjNPVKeYBYVj9a",
    "token_type": "Bearer",
    "expires_at_seconds": 1641579693,
    "resource_server": "transfer.api.globus.org"
  }
}


In [11]:
# let's get stuff for the Globus Transfer/Auth services
globus_transfer_data = token_response.by_resource_server["transfer.api.globus.org"]
globus_auth_data = token_response.by_resource_server["auth.globus.org"]
# the refresh token and access token, often abbreviated as RT and AT
transfer_rt = globus_transfer_data["refresh_token"]
transfer_at = globus_transfer_data["access_token"]
transfer_expiry = globus_transfer_data["expires_at_seconds"]

# Now we've got the data we need, but what do we do?
# That "GlobusAuthorizer" from before is about to come to the rescue
authorizer = RefreshTokenAuthorizer(
    transfer_rt, auth_client, access_token=transfer_at, expires_at=transfer_expiry
)

# and try using `tc` to make TransferClient calls. Everything should just
# work -- for days and days, months and months, even years
tc = TransferClient(authorizer=authorizer)
if tc is not None: print ("Transfer client acquired.")

Transfer client acquired.


In [12]:
DEBUG = False

In [13]:
def hello_world():
    return 'Hello World!'

def sleep_30():
    from time import sleep
    sleep(30)
    return 'Woke up after 30 seconds'

# Start up a funcx client -- only do this once!
fxc = FuncXClient(timeout=60)

# Register functions
hello_world_uuid = fxc.register_function(hello_world)
sleep_30_uuid = fxc.register_function(sleep_30)

In [14]:
class UserOptions:
  def configure_ep(self):
    # Check whether TC and Name are provided
    if tc is None:
        print ("Cannot create Xtract Endpoint -- no globus transfer client provided")
        return False
    if self.configure is None:
      print ("Cannot create Xtract Endpoint -- no endpoint name provided.")
      return False
    
    # Load user options stored locally
    if os.path.isfile("/Users/joaovictor/" + self.configure +  "/config2.json"):
      print ("Found config file in /Users/joaovictor/" + self.configure +  "/config2.json") 
      data = None
      with open("/Users/joaovictor/" + self.configure +  "/config2.json") as f:
        data = json.loads(f.read())
        for key in data.keys():
          if DEBUG: print (key, data[key])
          if hasattr (self, key):
            setattr (self, key, data[key])
    else:
      print ("No config file found or provided.")
    
    if not self.globus_eid:
      print("Cannot create Xtract Endpoint -- missing globus_eid")
      return False
    if not self.local_metadata:
      print("Cannot create Xtract Endpoint -- missing local metadata")
      return False
    if not self.metadata_write_dir:
      print("Cannot create Xtract Endpoint -- missing metadata write directory")
      return False
    print ("Created Xtract Endpoint!")
    return True

  def __init__(self):
    self.tc = None
    self.configure = None
    self.globus_eid = None
    self.funcx_eid = None
    self.local_metadata = None
    self.metadata_write_dir = None

In [25]:
'''
Checks whether a funcx endpoint and globus endpoint are online, returning the result as a dictionary.
Might be good to perform some basic error checking, particularly around funcx_response

TODO: Catch problems in funcx-endpoint -- timeout of 30 seconds
      Catch problems using TransferAPIError
'''
from time import sleep

def funcx_is_online(funcx_eid:str, func_uuid:str):
    timeout = 60
    funcx_response = fxc.run(endpoint_id=ops.funcx_eid, function_id=hello_world_uuid)
    fxc_result = None
    if not funcx_eid:
        return "Error - Funcx Endpoint ID not provided."
    if not func_uuid:
        return "Error - Function UUID not provided."
    time_elapsed = 0
    while time_elapsed < timeout:
        try:
            fxc_result = fxc.get_result(funcx_response)
        except TaskPending as error:
            if DEBUG: print(f"Task incomplete -- time elapsed: {time_elapsed}")
            sleep(5)
            time_elapsed += 5
            if DEBUG: print(f"About to continue to next!")
            continue
        if DEBUG: print(f"About to call break and get out!")
        break
    if fxc_result is not None and fxc_result == 'Hello World!':
        if DEBUG: print (f"Task complete -- result: {fxc_result}")
        return "OK"
    if fxc_result is None:
        return f"{error.reason}"
    elif fxc_result != 'Hello World!':
        return f"Error -- Expected: \'Hello World!\', but received: \'{fxc_result}\'"
    
def globus_is_online(globus_eid:str):
    if ops.globus_eid is None:
        return "Error - Globus Endpoint ID not provided."
    try:
        endpoint = tc.get_endpoint(ops.globus_eid)
    except TransferAPIError as error:
        return f"Error -- Code: {error.code} {error.message}"
    
    if DEBUG: print (endpoint)
    if endpoint is not None: # and endpoint["DATA"][0]["is_connected"]:
        return "OK"
    else: 
        return "Not OK"
    
def is_online(funcx_eid:str, func_uuid:str, globus_eid:str):
    res = dict()
    res["funcx"] = funcx_is_online(funcx_eid, func_uuid)
    res["globus"] = globus_is_online(globus_eid)
    return res

In [65]:
def fetch_containers(ops:UserOptions, source:str, request:dict):
    print (f"Fetching containers:")
    print (f"Destination: {ops.globus_eid}")
    print (f"Source: {source}")
    
    expected = {'funcx': 'OK', 'globus': 'OK'}
    print (is_online (ops.funcx_eid, hello_world_uuid, ops.globus_eid))
    
    if is_online (ops.funcx_eid, hello_world_uuid, ops.globus_eid) != expected:
        print ("Error -- run xtract_cli configure first!")
    else:
        print ("Xtract_CLI is properly configured.")
        

    print (request)
    
    tc = ops.tc
    print (tc)
    
    tdata = globus_sdk.TransferData(ops.tc, source,
                                    ops.globus_eid,
                                    label="cloning xtract python image",
                                    sync_level="checksum")
    tdata.add_item (request["source_path"], request["destination_path"])
    
    transfer_result = tc.submit_transfer(tdata)
    print("task_id =", transfer_result["task_id"])

In [62]:
ops = UserOptions()
ops.tc = tc
ops.configure = 'test'
ops.configure_ep()

Found config file in /Users/joaovictor/test/config2.json
Created Xtract Endpoint!


True

In [63]:
is_online (ops.funcx_eid, hello_world_uuid, ops.globus_eid)

{'funcx': 'OK', 'globus': 'OK'}

In [66]:
DESTINATION = "/~/Desktop/containers/xtract-python2.img"
SOURCE = "/XtractContainerLibrary/xtract-python.img"

transfer_request = {
    "DATA_TYPE": "transfer_item",
    "source_path": SOURCE,
    "destination_path": DESTINATION,
    "recursive": False
}

fetch_containers(ops, petrel_uuid, transfer_request)

Fetching containers:
Destination: 71f9aca8-6929-11ec-b2c3-1b99bfd4976a
Source: 4f99675c-ac1f-11ea-bee8-0e716405a293
{'funcx': 'OK', 'globus': 'OK'}
Xtract_CLI is properly configured.
{'DATA_TYPE': 'transfer_item', 'source_path': '/XtractContainerLibrary/xtract-python.img', 'destination_path': '/~/Desktop/containers/xtract-python2.img', 'recursive': False}
<globus_sdk.transfer.client.TransferClient object at 0x7f873732dca0>
task_id = 76b60106-6e5b-11ec-bdef-55fe55c2cfea
