In [15]:
from pylinkedcmd import usgsweb
from joblib import Parallel, delayed
import tqdm
import requests
from collections import Counter
from iteration_utilities import unique_everseen
import os
import pickle

usgs_web = usgsweb.UsgsWeb()

While working through the data, I found profiles that use shared or role-based email addresses rather than an individual's address. Those addresses give us no way to get further details about the people behind them as individuals, so we don't need to pull those records into our process. I define the list here in the notebook, outside the codebase, but we can handle that differently as needed.

In [2]:
non_id_emails = [
    "ask@usgs.gov",
    "astro_outreach@usgs.gov"
]
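If the list of shared mailboxes grows, a case-insensitive lookup avoids misses caused by capitalization or stray whitespace in the Profiles data. The following is a minimal sketch, assuming inventory records are dicts with an `email` key as in the inventory output further down. `is_identifying` and `NON_ID_EMAILS` are hypothetical names, not part of pylinkedcmd. Unlike the filter used in the accumulator below, this version also drops records whose email is missing or empty.

```python
# Shared or role-based mailboxes that do not identify an individual.
NON_ID_EMAILS = {"ask@usgs.gov", "astro_outreach@usgs.gov"}


def is_identifying(record: dict) -> bool:
    """Return True when a record carries an email that points to one person.

    Hypothetical helper: compares case-insensitively and ignores surrounding
    whitespace, and treats a missing or empty email as non-identifying.
    """
    email = record.get("email")
    if not email:
        return False
    return email.strip().lower() not in NON_ID_EMAILS


records = [
    {"name": "A", "email": "maddingt@usgs.gov"},
    {"name": "B", "email": "ASK@usgs.gov "},
    {"name": "C", "email": None},
]
kept = [r for r in records if is_identifying(r)]
```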

This function from the usgsweb module finds the current maximum page in the Drupal pagination for the full staff profile listing and generates the individual page URLs for the inventory scraping routine to hit.

In [3]:
inventory_urls = usgs_web.get_staff_inventory_pages()
print(len(inventory_urls))
display(inventory_urls[:5])

284


['https://www.usgs.gov/connect/staff-profiles?page=0',
 'https://www.usgs.gov/connect/staff-profiles?page=1',
 'https://www.usgs.gov/connect/staff-profiles?page=2',
 'https://www.usgs.gov/connect/staff-profiles?page=3',
 'https://www.usgs.gov/connect/staff-profiles?page=4']
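For readers curious how such a URL list can be produced, here is a minimal sketch. It is not the pylinkedcmd implementation. It assumes the pager links in the listing HTML carry `?page=N` query strings (Drupal pagers are zero-based) and that the largest `N` is the last page. `build_inventory_urls` and `BASE_URL` are hypothetical names.

```python
import re

BASE_URL = "https://www.usgs.gov/connect/staff-profiles"


def build_inventory_urls(listing_html: str, base_url: str = BASE_URL) -> list:
    """Build one URL per listing page, from page=0 through the last page.

    Assumes pager links look like '?page=N' (or '&page=N' / '&amp;page=N')
    and that the largest N found is the final page.
    """
    pages = [int(n) for n in re.findall(r"[?&;]page=(\d+)", listing_html)]
    last_page = max(pages, default=0)
    return [f"{base_url}?page={n}" for n in range(last_page + 1)]


sample_html = '<a href="?page=1">2</a> <a href="?page=283">Last</a>'
urls = build_inventory_urls(sample_html)
```

With a last-page link of `page=283`, this yields 284 URLs, matching the count printed above.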

Since we are fetching basic web pages, it is reasonable to parallelize the process and fetch several pages at a time. The next two code blocks set up an accumulator function and then run it across a pool of threads over the inventory URLs. Everything goes into an in-memory list of dictionaries called staff_inventory.

When we shift to a message queue/Lambda architecture, the URLs generated above become messages on the queue, and Lambda handles the concurrency. We will then need some other data store to hold the information scraped from the inventory (more on that below).
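To make the queue analogy concrete, here is a minimal in-process sketch using only the standard library: each inventory URL is a message, a pool of worker threads plays the role of Lambda, and results go to a separate lock-guarded store instead of being mutated inside the handler. `run_workers` and `fake_listing` are hypothetical. `fake_listing` stands in for a real scraper such as `usgs_web.get_staff_listing`. It is not a model of SQS or Lambda semantics; in particular, a handler that raises here simply ends that worker thread, and retries are a separate concern.

```python
import queue
import threading


def run_workers(messages, handler, n_workers=4):
    """Consume messages from an in-process queue with a pool of worker threads.

    Each message (e.g. one inventory URL) is handled once; the lists the
    handler returns are collected under a lock and returned together.
    """
    q = queue.Queue()
    for message in messages:
        q.put(message)
    results = []
    lock = threading.Lock()

    def worker():
        # Each worker pulls messages until the queue is drained.
        while True:
            try:
                message = q.get_nowait()
            except queue.Empty:
                return
            records = handler(message)
            with lock:
                results.extend(records)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def fake_listing(url):
    # Hypothetical scraper: pretends every listing page holds two staff records.
    return [{"profile": f"{url}#a"}, {"profile": f"{url}#b"}]


urls = [f"https://example.org/staff?page={n}" for n in range(10)]
records = run_workers(urls, fake_listing)
```

Returning results from the worker pool rather than appending to a global list also avoids the notebook echoing one `None` per task.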

In [4]:
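staff_inventory = list()

def accumulator(url):
    # Scrape one listing page; anything other than a list is treated as a failure
    staff_list = usgs_web.get_staff_listing(url)
    if isinstance(staff_list, list):
        # Keep only records with an email that identifies an individual
        staff_inventory.extend([i for i in staff_list if "email" in i and i["email"] not in non_id_emails])
    else:
        print(type(staff_list))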

In [5]:
# Assigning the result suppresses the notebook's echo of one None per URL
_ = Parallel(n_jobs=20, prefer="threads")(
    delayed(accumulator)(i) for i in tqdm.tqdm(inventory_urls)
)

100%|██████████| 284/284 [00:42<00:00,  6.66it/s]



The main thing the inventory process gives us at this point is the basic identification of each staff member and the link to their profile page. The name, title, organization name/link, email, and telephone number are all useful (though I'm not retaining telephone at this point; who uses phones for voice calls anymore?), but we also get those from the profile pages themselves.

In a message/lambda architecture, these individual records could go on another queue for scraping the actual individual profile pages, but note the limitation documented below.
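If these records go onto a queue, a small fixed schema makes the message contract explicit. The following is a minimal sketch. `InventoryRecord` is a hypothetical type whose fields mirror the keys in the inventory output below, minus the telephone number we are not retaining. Because the dataclass is frozen, instances are hashable, so exact duplicates can be removed with `dict.fromkeys` while preserving order. Note that, unlike deduplicating the raw dicts, two rows that differ only in telephone become identical here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class InventoryRecord:
    """Fields kept from one staff-listing row (telephone is deliberately dropped)."""
    name: str
    email: str
    profile: str
    title: Optional[str] = None
    organization_name: Optional[str] = None
    organization_link: Optional[str] = None

    @classmethod
    def from_listing(cls, row: dict) -> "InventoryRecord":
        # Ignore keys we do not retain, such as 'telephone'.
        return cls(
            name=row["name"],
            email=row["email"],
            profile=row["profile"],
            title=row.get("title"),
            organization_name=row.get("organization_name"),
            organization_link=row.get("organization_link"),
        )


row = {
    "name": "Melanie Addington",
    "title": "Engineering Technician",
    "organization_name": "USGS Water Resources Mission Area",
    "organization_link": "https://www.usgs.gov/mission-areas/water-resources",
    "email": "maddingt@usgs.gov",
    "profile": "https://usgs.gov/staff-profiles/melanie-addington",
    "telephone": "228-688-1960",
}
# Two identical rows collapse to one record, in first-seen order.
deduped = list(dict.fromkeys(InventoryRecord.from_listing(r) for r in [row, dict(row)]))
```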

In [6]:
print(len(staff_inventory))
display(staff_inventory[:5])

6391


[{'name': 'Melanie Addington',
  'title': 'Engineering Technician',
  'organization_name': 'USGS Water Resources Mission Area',
  'organization_link': 'https://www.usgs.gov/mission-areas/water-resources',
  'email': 'maddingt@usgs.gov',
  'profile': 'https://usgs.gov/staff-profiles/melanie-addington',
  'telephone': '228-688-1960'},
 {'name': 'Jason A Addison, Ph.D.',
  'title': 'Research Geologist',
  'organization_name': 'Geology, Minerals, Energy, and Geophysics Science Center',
  'organization_link': 'https://www.usgs.gov/centers/gmeg',
  'email': 'jaddison@usgs.gov',
  'profile': 'https://usgs.gov/staff-profiles/jason-a-addison',
  'telephone': '650-329-5271'},
 {'name': 'Songie Adebiyi',
  'title': 'Administrative Officer',
  'organization_name': 'Western Fisheries Research Center',
  'organization_link': 'https://www.usgs.gov/centers/wfrc',
  'email': 'sadebiyi@usgs.gov',
  'profile': 'https://usgs.gov/staff-profiles/songie-adebiyi',
  'telephone': '206-526-6287'},
 {'name': 'Mi

When running the entire inventory together, I found problems in the underlying profile system: multiple profile pages/records have been created for what appears to be the same person, judging by a shared email address within this collection. The following process finds those cases and then runs a parallel process to get the overall content length of each profile page that shares an email. I use that length to decide which profile URL to treat as the right one in each case. It's stupid to have to do this, and I hope the folks running the Profiles system will clean things up eventually.

This may create a problem for running everything up to this point as individual messages, since we have no way to pick the right URL among several until all the records are together. The processes are simple and quick enough to stay within AWS Lambda limits, though, so we could run everything through this step as a single process before building another queue of the actual profile page URLs we need to process fully.

In [8]:
# Drop exact duplicate rows from the inventory
unique_on_profile = list(unique_everseen(staff_inventory))
profile_records = list()
check_url_content_length = list()
for email, count in Counter([i["email"] for i in unique_on_profile]).most_common():
    if count > 1:
        # Several profiles share this email: queue their URLs for a size check
        check_url_content_length.extend([i["profile"] for i in unique_on_profile if i["email"] == email])
    else:
        profile_records.extend([i for i in unique_on_profile if i["email"] == email])
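The loop above rescans the whole inventory once per email, which is fine at this size but grows quadratically. A single-pass grouping gives the same partition. The following is a minimal sketch, assuming records are dicts with `email` and `profile` keys; `split_by_email` is a hypothetical helper, and its output order is first-seen order rather than `most_common()` order.

```python
from collections import defaultdict


def split_by_email(records):
    """Partition inventory rows into single-profile and multi-profile emails.

    Returns (unique_records, duplicate_urls), where duplicate_urls maps each
    shared email to the profile URLs that claim it.
    """
    by_email = defaultdict(list)
    for rec in records:
        by_email[rec["email"]].append(rec)

    unique_records, duplicate_urls = [], {}
    for email, recs in by_email.items():
        if len(recs) == 1:
            unique_records.append(recs[0])
        else:
            duplicate_urls[email] = [r["profile"] for r in recs]
    return unique_records, duplicate_urls


rows = [
    {"email": "a@example.org", "profile": "https://example.org/a"},
    {"email": "b@example.org", "profile": "https://example.org/b"},
    {"email": "b@example.org", "profile": "https://example.org/b-0"},
]
unique_records, duplicate_urls = split_by_email(rows)
```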

In [9]:
url_headers = list()

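def header_accumulator(url):
    # Fetch the full page; len(content) is the decoded body size, not the Content-Length header
    url_headers.append({
        "url": url,
        "content-length": len(requests.get(url, timeout=60).content)
    })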

In [10]:
# Assigning the result suppresses the notebook's echo of one None per URL
_ = Parallel(n_jobs=20, prefer="threads")(
    delayed(header_accumulator)(i) for i in tqdm.tqdm(check_url_content_length)
)

100%|██████████| 190/190 [00:15<00:00, 12.54it/s]



Using the content lengths, this is the process I worked out for deciding which profile to assign to a given logical entity: among profiles sharing an email, the largest page wins. The end result is a set of profile records that could now go on a queue for individual asynchronous processing.

In [11]:
for email, count in Counter([i["email"] for i in unique_on_profile]).most_common():
    if count > 1:
        common_urls = [i["profile"] for i in unique_on_profile if i["email"] == email]
        max_size = max([i["content-length"] for i in url_headers if i["url"] in common_urls])
        best_url = next((i["url"] for i in url_headers if i["url"] in common_urls and i["content-length"] == max_size), None)
        profile_records.append(
            next((i for i in unique_on_profile if i["profile"] == best_url), None)
        )
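The cell above calls `max()` on the sizes found for each email, which raises `ValueError` if none of the candidate URLs made it into `url_headers`. A small helper can make the choice explicit and tolerate missing measurements. The following is a minimal sketch. `choose_best_profile` is hypothetical, and `content_lengths` is assumed to be a plain dict built from the existing results, e.g. `{h["url"]: h["content-length"] for h in url_headers}`.

```python
def choose_best_profile(candidate_urls, content_lengths):
    """Return the candidate URL with the largest fetched page, or None.

    content_lengths maps URL -> byte length; URLs that failed to fetch are
    simply absent. Ties go to the first URL in candidate_urls, because
    max() returns the first maximal element it encounters.
    """
    measured = [u for u in candidate_urls if u in content_lengths]
    if not measured:
        return None
    return max(measured, key=lambda u: content_lengths[u])


lengths = {"https://example.org/u1": 100, "https://example.org/u2": 250, "https://example.org/u3": 250}
best = choose_best_profile(
    ["https://example.org/u1", "https://example.org/u2", "https://example.org/u3"], lengths
)
```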

In [12]:
print(len(profile_records))
display(profile_records[10:15])

6247


[{'name': 'Pamela Agnew',
  'title': 'Workforce Data and Analysis Program Manager',
  'organization_name': 'Administration',
  'organization_link': 'https://www.usgs.gov/about/organization/science-support/human-capital',
  'email': 'pagnew@usgs.gov',
  'profile': 'https://usgs.gov/staff-profiles/pamela-agnew',
  'telephone': '703-648-7435'},
 {'name': 'Sean Kamran Ahdi',
  'title': 'Research Geophysicist',
  'organization_name': None,
  'organization_link': None,
  'email': 'sahdi@usgs.gov',
  'profile': 'https://usgs.gov/staff-profiles/sean-kamran-ahdi',
  'telephone': '303-273-8500'},
 {'name': 'Elizabeth Ahearn',
  'title': 'Lead Hydrologist',
  'organization_name': 'New England Water Science Center',
  'organization_link': 'https://www.usgs.gov/centers/new-england-water',
  'email': 'eaahearn@usgs.gov',
  'profile': 'https://usgs.gov/staff-profiles/elizabeth-ahearn',
  'telephone': '860-291-6745'},
 {'name': 'Erik Ahl',
  'title': 'Cartographer',
  'organization_name': 'National Ge

I ran into a number of issues due to the vagaries of doing anything over HTTP. The full set of processing did not complete every time I tried to run it, usually because of an HTTP connection pool error of some kind. This is not necessarily throttling in response to my sending multiple simultaneous requests; it could be introduced by some other bottleneck. I'm hoping the message queue will help here, since we should be able to run things in a guaranteed-delivery mode of some kind, possibly with a "dead letter queue" to handle complete failures. For running here, I cache records in pickle files and start over from their contents if something blows up.

The scrape_profile function returns several different kinds of error if it runs into problems during processing. For my purposes here, I dump these to a list of errors; in a message queuing architecture, we may want to drop them onto another queue for other handling. Right now, I'm just ignoring these until I decide what to do about the errors that make those records unusable for our current purposes.
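Until a real queue is in place, transient failures like connection pool errors can be absorbed locally with retries and a dead-letter list. The following is a minimal, sequential sketch using only the standard library. `process_with_retries` is hypothetical, the backoff parameters are illustrative, and `sleep` is injectable so the example runs instantly. It catches every `Exception`, which in practice you may want to narrow to network errors.

```python
import time


def process_with_retries(messages, handler, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run handler on each message, retrying failures with exponential backoff.

    Messages that still fail after max_attempts go to a dead-letter list
    together with the last error, instead of stopping the whole run.
    """
    succeeded, dead_letter = [], []
    for msg in messages:
        for attempt in range(1, max_attempts + 1):
            try:
                succeeded.append(handler(msg))
                break
            except Exception as exc:  # e.g. a connection pool error
                if attempt == max_attempts:
                    dead_letter.append({"message": msg, "error": repr(exc)})
                else:
                    sleep(base_delay * 2 ** (attempt - 1))
    return succeeded, dead_letter


calls = {}


def flaky(url):
    # Hypothetical handler: "bad" always fails, others fail once then succeed.
    calls[url] = calls.get(url, 0) + 1
    if url == "bad":
        raise ConnectionError("pool exhausted")
    if calls[url] < 2:
        raise ConnectionError("transient")
    return url.upper()


ok, dead = process_with_retries(["a", "bad"], flaky, sleep=lambda s: None)
```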

In [24]:
# Resume from cached results if an earlier run was interrupted
if os.path.exists("profile_data.p"):
    with open("profile_data.p", "rb") as f:
        profile_data = pickle.load(f)
else:
    profile_data = list()

if os.path.exists("profile_errors.p"):
    with open("profile_errors.p", "rb") as f:
        profile_errors = pickle.load(f)
else:
    profile_errors = list()

def profile_accumulator(inventory_record):
    scraped_profile = usgs_web.scrape_profile(
        inventory_record["profile"],
        inventory_content=inventory_record
    )
    if "error" in scraped_profile:
        # Scraper reported an error: keep the original inventory record
        profile_errors.append(inventory_record)
    elif "entity" in scraped_profile:
        # Successful scrape: an entity plus its claims
        profile_data.append(scraped_profile)
    else:
        # Page scraped but no entity was built (the examples below lack email and ORCID)
        profile_errors.append(scraped_profile)


In [25]:
# Skip profiles already scraped in an earlier (interrupted) run
done_refs = {p["entity"]["reference"] for p in profile_data}
to_scrape = [r for r in profile_records if r["profile"] not in done_refs]

try:
    _ = Parallel(n_jobs=40, prefer="threads")(
        delayed(profile_accumulator)(i) for i in tqdm.tqdm(to_scrape)
    )
except Exception as e:
    # Cache whatever finished so a rerun picks up where this one stopped
    with open("profile_data.p", "wb") as f:
        pickle.dump(profile_data, f)
    with open("profile_errors.p", "wb") as f:
        pickle.dump(profile_errors, f)
    print(e)

100%|██████████| 6247/6247 [13:20<00:00,  7.80it/s]


In [26]:
with open("profile_data.p", "wb") as f:
    pickle.dump(profile_data, f)
with open("profile_errors.p", "wb") as f:
    pickle.dump(profile_errors, f)
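A crash in the middle of `pickle.dump` can leave a truncated cache file that then fails to load on the next run. Writing to a temporary file and swapping it into place avoids that. The following is a minimal sketch using only the standard library. `save_checkpoint` and `load_checkpoint` are hypothetical helpers. `os.replace` performs the swap and is atomic when source and destination are on the same filesystem, which is why the temporary file is created in the target's directory.

```python
import os
import pickle
import tempfile


def save_checkpoint(obj, path):
    """Pickle obj to path without ever leaving a half-written file behind."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            pickle.dump(obj, fh)
        # Swap the finished file into place in one step.
        os.replace(tmp_path, path)
    except BaseException:
        os.remove(tmp_path)
        raise


def load_checkpoint(path, default):
    """Return the pickled object at path, or default if no cache exists yet."""
    if not os.path.exists(path):
        return default
    with open(path, "rb") as fh:
        return pickle.load(fh)
```

Usage would follow the same pattern as the cells above, e.g. `profile_data = load_checkpoint("profile_data.p", [])` at the start and `save_checkpoint(profile_data, "profile_data.p")` in the exception handler.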

The data resulting from profile scraping is now essentially ready for creating our final entities and statements/claims. The claims are unique to this particular dataset (other claims come in from other sources), so they can be placed directly into a claims index. If this is the first time an entity has turned up in our system, this is the best record we have for it. If a record already exists for the entity, as determined by its identifiers, we will want to evaluate the new information and decide what to do with it. In general, we treat Profile Page information as the best representation when it conflicts with other sources. However, because the Profiles application lacks basic quality control and validation, we still end up with messiness, such as extra spaces in names, that can't all be reasonably corrected for.
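One way to express "Profile Page wins on conflict" when an entity already exists is a small merge function. The following is a minimal sketch under assumed rules, not the project's actual merge logic: scalar fields from the incoming Profile Page entity overwrite existing ones (but never with `None`), the original `entity_created` timestamp is kept, identifiers are unioned with the Profile Page value winning for the same identifier type, URL lists are unioned in first-seen order, and runs of whitespace in names are collapsed. `normalize_name`, `merge_entity`, and `SPECIAL_KEYS` are hypothetical names; the entity keys match the scraped output below.

```python
import re

# Fields merged specially rather than simply overwritten.
SPECIAL_KEYS = {"identifiers", "url", "entity_created"}


def normalize_name(name):
    """Collapse internal runs of whitespace and trim the ends."""
    if not name:
        return name
    return re.sub(r"\s+", " ", name).strip()


def merge_entity(existing, incoming):
    """Merge a newly scraped Profile Page entity into an existing entity record."""
    merged = dict(existing)
    for key, value in incoming.items():
        # Profile Page values win on conflict, but never blank out a field.
        if key not in SPECIAL_KEYS and value is not None:
            merged[key] = value
    merged["name"] = normalize_name(merged.get("name"))
    # Keep every identifier; the Profile Page value wins for the same type.
    merged["identifiers"] = {**existing.get("identifiers", {}), **incoming.get("identifiers", {})}
    # Union URL lists, preserving first-seen order.
    merged["url"] = list(dict.fromkeys(existing.get("url", []) + incoming.get("url", [])))
    return merged
```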

One of the main things other processes will add to the entity records is additional identifiers matched to the individual. I've been updating the identifiers object in the documents to incorporate these new identifiers, and I've also experimented with going back through the claims where an entity is the subject and updating their subject_identifiers to include the full slate of discovered identifiers. This makes queries simpler but is not strictly necessary, since we can start from any identifier, or come back to one, in the entity objects and reach all claims through all available identifiers.
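The backfill of `subject_identifiers` described here can be sketched as follows, assuming entities and claims shaped like the scraped output below. `sync_subject_identifiers` is a hypothetical helper. A claim is treated as being about the entity when any one of its subject identifiers matches one of the entity's identifiers, and the claim then receives the entity's full identifier set.

```python
def sync_subject_identifiers(entity, claims):
    """Copy an entity's full identifier set onto claims where it is the subject.

    Matching is by any shared (identifier type, value) pair. Claims are
    updated in place; the return value is the number of claims changed.
    """
    known = set(entity["identifiers"].items())
    updated = 0
    for claim in claims:
        subject_ids = claim.get("subject_identifiers", {})
        if known & set(subject_ids.items()):
            merged = {**subject_ids, **entity["identifiers"]}
            if merged != subject_ids:
                claim["subject_identifiers"] = merged
                updated += 1
    return updated


entity = {"identifiers": {"email": "gakie@usgs.gov", "orcid": "0000-0002-6356-7106"}}
claims = [
    {"subject_identifiers": {"email": "gakie@usgs.gov"}},
    {"subject_identifiers": {"email": "someone.else@example.org"}},
    {"subject_identifiers": {"email": "gakie@usgs.gov", "orcid": "0000-0002-6356-7106"}},
]
n_updated = sync_subject_identifiers(entity, claims)
```

Only the first claim changes: the second belongs to someone else, and the third already carries both identifiers.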

In [29]:
print(len(profile_errors))
print(len(profile_data))
display(profile_data[:5])
display(profile_errors)

2
6245


[{'entity': {'reference': 'https://usgs.gov/staff-profiles/garrett-akie',
   'entity_created': '2020-11-20T16:11:49.173829',
   'entity_source': 'USGS Profile Page',
   'instance_of': 'Person',
   'name': 'Garrett Akie',
   'url': ['https://usgs.gov/staff-profiles/garrett-akie'],
   'identifiers': {'email': 'gakie@usgs.gov', 'orcid': '0000-0002-6356-7106'}},
  'claims': [{'claim_created': '2020-11-20T16:11:49.173843',
    'claim_source': 'USGS Profile Page',
    'reference': 'https://usgs.gov/staff-profiles/garrett-akie',
    'date_qualifier': '2020-11-20T16:11:49.173844',
    'subject_instance_of': 'Person',
    'subject_identifiers': {'email': 'gakie@usgs.gov',
     'orcid': '0000-0002-6356-7106'},
    'subject_label': 'Garrett Akie',
    'property_label': 'organization affiliation',
    'object_instance_of': 'Organization',
    'object_label': 'Colorado Water Science Center',
    'object_identifiers': {'url': 'https://www.usgs.gov/centers/co-water'}},
   {'claim_created': '2020-11-2

[{'profile': 'https://usgs.gov/staff-profiles/travis-owen-culp',
  'date_cached': '2020-11-20T16:14:45.008482',
  'display_name': 'Travis Owen Culp, MA Leadership',
  'profile_image_url': 'https://prd-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/styles/content_grid/public/thumbnails/image/20191209_071005%20%281%29.jpg',
  'organization_name': 'Dakota Water Science Center',
  'organization_link': 'https://www.usgs.gov/centers/dakota-water',
  'email': None,
  'orcid': None,
  'body_content_links': [],
  'expertise': [],
  'scraped_body_html': None},
 {'profile': 'https://usgs.gov/staff-profiles/gregory-shellenbarger',
  'date_cached': '2020-11-20T16:25:12.061455',
  'display_name': 'Gregory Shellenbarger',
  'profile_image_url': None,
  'organization_name': None,
  'organization_link': None,
  'email': None,
  'orcid': None,
  'body_content_links': [],
  'expertise': ['hydrology',
   'estuarine ecosystems',
   'wetland ecosystems',
   'sediment transport',
   

Once we scrape our full set of records, further problems crop up with the uniqueness of what should be reasonably persistent unique identifiers (email and ORCID). These seem to stem from problems in the information content itself that need cleanup. With all of our data together, we can check for that and make a choice about duplicates here. When we move to a message queue architecture, we will need to build this check into the logic that decides where and how to route records. Since someone will need to go in and correct the underlying records so that each uniquely identifies one individual, we should probably put these records into their own index so we can expose them through an application somewhere. To this end, it might be best to put everything that comes out of the scraping process onto a queue and write a Lambda handler that checks against the live index of unique entities and sends anything in question to another index.

The only problem there is that we could end up with a first-in-wins situation where the first record in might be inaccurate. In the case shown below from the current dataset, an incorrect ORCID entered for one of two individuals creates the collision. If we look up ORCID 0000-0001-7520-6669, we can see that it belongs to Margaret Lamont and not Raymond Carthy, but we have no way of knowing that until we run that check.

Another strategy is to queue up a given tranche of new data coming into our system, check it for duplicates as we do here, and then route duplicates off to their own space and the remaining unique records into our master set.
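For the streaming variant, a router that checks each incoming entity against the live index can sidestep first-in-wins by pulling *both* sides of a collision into the held set, and by remembering the disputed identifier so later arrivals that share it are held too. The following is a minimal sketch, assuming entities shaped like the records in the last cell below. `route_entity` is hypothetical, and plain dicts and sets stand in for the live and held indexes.

```python
def route_entity(entity, live_index, held, disputed):
    """Route one scraped entity into the live index or the held-for-review list.

    live_index maps (identifier_type, value) -> entity; disputed is a set of
    identifier keys already known to collide. Returns "live" or "held".
    """
    keys = [(k, v) for k, v in entity["identifiers"].items() if v]
    clash_keys = [k for k in keys
                  if k in live_index and live_index[k]["reference"] != entity["reference"]]
    if clash_keys or disputed.intersection(keys):
        # Pull every rival out of the live index so neither side wins by arriving first.
        rivals = {live_index[k]["reference"]: live_index[k] for k in clash_keys}
        for rival in rivals.values():
            held.append(rival)
            for k, v in rival["identifiers"].items():
                live_index.pop((k, v), None)
        disputed.update(clash_keys)
        held.append(entity)
        return "held"
    for k in keys:
        live_index[k] = entity
    return "live"


carthy = {"reference": "https://usgs.gov/staff-profiles/raymond-carthy",
          "identifiers": {"email": "rayc@usgs.gov", "orcid": "0000-0001-7520-6669"}}
lamont = {"reference": "https://usgs.gov/staff-profiles/margaret-lamont",
          "identifiers": {"email": "mlamont@usgs.gov", "orcid": "0000-0001-7520-6669"}}
akie = {"reference": "https://usgs.gov/staff-profiles/garrett-akie",
        "identifiers": {"email": "gakie@usgs.gov"}}

live_index, held, disputed = {}, [], set()
routes = [route_entity(e, live_index, held, disputed) for e in (carthy, lamont, akie)]
```

Raymond Carthy goes live at first, but as soon as Margaret Lamont arrives with the same ORCID both are moved to `held` for someone to correct, while unrelated entities keep flowing into the live index.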

In [30]:
# Identifier values that appear on more than one scraped entity
duplicate_orcids = [orcid for orcid, count in Counter([i["entity"]["identifiers"]["orcid"] for i in profile_data if "orcid" in i["entity"]["identifiers"]]).items() if count > 1]
duplicate_emails = [email for email, count in Counter([i["entity"]["identifiers"]["email"] for i in profile_data if "email" in i["entity"]["identifiers"]]).items() if count > 1]

unique_profiles = list()
held_profiles = list()
for item in profile_data:
    if "email" in item["entity"]["identifiers"] and item["entity"]["identifiers"]["email"] in duplicate_emails:
        held_profiles.append(item)
    elif "orcid" in item["entity"]["identifiers"] and item["entity"]["identifiers"]["orcid"] in duplicate_orcids:
        held_profiles.append(item)
    else:
        unique_profiles.append(item)

In [32]:
with open("unique_profiles.p", "wb") as f:
    pickle.dump(unique_profiles, f)

In [34]:
[i["entity"] for i in held_profiles]

[{'reference': 'https://usgs.gov/staff-profiles/raymond-carthy',
  'entity_created': '2020-11-20T16:13:06.867421',
  'entity_source': 'USGS Profile Page',
  'instance_of': 'Person',
  'name': 'Raymond Carthy, Ph.D.',
  'url': ['https://usgs.gov/staff-profiles/raymond-carthy',
   'http://link.springer.com/article/10.1007/s12237-013-9741-x'],
  'identifiers': {'email': 'rayc@usgs.gov', 'orcid': '0000-0001-7520-6669'}},
 {'reference': 'https://usgs.gov/staff-profiles/margaret-lamont',
  'entity_created': '2020-11-20T16:18:10.802131',
  'entity_source': 'USGS Profile Page',
  'instance_of': 'Person',
  'name': 'Margaret Lamont, Ph.D.',
  'url': ['https://usgs.gov/staff-profiles/margaret-lamont'],
  'identifiers': {'email': 'mlamont@usgs.gov',
   'orcid': '0000-0001-7520-6669'}}]