# Using Xtract to extract MaterialsIO metadata from MDF files

This Xtract-MDF demo illustrates how to crawl, extract metadata from, and store metadata for any Globus HTTPS-accessible repository. 

Moreover, users can execute metadata extraction workflows on any machine running a funcX endpoint, whether it's ANL's Cooley, a laptop, or the cloud (for this demo, we use an EC2 instance). 

In [24]:
from fair_research_login import NativeClient
import requests
import pickle
import json

# Globus endpoint and directory path where the data live (default: MDF data repo on petrel#researchdataanalytics).
# source_ep_id = "e38ee745-6d04-11e5-ba46-22000b92c6ec"  # where the data are
# source_ep_path = "/MDF/mdf_connect/prod/data/_test_mayer_situ_observation_g20o995_v1.1"
# source_ep_path = "/MDF/mdf_connect/prod/data/klh_1_v1/exposure1_jpg.jpg"
# source_ep_path = "/MDF/mdf_connect/prod/data/h2o_13_v1-1/split_xyz_files/watergrid_60_HOH_180__0.7_rOH_1.8_vario_PBE0_AV5Z_delta_PS_data"
# source_ep_path = "/MDF/mdf_connect/prod/data/khazana_vasp_v2/OUTCARS"
# source_ep_path = "/MDF/mdf_connect/prod/data/_test_einstein_9vpflvd_v1.1"
# source_ep_path = "/MDF/mdf_connect/prod/data"
# source_ep_path = "/mdf_open/charge_ice_prappl_2020_v1.1/ChargeIce/Spectral-data/ChargeIce_TypeI"
source_ep_path = "/mdf_open/tsopanidis_wha_amaps_v1.1/Activation Maps/Activation Maps for the WHA Dataset"
# source_ep_path = "/mdf_open/kearns_biofilm_rupture_location_v1.1/Biofilm Images/Paper Images/Isofluence Images (79.4)/THY+75mM AA"

# source_ep_id = "e38ee745-6d04-11e5-ba46-22000b92c6ec"
source_ep_id = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec"
dest_ep_id = "1adf6602-3e50-11ea-b965-0e16720bb42f"  # where they will be extracted

# Globus endpoint and file path at which we want to store metadata documents
mdata_ep_id = "5113667a-10b4-11ea-8a67-0e35e66293c2"
mdata_path = "/projects/DLHub/mdf_metadata"  # TODO: Add exception if you put slash at the end of the mdata_path. 

# funcX endpoint at which we want the metadata extraction to occur. Does NOT have to be the same endpoint as the data.
# funcx_ep_id = "6045fcfb-c3ef-48db-9b32-5b50fda15144"  # funcX endpoint running on JetStream.
funcx_ep_id = "82ceed9f-dce1-4dd1-9c45-6768cf202be8"  # River k8s cluster. 


# URLs where the Xtract service (AWS Elastic Beanstalk) AND the metadata poller service (AWS EC2) run
#eb_url = "http://xtractv1-env-2.p6rys5qcuj.us-east-1.elasticbeanstalk.com"
eb_url = "http://127.0.0.1:5000"
# eb_url = "http://xtract-crawler-4.eba-ghixpmdf.us-east-1.elasticbeanstalk.com"
poller_url = "http://ec2-54-173-234-195.compute-1.amazonaws.com"  # must be defined: Step 4 posts to it

# Grouping strategy we want to use for grouping. This will, by default, use all .group() functions from matio parsers.
grouper = "matio"

## Step 1: Login 

Here we request tokens from Globus Auth for three separate scopes. When fresh tokens are needed, the NativeClient prints a link where the user can authenticate with their Globus ID, along with a prompt at which to paste the authorization code. The scopes are as follows: 

* **petrel_https_server**: needed to access the MDF data on Petrel. Will need to change if processing data off-Petrel. 
* **transfer_token**: needed to crawl the Globus endpoint and transfer metadata to its final location. 
* **funcx_token**: needed to orchestrate the metadata extraction at the given funcX endpoint.

Additionally, we package the tokens as *headers* that we can easily ship with later requests. 
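The header packaging described above can be wrapped in a small helper. This is a minimal sketch, not part of Xtract: `build_headers` and `redact` are hypothetical names introduced here, and the token keys are the ones this notebook reads from the login result. The `redact` helper is an optional addition for printing headers without exposing full access tokens.

```python
def build_headers(tokens):
    """Package Globus access tokens into the header dict sent with later requests.

    `tokens` is assumed to have the shape returned by NativeClient.login(),
    keyed by resource server as in this notebook.
    """
    auth_token = tokens["petrel_https_server"]["access_token"]
    transfer_token = tokens["transfer.api.globus.org"]["access_token"]
    funcx_token = tokens["funcx_service"]["access_token"]
    return {
        "Authorization": f"Bearer {auth_token}",
        "Transfer": transfer_token,
        "FuncX": funcx_token,
        "Petrel": auth_token,
    }


def redact(headers, keep=4):
    """Return a printable copy showing only the last `keep` characters of each value."""
    return {name: "..." + value[-keep:] for name, value in headers.items()}
```

Printing `redact(build_headers(tokens))` instead of the raw headers keeps live tokens out of saved notebook output.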

In [25]:
client = NativeClient(client_id='7414f0b4-7d05-4bb6-bb00-076fa3f17cf5')
tokens = client.login(
    requested_scopes=[
        'https://auth.globus.org/scopes/56ceac29-e98a-440a-a594-b41e7a084b62/all',
        'urn:globus:auth:scope:transfer.api.globus.org:all',
        'https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all',
        'email',
        'openid',
    ],
    no_local_server=True,
    no_browser=True)

auth_token = tokens["petrel_https_server"]['access_token']
transfer_token = tokens['transfer.api.globus.org']['access_token']
funcx_token = tokens['funcx_service']['access_token']


headers = {
    'Authorization': f"Bearer {auth_token}",
    'Transfer': transfer_token,
    'FuncX': funcx_token,
    'Petrel': auth_token,
}
print(f"Headers: {headers}")

Headers: {'Authorization': 'Bearer AgkqlKr76gk2mplQwJ5BKbN5xPP7bDzQON9da1j96k1YlexKBzTOCOqo33advKPVy0ByBMN7XBnvWVcjqm5BEsYVw7', 'Transfer': 'AgVGq0egpP6qQdg52nMEM216V1MdGYgdXMjwmm5d0WQdpmlwDjSbCXOJWbvOVPdXa1GYVkXEpwk1gdUlK1jnwi9w0r', 'FuncX': 'AgbMDqq3MBoGP2Q80pe2MgXyJrKzVpgOmEK6m80rp40kJeWJbC5CE7Gya4VVdmrz7z5kajKQbXQwvIDaXwOzhNVBo', 'Petrel': 'AgkqlKr76gk2mplQwJ5BKbN5xPP7bDzQON9da1j96k1YlexKBzTOCOqo33advKPVy0ByBMN7XBnvWVcjqm5BEsYVw7'}


## Step 2: Crawl
Behind the scenes, the crawler scans a Globus directory breadth-first (using globus_ls), first extracting physical metadata such as path, size, and extension. Then, because the *grouper* we selected is 'matio', the crawler runs matio's `get_groups_by_postfix()` function on all file names in each directory to return groups for each of matio's parsers (except *generic* and *noop*). 

The crawl runs in a non-blocking thread and returns a crawl_id that we use throughout to track the progress of our metadata extraction workflow.
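To make the breadth-first scan concrete, here is a minimal sketch that walks an in-memory directory tree and records the same physical metadata (path, size, extension). It does not call Globus or matio: `bfs_crawl`, `fake_ls`, and `FAKE_TREE` are hypothetical names, and the listing format (dicts with `name`, `type`, and `size`) is an assumption made for illustration.

```python
from collections import deque
import posixpath

# Hypothetical stand-in for a remote directory listing.
FAKE_TREE = {
    "/data": [
        {"name": "sub", "type": "dir", "size": 0},
        {"name": "a.txt", "type": "file", "size": 10},
    ],
    "/data/sub": [
        {"name": "b.csv", "type": "file", "size": 20},
        {"name": "deep", "type": "dir", "size": 0},
    ],
    "/data/sub/deep": [
        {"name": "c", "type": "file", "size": 5},
    ],
}


def fake_ls(path):
    """Return the entries of one directory (assumed listing interface)."""
    return FAKE_TREE.get(path, [])


def bfs_crawl(list_dir, root):
    """Visit directories breadth-first and collect physical metadata per file."""
    queue = deque([root])
    records = []
    while queue:
        current = queue.popleft()
        for entry in list_dir(current):
            full_path = posixpath.join(current, entry["name"])
            if entry["type"] == "dir":
                # Subdirectories are visited only after the current level.
                queue.append(full_path)
            else:
                extension = posixpath.splitext(entry["name"])[1].lstrip(".")
                records.append(
                    {"path": full_path, "size": entry["size"], "extension": extension}
                )
    return records
```

Calling `bfs_crawl(fake_ls, "/data")` lists files level by level, so files near the root are recorded before files in deeper directories.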

In [27]:
# TODO: Adjust this to the Google Drive model!!!

crawl_url = f'{eb_url}/crawl'
print(f"Crawl URL is : {crawl_url}")
crawl_req = requests.post(crawl_url, json={
    'repo_type': "GLOBUS",
    'eid': source_ep_id,
    'dir_path': source_ep_path,
    'Transfer': transfer_token,
    'Authorization': funcx_token,
    'grouper': grouper})
print(crawl_req.content)
crawl_id = json.loads(crawl_req.content)['crawl_id']
print(f"Crawl ID: {crawl_id}")

Crawl URL is : http://127.0.0.1:5000/crawl
b'{"crawl_id":"d7ad1b6a-fa6c-4929-bca9-a40b8a716959"}\n'
Crawl ID: d7ad1b6a-fa6c-4929-bca9-a40b8a716959


We can request the crawl status to see how many groups the crawl has identified so far. 

Note that we cannot measure how many files remain to be crawled: the BFS may not have discovered all files yet, and Globus does not yet offer a way to count the files in a directory and all of its subdirectories. In other words, we know when the crawl is done, but we cannot tell how much remains until then. 
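Since progress can only be observed by asking repeatedly, a polling loop is a natural fit. The sketch below is a generic helper under stated assumptions: `poll_status` is a hypothetical name, and the completion test is supplied by the caller, because the status response shown in this notebook contains only counters and no documented "done" field.

```python
import time


def poll_status(fetch_status, is_done, interval_s=2.0, max_polls=100, sleep=time.sleep):
    """Call fetch_status() until is_done(status) is true or max_polls is reached.

    fetch_status: zero-argument callable returning a status dict.
    is_done: caller-supplied predicate; the completion signal is an assumption.
    """
    status = None
    for attempt in range(max_polls):
        status = fetch_status()
        if is_done(status):
            return status
        if attempt < max_polls - 1:
            sleep(interval_s)  # wait before asking again
    raise TimeoutError(f"not done after {max_polls} polls; last status: {status}")
```

In this notebook, `fetch_status` could be a lambda wrapping the `get_crawl_status` request; the predicate would depend on whatever completion signal the service actually exposes.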

In [28]:
crawl_status = requests.get(f'{eb_url}/get_crawl_status', json={'crawl_id': crawl_id})
print(crawl_status)
crawl_content = json.loads(crawl_status.content)
print(f"Crawl Status: {crawl_content}")

<Response [200]>
Crawl Status: {'bytes_crawled': 333373564, 'crawl_id': 'd7ad1b6a-fa6c-4929-bca9-a40b8a716959', 'files_crawled': 810, 'group_crawled': 4860}


## Step 3: Xtract

Next we launch a non-blocking metadata extraction workflow that automatically finds all groups generated from our crawl_id, ships parsers to our endpoint as funcX functions, transfers the files (if necessary), extracts the metadata, and sends it back to the central Xtract service. The workflow keeps running until the crawl is done and no crawled groups are left to extract. 
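The stopping rule for such a workflow (stop only once the crawl has finished and no crawled groups remain) can be illustrated with a small queue-draining loop. This is a sketch of the termination logic only, not Xtract's implementation: `drain_groups`, `pending`, `crawl_is_done`, and `extract` are hypothetical names.

```python
from collections import deque
import time


def drain_groups(pending, crawl_is_done, extract, idle_sleep_s=0.0):
    """Process groups until the crawl is done AND the queue is empty.

    pending: deque of group ids that the crawler may still be appending to.
    crawl_is_done: zero-argument callable reporting whether the crawl finished.
    extract: callable applied to each group id.
    """
    processed = []
    while True:
        if pending:
            processed.append(extract(pending.popleft()))
        elif crawl_is_done():
            # Nothing queued and nothing more will arrive.
            return processed
        else:
            # Queue is momentarily empty, but the crawl may still add groups.
            time.sleep(idle_sleep_s)
```

Checking the queue before the crawl flag matters: a crawl can finish while groups are still waiting, and those groups must still be extracted.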

In [133]:
xtract = requests.post(f'{eb_url}/extract', json={
    'crawl_id': crawl_id,
    'headers': json.dumps(headers),
    'funcx_eid': funcx_ep_id,
    'source_eid': source_ep_id,
    'dest_eid': dest_ep_id,
    'mdata_store_path': mdata_path})
print(f"Xtract response (should be 200): {xtract}")

Xtract response (should be 200): <Response [200]>


In [708]:
xtract_status = requests.get(f'{eb_url}/get_extract_status', json={'crawl_id': crawl_id})
xtract_content = json.loads(xtract_status.content)
print(f"Xtract Status: {xtract_content}")

KeyboardInterrupt: 

## Step 4: Access / Flush

We might want to flush all new metadata blobs to a separate Globus endpoint. Here we initialize a results poller that writes one file per metadata attribute to a folder at this path: `<mdata_path>/<crawl_id>/<group_id>`
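The folder layout above can be computed with a short helper. This is a minimal sketch: `metadata_dir` is a hypothetical name, and stripping a trailing slash from `mdata_path` is an assumption made here so that a path ending in `/` still yields a clean result.

```python
import posixpath


def metadata_dir(mdata_path, crawl_id, group_id):
    """Build <mdata_path>/<crawl_id>/<group_id>, tolerating a trailing slash."""
    base = mdata_path.rstrip("/") or "/"  # keep "/" itself if that is the base
    return posixpath.join(base, crawl_id, group_id)
```

For example, `metadata_dir("/projects/DLHub/mdf_metadata/", crawl_id, group_id)` produces the same folder as the path without the trailing slash.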

In [246]:
poller = requests.post(f'{poller_url}/', json={'crawl_id': crawl_id, 'mdata_ep_id': mdata_ep_id, 'Transfer': transfer_token})
print(f'Flush Status: {poller}')

Flush Status: <Response [200]>
