This notebook works through the process of adding county-level government units to the GeoKB using the government units source from the USGS National Map. While there are other sources we could work against, this provides a way to demonstrate operating against National Map staged data in the AWS cloud, something we often need to do with other data assets such as Lidar point clouds for processing custom DEMs.

In [28]:
import os
import boto3
import awswrangler as wr
import geopandas as gpd
import pandas as pd
from io import BytesIO
import zipfile
from joblib import Parallel, delayed
from tqdm import tqdm

from utils import sparql_query, property_lookup

from wikibaseintegrator.wbi_config import config as wbi_config
from wikibaseintegrator import WikibaseIntegrator, wbi_login, wbi_helpers
from wikibaseintegrator.datatypes import Item, String, ExternalID, URL, GlobeCoordinate

In [43]:
s3_resource = boto3.resource('s3')

wbi_config['MEDIAWIKI_API_URL'] = os.environ["WC_MEDIAWIKI_API_URL"]
wbi_config['SPARQL_ENDPOINT_URL'] = os.environ['WC_SPARQL_ENDPOINT_URL']
wbi_config['WIKIBASE_URL'] = os.environ['WC_WIKIBASE_URL']

# Use bot account for this specific task
geokb_auth = wbi_login.Login(
    user=os.environ["WC_BOT_INIT"],
    password=os.environ["WC_PASS_INIT"]
)
wbi = WikibaseIntegrator(login=geokb_auth)

### TNM Staged Products

There is a lot in the StagedProducts folder of the prd-tnm bucket that could be interesting for other use cases. The following gets us a list of the zip files containing GeoPackage versions of the state files. I'm going this route instead of processing the national file so that I can use the more open standard format (the national file is only available as an Esri GDB).

In [7]:
tnm_gov_units = wr.s3.list_objects('s3://prd-tnm/StagedProducts/GovtUnit/GPKG')
tnm_state_gpkg = [i for i in tnm_gov_units if i.endswith('.zip')]
tnm_state_gpkg[:5]

['s3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_Alabama_State_GPKG.zip',
 's3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_Alaska_State_GPKG.zip',
 's3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_American_Samoa_State_GPKG.zip',
 's3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_Arizona_State_GPKG.zip',
 's3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_Arkansas_State_GPKG.zip']

The first function reads a zip file into memory and then loads it into a GeoDataFrame for processing, which saves having to extract the file to disk. It would be better if the source used gzip or another compressor that some tools can read directly without this kind of workaround. This is not a cleanly engineered function, and it needs error trapping. I happen to know that this is a fairly stable source, so it should work for my immediate purposes. These data don't change often, but we'll have to harden this eventually.

The second function wraps the first, adds centroid coordinates, and keeps only the columns needed later. The third is just a simple helper for the final push of a new item to the GeoKB.

In [33]:
def gdf_from_s3_zip(path):
    # Parse out the bucket and key
    bucket = path.split("/")[2]
    key = "/".join(path.split("/")[3:])
    
    # Get the file object
    file_obj = s3_resource.Object(
        bucket_name=bucket, 
        key=key
    )
    # Read the file object into a buffer
    buffer = BytesIO(file_obj.get()["Body"].read())
    # Put the zip in the buffer
    z = zipfile.ZipFile(buffer)
    
    # Figure out the gpkg file name
    gpkg_file = next((i for i in z.namelist() if i.endswith('.gpkg')), None)
    
    if gpkg_file is None:
        return
    # Pull the file into a geodataframe
    return gpd.read_file(z.open(gpkg_file))

def get_unit_geo(s3_path):
    # Get the gpkg
    state_units = gdf_from_s3_zip(s3_path)

    # Add a centroid so we can populate coordinate location
    state_units['coordinate_location'] = state_units.geometry.apply(lambda x: x.centroid)
    state_units['lon'] = state_units['coordinate_location'].x
    state_units['lat'] = state_units['coordinate_location'].y

    # Add just the data we need
    prepped_data = pd.DataFrame(state_units[[
            "stco_fipscode",
            "gnis_name",
            "lon",
            "lat"
        ]]
    )
    
    return prepped_data

def add_item(label, description, aliases, claims):
    item = wbi.item.new()

    item.labels.set(language='en', value=label)
    item.descriptions.set(language='en', value=description)
    item.aliases.set(language='en', values=aliases)
    item.claims.add(claims)

    item.write()
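
A slightly more defensive version of the URI parsing and zip handling might look like the following. This is only a sketch: `split_s3_uri` and `first_member_with_suffix` are hypothetical helper names that are not part of this notebook or of boto3, and the second function works on raw bytes so it can be tried without touching S3.

```python
import zipfile
from io import BytesIO


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into (bucket, key). Hypothetical helper."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    if not bucket or not key:
        raise ValueError(f"missing bucket or key: {uri}")
    return bucket, key


def first_member_with_suffix(zip_bytes, suffix=".gpkg"):
    """Return the name of the first archive member ending in suffix."""
    try:
        archive = zipfile.ZipFile(BytesIO(zip_bytes))
    except zipfile.BadZipFile as err:
        raise ValueError("payload is not a readable zip archive") from err
    for name in archive.namelist():
        if name.lower().endswith(suffix):
            return name
    # Raise instead of returning None so the caller cannot miss the problem
    raise FileNotFoundError(f"no {suffix} member in archive")
```

Raising instead of returning `None` is a design choice for this sketch; it surfaces a bad download at the point of failure rather than later, when the missing GeoDataFrame is first used.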

### GeoKB Properties

We need to know what properties we are using to put information from this or any source into the GeoKB. I put a function in utils.py that gets all properties via SPARQL and puts them in a simple dict so we can look up "P" identifiers by name. In this case, we'll be using several ExternalID properties, such as the FIPS codes and the GNIS identifier.

I'm going to leave off the related Wikidata item for now. We can use Wikidata's own FIPS code and GNIS identifier statements to pull back items claiming to be associated with those identifiers, but each is just a claim. We need to trust but verify before we leverage the linkage to do anything like pull in additional characteristics.

From the gov units schema, we are really just focused on the following properties:

* stco_fipscode - gives us a linkage to other systems and assets
* county_name - short name of the county used as an alias
* gnis_id - gives us a linkage to other systems and assets
* gnis_name - primary label with better context
* coordinate_location - computed centroid of the county boundary entered for informational purposes

In [9]:
geokb_props = property_lookup(wbi_config['SPARQL_ENDPOINT_URL'])
geokb_props

{'instance of': 'P1',
 'subclass of': 'P2',
 'reference item': 'P3',
 'reference url': 'P4',
 'reference statement': 'P5',
 'coordinate location': 'P6',
 'publication date': 'P7',
 'subject matter': 'P8',
 'ranking': 'P9',
 'ISO 3166-1 alpha-2 code': 'P10',
 'located in the administrative territorial entity': 'P11',
 'ISO 3166-2 code': 'P12',
 'FIPS 5-2 alpha': 'P13',
 'FIPS 5-2 numeric': 'P14',
 'corresponding wikidata property': 'P15',
 'related wikidata item': 'P16',
 'element symbol': 'P17',
 'SEDAR Identifier': 'P18',
 'MRDS commodity code': 'P19',
 'USGS Thesaurus ID': 'P20',
 'GNIS ID': 'P21',
 'FIPS 6-4': 'P22'}
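
Because claims look properties up by label, a misspelled label only fails when the code reaches it, possibly partway through a load. One way to fail earlier is to resolve every needed label up front. The following is a minimal sketch; `resolve_props` is a hypothetical helper, and the sample dictionary copies a few entries from the output above.

```python
def resolve_props(props, labels):
    """Return {label: P-id} for the requested labels, or raise if any is missing."""
    missing = [label for label in labels if label not in props]
    if missing:
        raise KeyError(f"properties not found in lookup: {missing}")
    return {label: props[label] for label in labels}


# A few entries copied from the property lookup output
sample_props = {
    "instance of": "P1",
    "coordinate location": "P6",
    "GNIS ID": "P21",
    "FIPS 6-4": "P22",
}

needed = resolve_props(sample_props, ["instance of", "GNIS ID", "FIPS 6-4"])
print(needed)
```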

In [13]:
# We need our identifiers for states so we can establish the linkage
# This will also drive what gov units we get, because we don't yet have the territories
query_us_states = """
PREFIX wd: <%(wb_domain)s/entity/>
PREFIX wdt: <%(wb_domain)s/prop/direct/>

SELECT DISTINCT ?item ?itemLabel ?fips_code
WHERE { ?item  wdt:P1  wd:Q229 . 
       ?item wdt:P14 ?fips_code .
        SERVICE wikibase:label 
          { bd:serviceParam  wikibase:language  "en" . } 
      }
""" % {'wb_domain': os.environ['WC_WIKIBASE_URL']}

geokb_states = sparql_query(
    endpoint=wbi_config['SPARQL_ENDPOINT_URL'],
    query=query_us_states,
    output='dict'
)
geokb_states[:5]

[{'item': 'https://geokb.wikibase.cloud/entity/Q230',
  'itemLabel': 'Michigan',
  'fips_code': '26'},
 {'item': 'https://geokb.wikibase.cloud/entity/Q231',
  'itemLabel': 'Louisiana',
  'fips_code': '22'},
 {'item': 'https://geokb.wikibase.cloud/entity/Q232',
  'itemLabel': 'Oklahoma',
  'fips_code': '40'},
 {'item': 'https://geokb.wikibase.cloud/entity/Q233',
  'itemLabel': 'California',
  'fips_code': '06'},
 {'item': 'https://geokb.wikibase.cloud/entity/Q234',
  'itemLabel': 'Georgia',
  'fips_code': '13'}]

### Entity/claim processing

A simple serial loop may seem like a crude way to commit information to the GeoKB. There are some interesting ideas for handling all of this much faster by taking advantage of parallel processing. This [project](https://github.com/UB-Mannheim/RaiseWikibase), in particular, is something I'm looking into. However, we are still spinning all this up and working through the limitations of our system. For cases like this one, where we're dealing with only a small number of records, it's not a big deal to take a little extra time to complete.

In [7]:
# Set universal properties
claim_instance_of = Item(
    prop_nr=geokb_props['instance of'],
    value='Q481' # U.S. County
)

In [None]:
for st in geokb_states[1:]: # I ran one state as a test to make sure it all worked
    print("STARTED PROCESSING:", st['itemLabel'])
    geokb_id = st['item'].split('/')[-1]

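    # Get the appropriate S3 file to process; match the full file name so that
    # "Kansas" does not pick up the "Arkansas" archive
    gov_unit_file = f"/GOVTUNIT_{st['itemLabel'].replace(' ', '_')}_State_GPKG.zip"
    gov_unit_s3_path = next((i for i in tnm_state_gpkg if i.endswith(gov_unit_file)), None)
    if gov_unit_s3_path is None:
        print("NO SOURCE FILE FOUND FOR:", st['itemLabel'])
        continue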
    
    # Set up an http reference
    http_ref = gov_unit_s3_path.replace('s3://prd-tnm/', 'https://prd-tnm.s3.amazonaws.com/')
    
    # Set up reference source to add to claims
    ref_source = URL(
        prop_nr=geokb_props['reference url'],
        value=http_ref
    )
    
    claim_county_in_state = Item(
        prop_nr=geokb_props['located in the administrative territorial entity'],
        value=geokb_id,
        references=[[ref_source]]
    )  
    
    # Get the gpkg
    state_units = gdf_from_s3_zip(gov_unit_s3_path)
    # Add a centroid so we can populate coordinate location
    # state_units['coordinate_location'] = state_units.geometry.apply(lambda x: x.centroid)
    # state_units['lon'] = state_units['coordinate_location'].x
    # state_units['lat'] = state_units['coordinate_location'].y
    
    print("PULLED AND PREPPED SOURCE DATA", gov_unit_s3_path)

    for index, row in state_units.iterrows():
        print("PROCESSING CLAIMS AND ADDING:", row.gnis_name)
        # Set county fips code
        claim_county_fips = ExternalID(
            prop_nr=geokb_props['FIPS 6-4'],
            value=row.stco_fipscode,
            references=[[ref_source]]
        )
        
        # Set GNIS code
        claim_county_gnis = ExternalID(
            prop_nr=geokb_props['GNIS ID'],
            value=row.gnis_id,
            references=[[ref_source]]
        )
        
        # Set coordinate location
        # claim_location = GlobeCoordinate(
        #     prop_nr=geokb_props['coordinate location'],
        #     latitude=row.lat,
        #     longitude=row.lon,
        #     references=[[ref_source]]
        # )
        
        # Send it
        add_item(
            label=row.gnis_name, # I prefer this form for the context
            description=f"a county in {st['itemLabel']}",
            aliases=row.county_name,
            claims=[
                claim_instance_of,
                claim_county_in_state,
                claim_county_fips,
                claim_county_gnis
            ]
        )



### Fixing a mistake

I made a mistake when I ran this yesterday and didn't include claim_location, so I need to run a different process to update the items with a coordinate location. This also points out how this kind of process really needs to be developed. For reference sources like this in the GeoKB, we need to be able to run something periodically, based on the type of information and how frequently it changes. We need to be able to both establish a baseline and come back to improve on that baseline over time. I'm working through that more in some of the other notebooks.

Here I'm going to start from the perspective of what's already in the GeoKB and go back to my source material from the TNM staged data to get county geometry, generate centroid point coordinates, and then send it to the existing items as a new claim.
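
For intuition about what the centroid step produces, here is a minimal pure-Python sketch of the area-weighted centroid of a simple polygon ring. It is not what the notebook runs (the notebook relies on the `centroid` attribute of the geometries), and `ring_centroid` is a hypothetical name. The formula treats longitude and latitude as planar coordinates, which is adequate for an informational point but is not a geodesic calculation.

```python
def ring_centroid(ring):
    """Area-weighted centroid of a simple (non-self-intersecting) polygon ring.

    ring is a list of (x, y) tuples; the closing vertex may be omitted.
    """
    if ring[0] != ring[-1]:
        ring = ring + [ring[0]]  # close the ring
    twice_area = 0.0
    cx = 0.0
    cy = 0.0
    for (x0, y0), (x1, y1) in zip(ring, ring[1:]):
        cross = x0 * y1 - x1 * y0
        twice_area += cross
        cx += (x0 + x1) * cross
        cy += (y0 + y1) * cross
    if twice_area == 0:
        raise ValueError("degenerate ring with zero area")
    # Standard formula divides by 6 * area, which equals 3 * twice_area
    return cx / (3.0 * twice_area), cy / (3.0 * twice_area)


square = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
print(ring_centroid(square))
```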

In [22]:
geokb_county_query = """
PREFIX wd: <%(wb_domain)s/entity/>
PREFIX wdt: <%(wb_domain)s/prop/direct/>

SELECT ?county ?countyLabel ?state ?stateLabel ?fipps
WHERE { 
    ?county  wdt:P1  wd:Q481 . 
    ?county wdt:P11 ?state .
    OPTIONAL {
      ?county wdt:P22 ?fipps .
    }
    SERVICE wikibase:label { bd:serviceParam  wikibase:language  "en" . } 
}
""" % {'wb_domain': os.environ['WC_WIKIBASE_URL']}

geokb_counties = sparql_query(
    endpoint=os.environ['WC_SPARQL_ENDPOINT_URL'],
    query=geokb_county_query,
    output='dataframe'
)
geokb_counties.head()

| | county | countyLabel | state | stateLabel | fipps |
|---|---|---|---|---|---|
| 0 | https://geokb.wikibase.cloud/entity/Q482 | Missaukee County | https://geokb.wikibase.cloud/entity/Q230 | Michigan | 26113 |
| 1 | https://geokb.wikibase.cloud/entity/Q483 | Delta County | https://geokb.wikibase.cloud/entity/Q230 | Michigan | 26041 |
| 2 | https://geokb.wikibase.cloud/entity/Q484 | Van Buren County | https://geokb.wikibase.cloud/entity/Q230 | Michigan | 26159 |
| 3 | https://geokb.wikibase.cloud/entity/Q485 | Berrien County | https://geokb.wikibase.cloud/entity/Q230 | Michigan | 26021 |
| 4 | https://geokb.wikibase.cloud/entity/Q486 | Gladwin County | https://geokb.wikibase.cloud/entity/Q230 | Michigan | 26051 |


Now I have a choice: I can pull all the data I need from the prd-tnm bucket, get everything prepped, and then send it to the GeoKB, or I can pull it state by state like I did above. I'll see how the former works.

This would be a great place to work up a parallel process that simply grabs each source file, processes it to get what we want, and sends the result to an accumulator. I took a quick crack at that approach, but I need to brush up on Dask and S3 to figure out what I'm doing wrong. In the meantime, the serial data prep is fast enough.
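
One low-ceremony way to parallelize the fetch-and-prep step without Dask is a thread pool from the standard library. The following is only a sketch, assuming much of the wall time is spent waiting on S3 rather than on computation. `prep_all` is a hypothetical helper, and `fake_prep` stands in for `get_unit_geo` so the snippet runs without AWS access.

```python
from concurrent.futures import ThreadPoolExecutor


def prep_all(paths, prep_func, max_workers=4):
    """Run prep_func over paths concurrently; results come back in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(prep_func, paths))


def fake_prep(path):
    # Stand-in for get_unit_geo: just report the file name
    return path.rsplit("/", 1)[-1]


results = prep_all(
    [
        "s3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_Alabama_State_GPKG.zip",
        "s3://prd-tnm/StagedProducts/GovtUnit/GPKG/GOVTUNIT_Alaska_State_GPKG.zip",
    ],
    fake_prep,
)
print(results)
```

With the real function, the returned list could be passed straight to `pd.concat`. One caveat: boto3 resource objects are documented as not thread safe, so each worker would likely need to create its own rather than share the module-level `s3_resource`.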

In [34]:
%%time
source_files = []
for state_name in geokb_counties.stateLabel.unique():
    st = state_name.replace(' ', '_')
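    # Match the full file name so that "Kansas" does not pick up the "Arkansas" archive
    gov_unit_s3_path = next((i for i in tnm_state_gpkg if i.endswith(f"/GOVTUNIT_{st}_State_GPKG.zip")), None)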
    if gov_unit_s3_path is not None:
        source_files.append(get_unit_geo(gov_unit_s3_path))

CPU times: user 33.1 s, sys: 7.56 s, total: 40.7 s
Wall time: 1min 24s


In [37]:
df_unit_coords = pd.concat(source_files)
df_unit_coords.head(10)

| | stco_fipscode | gnis_name | lon | lat |
|---|---|---|---|---|
| 0 | 26113 | Missaukee County | -85.094682 | 44.33732 |
| 1 | 26041 | Delta County | -86.870597 | 45.791636 |
| 2 | 26159 | Van Buren County | -86.306415 | 42.285106 |
| 3 | 26021 | Berrien County | -86.685421 | 41.995778 |
| 4 | 26051 | Gladwin County | -84.388246 | 43.990674 |
| 5 | 26063 | Huron County | -82.855508 | 43.910068 |
| 6 | 26121 | Muskegon County | -86.535234 | 43.28993 |
| 7 | 26035 | Clare County | -84.847861 | 43.987892 |
| 8 | 26111 | Midland County | -84.388109 | 43.646857 |
| 9 | 26015 | Barry County | -85.308968 | 42.595028 |


In [39]:
df_geokb_county_coords = pd.merge(
    left=geokb_counties,
    right=df_unit_coords,
    how="left",
    left_on="fipps",
    right_on="stco_fipscode"
)
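
A left merge silently leaves `lon` and `lat` empty for any county whose FIPS code has no match in the source data, for instance when one side holds strings and the other integers. Below is a minimal sketch of a pre-flight check using pandas' `indicator` option. The two small frames are made-up stand-ins for `geokb_counties` and `df_unit_coords`, and the second row uses a fictional item and code to force a miss.

```python
import pandas as pd

# Stand-in for geokb_counties; Q999 / 99999 are fictional
left = pd.DataFrame({"county": ["Q482", "Q999"], "fipps": ["26113", "99999"]})
# Stand-in for df_unit_coords
right = pd.DataFrame(
    {"stco_fipscode": ["26113"], "lon": [-85.094682], "lat": [44.33732]}
)

checked = pd.merge(
    left=left.astype({"fipps": str}),  # force both keys to the same dtype
    right=right.astype({"stco_fipscode": str}),
    how="left",
    left_on="fipps",
    right_on="stco_fipscode",
    indicator=True,  # adds a _merge column: both / left_only / right_only
)

unmatched = checked[checked["_merge"] == "left_only"]
print(unmatched["county"].tolist())
```

Rows flagged `left_only` would produce a coordinate claim with missing values, so they are worth dropping or investigating before writing anything to the GeoKB.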

In [44]:
for index, row in df_geokb_county_coords.iterrows():
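    # Get source data path so we can build a reference; match the full file
    # name so that "Kansas" does not pick up the "Arkansas" archive
    gov_unit_file = f"/GOVTUNIT_{row.stateLabel.replace(' ', '_')}_State_GPKG.zip"
    gov_unit_s3_path = next((i for i in tnm_state_gpkg if i.endswith(gov_unit_file)), None)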
    
    # Set up an http reference
    http_ref = gov_unit_s3_path.replace('s3://prd-tnm/', 'https://prd-tnm.s3.amazonaws.com/')
    
    # Set up reference source to add to claims
    ref_source = URL(
        prop_nr=geokb_props['reference url'],
        value=http_ref
    )
    
    # Build coordinate location claim
    claim_location = GlobeCoordinate(
        prop_nr=geokb_props['coordinate location'],
        latitude=row.lat,
        longitude=row.lon,
        references=[[ref_source]]
    )
    
    # Get the county item
    item = wbi.item.get(row.county.split("/")[-1])
    
    # Add the claim
    item.add_claims(claim_location)
    
    item.write()
    


Service unavailable (HTTP Code 502). Sleeping for 60 seconds.
Service unavailable (HTTP Code 502). Sleeping for 60 seconds.
Service unavailable (HTTP Code 502). Sleeping for 60 seconds.
Service unavailable (HTTP Code 502). Sleeping for 60 seconds.
Service unavailable (HTTP Code 502). Sleeping for 60 seconds.
Service unavailable (HTTP Code 502). Sleeping for 60 seconds.


On the processing front, my big time sink right now is committing to Wikibase. This should go faster than it does. Wikibase (and the rest of the Wikiverse) is great in that it is a big, exciting open source project with lots of activity and interest. It also means that a large community has come up with a wide diversity of approaches and supporting software. Some of that works really well in some circumstances but not in others. Even here, I've settled on WikibaseIntegrator (WBI) as my foundation because I ran into issues with Pywikibot, while Jonathan Oliveros with Xentity ran into issues with WBI and is using Pywikibot. The two differ considerably in approach and syntax for doing the same work.

I think there is already some built-out capacity for pushing millions of items into Wikibase in short order, which I need to research before simply parallelizing what I've got here.