In [None]:
from copy import deepcopy
from getpass import getpass
import json
import time

import globus_automate_client
import mdf_toolbox

In [None]:
# Build the Globus Automate Flows client.
# Login goes through a premade native app, identified by this client ID.
native_app_id = "417301b1-5101-456a-8a27-423e71a2ae26"
flows_client = globus_automate_client.create_flows_client(
    native_app_id
)

In [None]:
# Secret SMTP credentials needed to deploy the Flow.
# They are prompted for here instead of being stored in mdf_flow_def.json.
# A later cell copies them into the email-sending states of the Flow definition.
# NOTE(review): the credentials end up inside the deployed Flow definition, so anyone allowed
# to view that definition may be able to read them - confirm what Automate exposes.
# (getpass is imported in the top imports cell.)
smtp_user = getpass("SMTP Username: ")
smtp_pass = getpass("SMTP Password: ")
smtp_hostname = "email-smtp.us-east-1.amazonaws.com"
smtp_send_credentials = [{
    # "credential_method": "",
    "credential_type": "smtp",
    "credential_value": {
        "hostname": smtp_hostname,
        "username": smtp_user,
        "password": smtp_pass
    }
}]

In [None]:
# Reference-only sketches of the inputs each Action Provider (AP) expects.
# Each value is a short description of the expected type, not a real value.
transfer_input_schema = dict(
    # deadline="datetime str",
    destination_endpoint_id="str",
    label="str",
    source_endpoint_id="str",
    # sync_level="str 0-3",
    transfer_items=[dict(destination_path="str", recursive="bool", source_path="str")],
)
# Every field of the Transfer set-permission AP is described simply as "string".
transfer_permission_schema = dict.fromkeys(
    ("endpoint_id", "operation", "path", "permissions", "principal", "principal_type"),
    "string",
)
curation_input_schema = dict(
    curator_emails="list of str, or False",
    curator_template="str or False",  # template variables: $landing_page
    curation_permissions="list of str",
    curation_text="str or False",
    author_email="str or False",
    author_template="str or False",  # template variables: $curation_task_id, $decision, $reason
    email_sender="str",
    send_credentials=[{}],
)
xtract_input_schema = dict(
    metadata_storage_ep="str",
    eid="str",
    dir_path="str",
    mapping="match",  # ? (meaning not yet confirmed)
    dataset_mdata={"test1": "test2"},
    validator_params={"schema_branch": "master", "validation_info": {"test1": "test2"}},
    grouper="matio",  # either 'directory' or 'matio'
)

In [None]:
# Read the MDF Flow definition. The JSON file contains no secrets.
with open("mdf_flow_def.json") as flow_def_file:
    mdf_flow_def = json.load(flow_def_file)

# Inject the SMTP credentials (kept out of the JSON) into each email-sending state.
for email_state in ("ExceptionState", "NotifyUserEnd"):
    mdf_flow_def["definition"]["States"][email_state]["Parameters"]["send_credentials"] = smtp_send_credentials

In [None]:
# Apply the remaining configuration values to the Flow definition.
# Set these values in mdf_flow_config.json, not in-line here.
with open("mdf_flow_config.json") as config_file:
    config = json.load(config_file)

# Flow permissions (both groups are MDF Connect Admins, for now)
for permission_field, config_key in (
    ("visible_to", "flow_permissions"),
    ("runnable_by", "flow_permissions"),
    ("administered_by", "admin_permissions"),
):
    mdf_flow_def[permission_field] = config[config_key]

flow_states = mdf_flow_def["definition"]["States"]

# Point the subflow states at the Curation and Transfer Loop flows (see MDF Utility Flows)
subflow_settings = (
    ("UserTransfer", "transfer_loop_url", "transfer_loop_scope"),
    ("DataDestTransfer", "transfer_loop_url", "transfer_loop_scope"),
    ("CurateSubmission", "curation_subflow_url", "curation_subflow_scope"),
)
for state_name, url_key, scope_key in subflow_settings:
    flow_states[state_name]["ActionUrl"] = config[url_key]
    flow_states[state_name]["ActionScope"] = config[scope_key]

# Email settings:
#  - admin_email is notified of critical exceptions in the Flow
#  - sender_email is the address emails are sent from (materialsdatafacility@gmail.com)
flow_states["ExceptionState"]["Parameters"]["destination"] = config["admin_email"]
for state_name in ("ExceptionState", "NotifyUserEnd"):
    flow_states[state_name]["Parameters"]["sender"] = config["sender_email"]

In [None]:
# The Xtract AP is not operational yet, so mock the dataset entry it would output.
# The mock is placed in the "details" parameter of the Xtract state in the Flow definition.
# NOTE(review): the disabled-states reference cell calls this state "Xtraction", while this
# cell writes to "Xtract"; confirm which name mdf_flow_def.json actually uses.
mock_dataset_entry = {
    "dc": {
        "titles": [{"title": "Base Deploy Testing Dataset"}],
        "creators": [{
            "creatorName": "jgaff",
            "familyName": "",
            "givenName": "jgaff",
            "affiliations": ["UChicago"],
        }],
        "publisher": "Materials Data Facility",
        "publicationYear": "2020",
        "resourceType": {"resourceTypeGeneral": "Dataset", "resourceType": "Dataset"},
    },
    "mdf": {
        "source_id": "_test_base_deploy_testing_v5.1",
        "source_name": "_test_base_deploy_testing",
        "version": 5,
        "acl": ["public"],
        "scroll_id": 0,
        "ingest_date": "2020-05-06T17:47:05.219450Z",
        "resource_type": "dataset",
    },
    "data": {
        "endpoint_path": "globus://e38ee745-6d04-11e5-ba46-22000b92c6ec/MDF/mdf_connect/prod/data/_test_base_deploy_testing_v5.1/",
        "link": "https://app.globus.org/file-manager?origin_id=e38ee745-6d04-11e5-ba46-22000b92c6ec&origin_path=/MDF/mdf_connect/prod/data/_test_base_deploy_testing_v5.1/",
        "total_size": 4709193,
    },
    "services": {},
}

mdf_flow_def["definition"]["States"]["Xtract"]["Parameters"]["details"] = dict(
    output_link="https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org/MDF/mdf_connect/test_files/mock_feedstock.json",
    dataset_entry=mock_dataset_entry,
)

In [None]:
# Flow states that have been disabled, or that need changes, in the main Flow.
# JSON cannot hold comments, so these states (and the reasons for disabling them) are kept here.
# They sit inside a dict literal so this cell is valid Python and "Restart & Run All" does not
# stop on a SyntaxError. The dict is reference-only: nothing here is merged into mdf_flow_def.
# The Xtraction entry reads mock_dataset_entry, so this cell must run after the mock Xtract cell.
disabled_flow_states = {
    # Grant user write permissions on MDF storage (temporarily).
    # Disabled because permission-setting was broken.
    # TODO: Retest - permission-setting possibly fixed
    "UserPermissions": {
        "Type": "Pass",
        #"Type": "Action",
        #"ActionUrl": "https://actions.globus.org/transfer/set_permission",
        #"ExceptionOnActionFailure": True,
        "Parameters": {
        #    "endpoint_id.$": "$.mdf_storage_ep",
        #    "operation": "CREATE",
        #    "path.$": "$.mdf_dataset_path",
        #    "permissions": "rw",
        #    "principal.$": # TODO: Change this to context #"$.user_id",
        #    "principal_type": "identity"
        },
        "ResultPath": "$.UserPermissionResult",
        #"WaitTime": 86400,
        "Next": "UserTransfer"
    },

    # Disabled for the same reason as UserPermissions (permission-setting was broken)
    "UndoUserPermissions": {
        "Type": "Pass",
        #"Type": "Action",
        #"ActionUrl": "https://actions.globus.org/transfer/set_permission",
        #"ExceptionOnActionFailure": True
        "Parameters": {},
        "ResultPath": "$.UndoUserPermissionResult",
        #"WaitTime": 86400,
        "Next": "CheckUserTransfer"
    },

    # Xtract AP not ready yet
    "Xtraction": {
        "Type": "Pass",
        #"Type": "Action",
        #"ActionUrl": "https://xtract.materialsdatafacility.org/",
        ##"ActionScope": "https://auth.globus.org/scopes/34284fb1-2eea-4532-a04a-9c8ad1702856/xtract_crawl_and_extract",
        #"ExceptionOnActionFailure": True,
        "Parameters": {
            #"metadata_storage_ep.$": "$.mdf_storage_ep",
            #"eid.$": "$.mdf_storage_ep",
            #"dir_path.$": "$.mdf_dataset_path",
            #"mapping": "match",  # ?
            #"dataset_mdata.$": "$.dataset_mdata",
            #"validator_params.$": "$.validator_params",
            ## options are 'directory/matio'
            #"grouper.=": "'directory' if `$.group_by_dir` else 'matio'"
            "details": {
                "output_link": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org/MDF/mdf_connect/test_files/mock_feedstock.json",
                "dataset_entry": mock_dataset_entry
            }
        },
        #"Catch": [{
        #    "ErrorEquals": ["ActionFailedException"],
        #    "Next": "XtractionFail"
        #}, {
        #    "ErrorEquals": ["States.ALL"],
        #    "Next": "ExceptionState"
        #}],
        "ResultPath": "$.XtractionResult",
        #"WaitTime": 86400,
        "Next": "ChooseCuration"
    },
    # Re-enable all Xtract states together (XtractionFail goes with Xtraction above)
    #"XtractionFail": {
    #    "Type": "ExpressionEval",
    #    "Parameters": {
    #        "title": "MDF Submission Failed",
    #        "message.=": ("Your MDF submission `$.source_id` failed during metadata extraction:\n"
    #                      "`$.XtractionResult.details`")
    #    },
    #    "ResultPath": "$.FinalState",
    #    "Next": "ChooseNotifyUserEnd"
    #},

    # DCAP broken, not accepting advertised schema
    "MDFPublish": {
        "Type": "Pass",
        #"ActionUrl": "https://actions.globus.org/datacite/mint/dc_schema",
        #"ExceptionOnActionFailure": False,
        "Parameters": {
        },
        "ResultPath": "$.MDFPublishResult",
        #"WaitTime": 86400,
        "Next": "ChooseCitrine"
    },

    # TODO: FuncX function to publish to Citrine
    "CitrinePublish": {
        "Type": "Pass",
        #"ActionUrl": "",
        #"ExceptionOnActionFailure": False,
        "Parameters": {
        },
        "ResultPath": "$.CitrinePublishResult",
        #"WaitTime": 86400,
        "Next": "ChooseMRR"
    },

    # TODO: FuncX function to publish to MRR
    "MRRPublish": {
        "Type": "Pass",
        #"ActionUrl": "",
        #"ExceptionOnActionFailure": False,
        "Parameters": {
        },
        "ResultPath": "$.MRRPublishResult",
        #"WaitTime": 86400,
        "Next": "PrepareSearchUpdate"
    },

    "PrepareSearchUpdate": {
        "Type": "ExpressionEval",
        # TODO: Apply services changes to dataset entry
        "Parameters": {
            "subject.$": "$.source_id",
            "content.=": "`$.XtractionResult.details.dataset_entry`",
            "visible_to.$": "$.dataset_acl",
            "search_index.$": "$.search_index"
        },
        "ResultPath": "$.SearchUpdateInfo",
        "Next": "SearchUpdate"
    },
}

In [None]:
# Deploy the MDF Flow as a new Flow.
# Deploying is recommended over updating - Automate Flow updates are not stable.
flow_deploy_res = flows_client.deploy_flow(
    flow_definition=mdf_flow_def["definition"],
    **{field: mdf_flow_def[field]
       for field in ("title", "description", "visible_to", "runnable_by", "administered_by")},
    # TODO: Make rough schema outline into JSONSchema (then use mdf_flow_def["schema"])
    input_schema={},
    validate_definition=True,
    validate_input_schema=True,
)
flow_id = flow_deploy_res["id"]
flow_scope = flow_deploy_res["globus_auth_scope"]

# Persist the new Flow's ID and scope so later sessions (and the update cell) can reuse them.
with open("mdf_flow_info.json", "w") as flow_info_file:
    flow_info = dict(flow_id=flow_id, flow_scope=flow_scope)
    json.dump(flow_info, flow_info_file)

In [None]:
# Alternative to deploying: update the existing Flow in place so its ID and scope stay the same.
# Deploying is still recommended over updating - Automate Flow updates are not stable.
with open("mdf_flow_info.json") as flow_info_file:
    flow_info = json.load(flow_info_file)
    flow_id, flow_scope = flow_info["flow_id"], flow_info["flow_scope"]

flow_update_res = flows_client.update_flow(
    flow_id,
    flow_definition=mdf_flow_def["definition"],
    **{field: mdf_flow_def[field]
       for field in ("title", "description", "visible_to", "runnable_by", "administered_by")},
    # TODO: replace the empty input schema with a real JSONSchema
    input_schema={},
    validate_definition=True,
    validate_input_schema=True,
)

In [None]:
# Show the Flow ID and scope, one per line; handy when debugging.
print(flow_id, flow_scope, sep="\n")

In [None]:
# Generate the credentials that are passed in as Flow input.
# The feedstock auth header is shorter-lived, so it is not set in the Flow definition.
# It must come from an identity with access to the Xtract output location (currently Petrel).
petrel_auth = mdf_toolbox.login(services=["petrel"], make_clients=False)["petrel"]
petrel_headers = {}
petrel_auth.set_authorization_header(petrel_headers)  # fills in petrel_headers["Authorization"]
feedstock_auth_header = petrel_headers["Authorization"]

# The RunAs token lets the Flow Transfer the user's data under the user's own identity.
# In production the MDF user supplies it by logging in with MDF.
# NOTE: Currently, RunAs is not used on the Transfer.
run_as_auth = mdf_toolbox.login(services=[flow_scope], make_clients=False)[flow_scope]
run_as_token = run_as_auth.refresh_token

In [None]:
# Load the test input for the Flow.
with open("mdf_flow_input.json") as input_file:
    flow_input = json.load(input_file)

# source_id must be unique except for updates, so a Unix timestamp is embedded in it.
source_id = "mdf_flow_test_{}_v1.1".format(int(time.time()))

# Attach the credentials and the unique source_id to the input.
flow_input.update({
    "_private_feedstock_auth_header": feedstock_auth_header,
    "_tokens": {"MDFUser": run_as_token},
    "source_id": source_id,
})

In [None]:
# Start one run of the Flow with the prepared input.
# Run this cell only once per test: running it again replaces flow_res, and with it the
# earlier run's action ID, which the status and log cells below rely on.
flow_res = flows_client.run_flow(
    flow_id, flow_scope, flow_input
)
flow_res.data

In [None]:
# Poll the status of the Flow run started above.
# Re-run this cell until the Flow reports success or failure.
status = flows_client.flow_action_status(
    flow_id, flow_scope, flow_res["action_id"]
).data
print(json.dumps(status, sort_keys=True, indent=4))

In [None]:
# Fetch the Flow run's log (limit=100 caps how many entries are requested).
# The log is very verbose; it is only needed when debugging errors in Flow execution.
flows_client.flow_action_log(
    flow_id, flow_scope, flow_res["action_id"], limit=100
).data