# Using Xtract to index research artifacts stored on Jetstream


### This Xtract-Jetstream demo illustrates how to crawl, extract metadata from, and ingest metadata for any Globus Endpoint.

#### We begin by importing the required libraries. Of note, we use the `mdf_toolbox` library as a wrapper for Globus Auth.

In [22]:
import time
import json
import requests
import mdf_toolbox

## Step 0: Configuration

#### Here we provide configuration details for our metadata extraction job, including specifications for both Globus and funcX.

In [23]:
# JETSTREAM: Globus endpoint and directory path AND funcX endpoint where the data reside
source_ep_path_1 = "/MDF/mdf_connect/prod/data/" 
local_mdata_dir = "/home/tskluzac/mdata"
source_ep_id = "e38ee745-6d04-11e5-ba46-22000b92c6ec"  # Globus ID for Jetstream instance 1
funcx_ep_id = "e1398319-0d0f-4188-909b-a978f6fc5621"  # funcX ID for Jetstream instance 1



# PETREL: Globus endpoint and file path at which we want to archive metadata documents
mdata_ep_id = "4f99675c-ac1f-11ea-bee8-0e716405a293"  # Xtract Petrel EP: 4f...93
remote_mdata_dir = "/home/my_metadata"

# CRAWLER and EXTRACTOR service URLs (later cells require both to be defined):
eb_crawl_url = "http://xtractcrawler5-env.eba-akbhvznm.us-east-1.elasticbeanstalk.com"
eb_extract_url = "http://xtractservice2-env.eba-xh7cjv4i.us-east-1.elasticbeanstalk.com"
# For a local deployment, use instead:
# eb_crawl_url = "http://127.0.0.1:5000"
# eb_extract_url = "http://127.0.0.1:5000"

# GROUPER: grouping strategy. "file_is_group" means each file is a distinct data entity (a group of one file).
grouper = "file_is_group"

## Step 1: Login 

Here we use `mdf_toolbox` to request tokens from Globus Auth. When fresh tokens are needed, users authenticate with their Globus ID by following the directions printed to STDOUT. Notable auth scopes are as follows:

* **openid**: provides the username for identity.
* **search**: interacts with Globus Search.
* **petrel**: reads or writes data on Petrel. Not needed if no data are sent to Petrel.
* **transfer**: needed to crawl the Globus endpoint and transfer metadata to its final location.
* **funcx_scope**: needed to orchestrate the metadata extraction at the given funcX endpoint.

The following code block initializes all of the tokens.

In [24]:
print("Authenticating...")
funcx_scope = "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"
search_all = "urn:globus:auth:scope:search.api.globus.org:all"
auths = mdf_toolbox.login(
    services=[
        "openid",
        "data_mdf",
        "search",
        "petrel",
        "transfer",
        search_all,
        "dlhub",
        funcx_scope,
    ],
    app_name="Foundry",
    make_clients=True,
    no_browser=False,
    no_local_server=False,
    # force=True
)
print("Authentication successful!")
print(auths)

Authenticating...
Authentication successful!
{'urn:globus:auth:scope:search.api.globus.org:all': <globus_sdk.search.client.SearchClient object at 0x7fbaf858b3c8>, 'search': <globus_sdk.search.client.SearchClient object at 0x7fbaf858d898>, 'data_mdf': <globus_sdk.authorizers.refresh_token.RefreshTokenAuthorizer object at 0x7fbaf858d2b0>, 'petrel': <globus_sdk.authorizers.refresh_token.RefreshTokenAuthorizer object at 0x7fbaf858d4a8>, 'dlhub': <globus_sdk.authorizers.refresh_token.RefreshTokenAuthorizer object at 0x7fbaf858d518>, 'transfer': <globus_sdk.transfer.client.TransferClient object at 0x7fbaf858dbe0>, 'https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all': <globus_sdk.authorizers.refresh_token.RefreshTokenAuthorizer object at 0x7fbaf858d668>, 'openid': <globus_sdk.authorizers.refresh_token.RefreshTokenAuthorizer object at 0x7fbaf858d6a0>}


## Step 2: Crawl
Behind the scenes, the crawler scans a Globus directory breadth-first (using `globus_ls`), first extracting physical metadata such as path, size, and extension. Next, since the *grouper* we selected is `file_is_group`, the crawler simply creates one single-file group per file, i.e., `n` groups for `n` files.

The crawl is **non-blocking**: the request returns a `crawl_id`, which we use to launch and monitor the downstream extraction processes.
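As a rough illustration of the crawl described above, the sketch below walks a directory tree breadth-first, records each file's path, size, and extension, and then applies the `file_is_group` strategy by wrapping every file in its own group. This is not Xtract's crawler: `EXAMPLE_TREE` and `list_dir` are hypothetical stand-ins for a Globus endpoint and its `ls` operation, and the record fields are assumptions.

```python
from collections import deque
from pathlib import PurePosixPath

# Hypothetical in-memory tree standing in for a Globus endpoint:
# a dict value is a subdirectory, an int value is a file size in bytes.
EXAMPLE_TREE = {
    "data": {
        "a.json": 120,
        "b.csv": 300,
        "sub": {"c.json": 50},
    }
}


def list_dir(tree, path):
    """Return (name, entry) pairs under `path`; a stand-in for a Globus `ls` call."""
    node = tree
    for part in PurePosixPath(path).parts[1:]:  # skip the leading "/"
        node = node[part]
    return list(node.items())


def crawl_file_is_group(tree, root):
    """Breadth-first crawl that records physical metadata and makes one group per file."""
    groups = []
    queue = deque([root])
    while queue:
        directory = queue.popleft()
        for name, entry in sorted(list_dir(tree, directory), key=lambda item: item[0]):
            path = str(PurePosixPath(directory) / name)
            if isinstance(entry, dict):
                queue.append(path)  # subdirectory: visit after the current level
            else:
                record = {
                    "path": path,
                    "size": entry,
                    "extension": PurePosixPath(name).suffix.lstrip("."),
                }
                groups.append({"files": [record]})  # file_is_group: one file per group
    return groups


groups = crawl_file_is_group(EXAMPLE_TREE, "/data")
for group in groups:
    print(group["files"][0]["path"])
```

Files in `/data` are grouped before anything in `/data/sub`, which is the level-by-level order a breadth-first crawl produces.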

In [25]:
crawl_url = f'{eb_crawl_url}/crawl'
print(f"Crawl URL is : {crawl_url}")

first_ep_dict = {
    'repo_type': 'GLOBUS',
    'eid': source_ep_id,
    'dir_paths': [source_ep_path_1], # Can add more than one path to this list. 
    'grouper': grouper
}

crawl_tokens = {
    'Transfer': auths['transfer'].authorizer.access_token,
    'Authorization': f"Bearer {auths['transfer'].authorizer.access_token}",
    'FuncX': auths[funcx_scope].access_token,
    # Optional, currently disabled:
    # 'Search': auths['search'].authorizer.access_token,
    # 'OpenID': auths['openid'].access_token,
}

crawl_req = requests.post(crawl_url, json={'endpoints': [first_ep_dict], 'tokens': crawl_tokens})
print(crawl_req.content)
crawl_id = crawl_req.json()['crawl_id']
print(f"Crawl ID: {crawl_id}")

Crawl URL is : http://xtractcrawler5-env.eba-akbhvznm.us-east-1.elasticbeanstalk.com/crawl
b'{"crawl_id":"4c5ea953-85e1-4dda-a03c-0cb048b32800","status":"200 (OK)"}\n'
Crawl ID: 4c5ea953-85e1-4dda-a03c-0cb048b32800


We can poll the crawl status to see how many files and groups have been identified so far.

Note that we cannot report how many files remain to be crawled: the breadth-first search may not have discovered every file yet, and Globus does not yet offer a recursive file count for a directory and all of its subdirectories. In other words, we only know the crawl is done once it finishes.

**Warning:** it currently takes up to 30 seconds for a crawl to start. *Why?* Container warm-up time.
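Because the total file count is unknown, a poller can only wait for the `complete` status. Below is a hedged sketch of a reusable helper, `poll_until` (a hypothetical name, not part of Xtract), that adds a timeout so a stalled crawl or a slow container warm-up cannot hang the notebook indefinitely. The demo feeds it fake statuses instead of calling the crawler.

```python
import time


def poll_until(fetch_status, is_done, interval=2.0, timeout=300.0):
    """Call fetch_status() every `interval` seconds until is_done(status) is true.

    Raises TimeoutError if `timeout` seconds pass without a terminal status.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if is_done(status):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"gave up waiting; last status: {status}")
        time.sleep(interval)


# Demo with fake statuses instead of live calls to the crawler.
fake_statuses = iter(["initializing", "initializing", "crawling", "complete"])
result = poll_until(
    lambda: {"crawl_status": next(fake_statuses)},
    lambda status: status["crawl_status"] == "complete",
    interval=0,
)
print(result)  # {'crawl_status': 'complete'}
```

Against the live crawler, `fetch_status` would wrap the same `requests.get(f'{eb_crawl_url}/get_crawl_status', ...)` call used in the polling loop below, with a `timeout` generous enough to cover both the warm-up and the crawl itself.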

In [26]:
while True:
    crawl_status = requests.get(f'{eb_crawl_url}/get_crawl_status', json={'crawl_id': crawl_id})
    print(crawl_status)
    crawl_content = crawl_status.json()
    print(f"Crawl Status: {crawl_content}")

    # Stop polling once the crawler reports that the crawl is complete
    if crawl_content['crawl_status'] == 'complete':
        break
    time.sleep(2)

<Response [200]>
Crawl Status: {'crawl_id': '4c5ea953-85e1-4dda-a03c-0cb048b32800', 'crawl_status': 'initializing'}
<Response [200]>
Crawl Status: {'bytes_crawled': 10603240234, 'crawl_id': '4c5ea953-85e1-4dda-a03c-0cb048b32800', 'crawl_status': 'crawling', 'files_crawled': 108664, 'groups_crawled': 107560}
<Response [200]>
Crawl Status: {'bytes_crawled': 10616993586, 'crawl_id': '4c5ea953-85e1-4dda-a03c-0cb048b32800', 'crawl_status': 'crawling', 'files_crawled': 108725, 'groups_crawled': 108725}
<Response [200]>
Crawl Status: {'bytes_crawled': 10618917853, 'crawl_id': '4c5ea953-85e1-4dda-a03c-0cb048b32800', 'crawl_status': 'crawling', 'files_crawled': 108803, 'groups_crawled': 108803}
<Response [200]>
Crawl Status: {'bytes_crawled': 10620589837, 'crawl_id': '4c5ea953-85e1-4dda-a03c-0cb048b32800', 'crawl_status': 'crawling', 'files_crawled': 108847, 'groups_crawled': 108847}
<Response [200]>
Crawl Status: {'bytes_crawled': 10673230669, 'crawl_id': '4c5ea953-85e1-4dda-a03c-0cb048b32800', 'crawl_status': 'crawling', 'files_crawled': 138100, 'groups_crawled': 108911}
<Response [200]>
Crawl Status: {'by

KeyboardInterrupt: 

## Step 3a: Flush the crawl metadata directly via REST

#### Why? Downloading crawl metadata is useful for many file organization tasks, such as: 
- I want a list of all files on my file system
- I want to know the total size (GB) of a folder
- I want to tally files by extension
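Once the crawl metadata are in hand, the three tasks above reduce to a few lines. This sketch assumes each flushed record is a dict with a `path` and a `size` in bytes; that schema is an assumption, since the format returned by `fetch_crawl_mdata` is not shown here. Sizes are reported in decimal gigabytes (1 GB = 10^9 bytes), and the records are hypothetical.

```python
from collections import Counter
from pathlib import PurePosixPath


def summarize(records):
    """Return (file list, total size in GB, extension tally) for crawl records."""
    paths = [r["path"] for r in records]
    total_gb = sum(r["size"] for r in records) / 1e9
    # Normalize extension case; files without an extension are tallied as "<none>".
    by_ext = Counter(PurePosixPath(p).suffix.lower() or "<none>" for p in paths)
    return paths, total_gb, by_ext


records = [
    {"path": "/MDF/a.json", "size": 2_000_000_000},
    {"path": "/MDF/b.JSON", "size": 500_000_000},
    {"path": "/MDF/README", "size": 1_000},
]
paths, total_gb, by_ext = summarize(records)
print(len(paths), round(total_gb, 3), dict(by_ext))  # 3 2.5 {'.json': 2, '<none>': 1}
```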

#### Currently Foundry uses Xtract to create a list of all files in user-submitted folders. Check it out here: 
TODO: LINK TO FOUNDRY. 

**Caution**: if you flush the crawl metadata (3a), **you cannot** also extract metadata from the same crawl (3b). To do both, launch two separate crawl jobs.

In [26]:
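# Disabled by default, since flushing prevents extraction from this crawl (see the Caution above).
# This loop has no exit condition; interrupt the kernel to stop it.
# while True:
#     req = requests.get(f'{eb_crawl_url}/fetch_crawl_mdata', json={'crawl_id': crawl_id, 'n': 100})
#     print(req.content)
#     time.sleep(1)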

In [27]:
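# print(f"Tokens: {tokens}")

# # (Disabled) Test configuring the funcX endpoint. Note that `eb_url` and `tokens` are
# # not defined in this notebook, and `fx_headers` is only defined in Step 3b.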
# config_status = requests.post(f"{eb_url}/configure_ep/{funcx_ep_id}", json={'headers': fx_headers, 
#                                                                             'timeout': 25, 
#                                                                             'ep_name': 'tyler_test_ep_2', 
#                                                                             'globus_eid': '12345', 
#                                                                             'xtract_path':'/Users/tylerskluzacek/.xtract',
#                                                                             'local_download_path': 'foobar',
#                                                                             'local_mdata_path': '/Users/tylerskluzacek/Desktop/metadata'
#                                                                      })
# config_content = json.loads(config_status.content)
# print(f"Returned: {config_content}")


## Step 3b: Xtract

Next we launch a non-blocking metadata extraction workflow that automatically finds all groups generated by our `crawl_id`, ships parsers to our endpoint as funcX functions, transfers the files (if necessary), and extracts metadata and sends them back to the central Xtract service. The workflow keeps running until the crawl is done and no crawled groups remain to be extracted.
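The stopping rule described above (keep extracting until the crawl has finished and no crawled groups remain) is a classic producer/consumer pattern. The following is a minimal sketch of that rule using Python's `queue` and `threading` modules, assuming an in-process queue; it is not Xtract's actual orchestrator, `extract_worker` and `crawler` are hypothetical names, and the result string stands in for a funcX extraction call.

```python
import queue
import threading


def extract_worker(groups, crawl_done, results, poll_timeout=0.1):
    """Consume groups until the crawl has finished AND no groups remain."""
    while True:
        try:
            group = groups.get(timeout=poll_timeout)
        except queue.Empty:
            # Exit only when the crawler is done and nothing is left to extract.
            if crawl_done.is_set() and groups.empty():
                return
            continue
        results.append(f"metadata for {group}")  # stand-in for a funcX extraction call


def crawler(groups, crawl_done, n_groups):
    """Hypothetical producer: enqueue every group, then signal that the crawl is complete."""
    for i in range(n_groups):
        groups.put(f"group-{i}")
    crawl_done.set()


groups = queue.Queue()
crawl_done = threading.Event()
results = []
consumer = threading.Thread(target=extract_worker, args=(groups, crawl_done, results))
producer = threading.Thread(target=crawler, args=(groups, crawl_done, 5))
consumer.start()
producer.start()
producer.join()
consumer.join()
print(len(results))  # 5
```

Because the crawler enqueues all of its groups before setting `crawl_done`, a worker that sees the flag set and the queue empty knows nothing is left, so it never exits while groups are still pending.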

In [28]:
fx_headers = {'Authorization': f"Bearer {auths[funcx_scope].access_token}",
              'Search': auths['search'].authorizer.access_token,
              'Openid': auths['openid'].access_token}

xtract = requests.post(f'{eb_extract_url}/extract', json={
    'crawl_id': crawl_id, 
    'tokens': fx_headers, 
    'local_mdata_path': local_mdata_dir, 
    'remote_mdata_path': remote_mdata_dir})
print(f"Xtract response (should be 200): {xtract}")


Xtract response (should be 200): <Response [200]>


In [36]:
xtract_status = requests.get(f'{eb_extract_url}/get_extract_status', json={'crawl_id': crawl_id})
status_content = xtract_status.json()
print(f"Xtract Status: {status_content['status']}")
print(f"Xtract Counters: {status_content['counters']}")

Xtract Status: SCHEDULED
Xtract Counters: {'cumu_orch_enter': 1000, 'cumu_pulled': 1000, 'cumu_scheduled': 1000, 'cumu_to_schedule': 1000, 'flagged_unknown': 0, 'fx': {'failed': 0, 'pending': 0, 'success': 395}}
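To turn the counters above into a single progress figure, one could divide finished funcX tasks by scheduled groups. This sketch assumes that `fx.success` and `fx.failed` count completed extraction tasks on the same scale as `cumu_scheduled`; that interpretation of the counters is an assumption, not documented Xtract behavior. `summarize_counters` is a hypothetical helper.

```python
def summarize_counters(counters):
    """Summarize an Xtract counters dict (field semantics are assumed, see lead-in)."""
    fx = counters["fx"]
    finished = fx["success"] + fx["failed"]
    scheduled = counters["cumu_scheduled"]
    pct = 100.0 * finished / scheduled if scheduled else 0.0
    return {"finished": finished, "scheduled": scheduled, "percent_finished": round(pct, 1)}


# The counters printed in the cell above.
example = {'cumu_orch_enter': 1000, 'cumu_pulled': 1000, 'cumu_scheduled': 1000,
           'cumu_to_schedule': 1000, 'flagged_unknown': 0,
           'fx': {'failed': 0, 'pending': 0, 'success': 395}}
print(summarize_counters(example))  # {'finished': 395, 'scheduled': 1000, 'percent_finished': 39.5}
```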


## Step 4 (optional): Globus Search ingest

#### In this step we ingest the extracted metadata into a Globus Search index, identified below by `search_index`.
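The `/ingest_search` endpoint performs the ingest server-side, and the exact documents it builds are not shown here. For reference, Globus Search accepts records as a `GMetaList` ingest document; the sketch below builds one from a hypothetical `{subject: metadata}` mapping and merges in dataset-level fields such as `organizer`. Using file paths as subjects and `public` visibility are assumptions for illustration, and `build_gmetalist` is a hypothetical helper.

```python
import json


def build_gmetalist(mdata_by_subject, dataset_mdata, visible_to=("public",)):
    """Build a Globus Search GMetaList ingest document from {subject: metadata dict}."""
    entries = []
    for subject, mdata in sorted(mdata_by_subject.items(), key=lambda kv: kv[0]):
        entries.append({
            "subject": subject,
            "visible_to": list(visible_to),
            # Dataset-level fields first, so per-file fields win on a key collision.
            "content": {**dataset_mdata, **mdata},
        })
    return {"ingest_type": "GMetaList", "ingest_data": {"gmeta": entries}}


doc = build_gmetalist(
    {"/MDF/a.json": {"extension": "json", "size": 120}},  # hypothetical extracted metadata
    {"organizer": "Tyler J. Skluzacek"},
)
print(json.dumps(doc, indent=2))

# With the globus_sdk SearchClient from Step 1 (not run here):
# auths[search_all].ingest(search_index, doc)
```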



In [53]:
# eb_extract_url = 'http://127.0.0.1:5000'

search_index = "ce2d9637-ad96-423f-99bc-935de889f640"

fx_headers = {'Authorization': f"Bearer {auths[funcx_scope].access_token}",
             'Search': auths[search_all].authorizer.access_token,
             'Openid': auths['openid'].access_token}

search_info = {
    'dataset_mdata': {'organizer':  'Tyler J. Skluzacek'},
    'search_index_id': search_index,
    'mdata_dir': local_mdata_dir,  
    'tokens': fx_headers
}

resp = requests.post(f'{eb_extract_url}/ingest_search', json=search_info)
print(resp)




<Response [500]>

In this run the ingest request failed: the service returned HTTP 500 (Internal Server Error).

## Step 5: Metadata transfer (archive)

#### Metadata, by default, are stored on the filesystem of the machine on which they were extracted. Here we can move them to a Globus endpoint of our choosing. 

Here I push the metadata to ALCF's Petrel data store and opt not to delete them from Jetstream (`'delete_source': False`).

In [None]:
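# NOTE: `tokens` (Petrel scope) is not used below; the request sends `crawl_tokens`,
# which carry the Transfer token obtained in Step 1.
tokens = {"Transfer": auths["petrel"].access_token}

while True:
    offload_req = requests.post(f'{eb_extract_url}/offload_mdata', json={
        'crawl_id': crawl_id,
        'tokens': crawl_tokens,
        'source_ep': source_ep_id,
        'mdata_ep': mdata_ep_id,
        'delete_source': False})

    response = offload_req.json()
    print(response['status'])
    if response['status'] == 'SUCCESS':
        break
    time.sleep(2)  # avoid hammering the service between status checks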

## Step 6: Let's query the index!

In [None]:
# COMING SOON.