# Adjust Invalid Data

## Basic Setup

In [1]:
import os
import sqlite3
from collections import defaultdict
from contextlib import closing
from hashlib import sha256
from pathlib import Path
from pprint import pp

import pandas as pd
from tqdm import tqdm

In [2]:
# Directory layout, relative to the notebook's working directory.
DATA_DIR = Path('..', 'data')
INTERIM_DIR = DATA_DIR.joinpath('01_interim')

# IN_DB is the gazetteer read by this notebook; OUT_DB is the database it
# writes. Note that OUT_DB sits directly under DATA_DIR, not INTERIM_DIR.
IN_DB = INTERIM_DIR.joinpath('gazetteer_03_idigbio_2020-03-30.db')
OUT_DB = DATA_DIR.joinpath('gazetteer_04_idigbio_2020-03-30.db')

# Number of adjusted rows buffered before each bulk insert.
CHUNK = 10 ** 6

## Helper Functions

In [3]:
def display_all(df):
    """Show *df* in the notebook with row and column truncation disabled."""
    unlimited = ('display.max_rows', None, 'display.max_columns', None)
    with pd.option_context(*unlimited):
        display(df)

## Database Setup

In [4]:
# Start clean: drop any output database left behind by an earlier run.
if OUT_DB.exists():
    OUT_DB.unlink()

### Create a Table for Logging Adjusted Values

In [5]:
# An empty frame with the log's columns; writing it creates the (empty)
# `adjustments` table that the per-field cells below append to.
df = pd.DataFrame(columns=[
    'field',
    'literal',
    'becomes',
    'floor',
    'ceiling',
])

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='replace')

### Database Related Functions

Get the list of columns in a table, skipping any columns that require special handling.

In [6]:
def dict_factory(cursor, row):
    """Row factory for sqlite3 that maps each column name to its value.

    The column name is the first item of each ``cursor.description`` entry;
    the value is taken from the same position in *row*.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }

In [7]:
def get_columns(table='gazetteer', db=IN_DB, specials=None):
    """Return the column names of *table* in *db*, leaving out *specials*.

    Args:
        table: Table to inspect. It is interpolated directly into the PRAGMA
            statement, so it must be a trusted table name.
        db: Path of the SQLite database to open.
        specials: Column names to exclude. Defaults to coreid, hash, source
            and locality.

    Returns:
        The remaining column names, in the order the PRAGMA reports them.
    """
    if specials is None:
        specials = """coreid hash source locality""".split()

    sql = f'PRAGMA table_info({table});'

    # sqlite3's connection context manager only scopes a transaction, so
    # closing() is what actually releases the database handle.
    with closing(sqlite3.connect(db)) as cxn:
        # Row lets the PRAGMA's `name` column be read by name, not position.
        cxn.row_factory = sqlite3.Row
        columns = [
            r['name'] for r in cxn.execute(sql) if r['name'] not in specials
        ]

    return columns

Get a list of all distinct text values for manual filtering

In [8]:
def text_field(field):
    """Count the occurrences of each distinct value of *field* in the gazetteer.

    Args:
        field: Name of a gazetteer column. It is interpolated directly into
            the SQL, so it must be a trusted column name.

    Returns:
        A DataFrame with one row per distinct value: the value (in a column
        named after *field*) and its count ``n``, ordered by descending
        count and then by value.
    """
    sql = f"""
        select {field}, count(*) as n
          from gazetteer
      group by {field}
      order by n desc, {field}
    """
    # sqlite3's connection context manager only scopes a transaction, so
    # closing() is what actually releases the database handle.
    with closing(sqlite3.connect(IN_DB)) as cxn:
        df = pd.read_sql(sql, cxn)
    return df

### Get Columns

In [9]:
# Columns examined below: every gazetteer column except get_columns()'s
# default specials (coreid, hash, source, locality).
COLUMNS = get_columns()
COLUMNS

['continent',
 'coordinatePrecision',
 'coordinateUncertaintyInMeters',
 'country',
 'countryCode',
 'county',
 'decimalLatitude',
 'decimalLongitude',
 'geodeticDatum',
 'georeferenceSources',
 'higherGeography',
 'island',
 'islandGroup',
 'locationRemarks',
 'maximumDepthInMeters',
 'maximumElevationInMeters',
 'minimumDepthInMeters',
 'minimumElevationInMeters',
 'municipality',
 'stateProvince',
 'verbatimCoordinateSystem',
 'verbatimCoordinates',
 'verbatimDepth',
 'verbatimElevation',
 'waterBody']

## Examine Fields

### continent

**string**

In [10]:
# Tally the distinct `continent` values (most frequent first) for manual review.
field = 'continent'

df = text_field(field)
df.shape

# display_all(df)

(246, 2)

In [11]:
# `continent` values that the adjustment pass further down replaces with null.
literals = [
    'na',
    'no higher geography data',
    'no higher geography recorded',
    'not determined',
    'not in specify tree',
    'unassigned',
    'undefined',
    'unknown captive',
    'unknown continent',
    'unplaced',
    'unrecorded',
    'unspecified',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### coordinatePrecision

**string**

**no changes**

In [12]:
# No adjustments are recorded for this field; the inspection calls are left
# disabled.
field = 'coordinatePrecision'

# df = text_field(field)
# df.shape

# display_all(df)

### coordinateUncertaintyInMeters

**numeric**

In [13]:
field = 'coordinateUncertaintyInMeters'

# Accepted range; values outside it are nulled by the adjustment pass.
# NOTE(review): the ceiling works out to 20,000,000 -- presumably half the
# Earth's ~40,000 km circumference expressed in meters; confirm.
bounds = {
    'field': field,
    'floor': 0,
    'ceiling': 40_000 / 2 * 1000,
}
df = pd.DataFrame([bounds])

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### country

**string**

In [14]:
# Tally the distinct `country` values (most frequent first) for manual review.
field = 'country'

df = text_field(field)
df.shape

# display_all(df)

(2665, 2)

In [15]:
# `country` values that the adjustment pass further down replaces with null.
literals = [
    '0',
    '5',
    '1971',
    '2008',
    '2012',
    'b',
    'i',
    'no aplica',
    'no data',
    'no disponible',
    'testes-2x1',
    'testes-3x2',
    'testes-4x2',
    'testes-4x3',
    'testes-5x3',
    'testes-5x4',
    'testes-6x4',
    'undefined',
    'unknown',
    'unknown captive',
    'unknown country',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### countryCode

**string**

**no changes**

In [16]:
# Tally the distinct `countryCode` values for manual review; no adjustments
# are recorded for this field.
field = 'countryCode'

df = text_field(field)
df.shape

# display_all(df)

(481, 2)

### county

**string**

In [17]:
# Tally the distinct `county` values (most frequent first) for manual review.
field = 'county'

df = text_field(field)
df.shape

# display_all(df)

(55551, 2)

In [18]:
# `county` values that the adjustment pass further down replaces with null.
# Each literal is listed once so the log gets exactly one row per value
# ('no aplica' and 'no data' were previously entered twice).
df = pd.DataFrame(data={'literal': [
    'no additional locality info',
    'no additional locality info given',
    'no aplica',
    'no county',
    'no county given',
    'no county record',
    'no data',
    'no disponible',
    'no locality info given',
    'no locality record',
    'none',
    'none given',
    'none or unknown',
    'none unknown or numbered census division',
    'unknown',
    'unknown county',
    'unknown e',
    'unknown w',
]})

# `becomes` is recorded as null for every literal.
df['becomes'] = None
df['field'] = field

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql('adjustments', cxn, if_exists='append', index=False)

### decimalLatitude

**numeric**

In [19]:
field = 'decimalLatitude'

# Accepted range; values outside it are nulled by the adjustment pass.
bounds = {
    'field': field,
    'floor': -90.0,
    'ceiling': 90.0,
}
df = pd.DataFrame([bounds])

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### decimalLongitude

**numeric**

In [20]:
field = 'decimalLongitude'

# Accepted range; values outside it are nulled by the adjustment pass.
bounds = {
    'field': field,
    'floor': -180.0,
    'ceiling': 180.0,
}
df = pd.DataFrame([bounds])

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### geodeticDatum

**string**

In [21]:
# Tally the distinct `geodeticDatum` values (most frequent first) for manual review.
field = 'geodeticDatum'

df = text_field(field)
df.shape

# display_all(df)

(19400, 2)

In [22]:
# `geodeticDatum` values (including misspellings of "unknown") that the
# adjustment pass further down replaces with null.
literals = [
    'no disponible',
    'none indicated',
    'not georef',
    'not given',
    'not provided',
    'not recorded',
    'uknown',
    'uniknown',
    'unk',
    'unkknown',
    'unknown',
    'unknwon',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### georeferenceSources

**string**

**no changes**

In [23]:
# No adjustments are recorded for this field; the inspection calls are left
# disabled.
field = 'georeferenceSources'

# df = text_field(field)
# df.shape

# display_all(df)

### higherGeography

**string**

In [24]:
# Tally the distinct `higherGeography` values (most frequent first) for manual review.
field = 'higherGeography'

df = text_field(field)
df.shape

# display_all(df)

(237590, 2)

In [25]:
# `higherGeography` values that the adjustment pass further down replaces
# with null.
literals = [
    'no data',
    'unknown',
    'unknown captive unknown captive',
    'unknown no data',
    'unknown unknown',
    'unplaced',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### island

**string**

**no changes**

In [26]:
# No adjustments are recorded for this field; the inspection calls are left
# disabled.
field = 'island'

# df = text_field(field)
# df.shape

# display_all(df)

### islandGroup

**string**

In [27]:
# Tally the distinct `islandGroup` values (most frequent first) for manual review.
field = 'islandGroup'

df = text_field(field)
df.shape

# display_all(df)

(1247, 2)

In [28]:
# `islandGroup` values that the adjustment pass further down replaces with null.
literals = ['unknown']

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### maximumDepthInMeters

**numeric**

**no changes**

### maximumElevationInMeters

**numeric**

**no changes**

### minimumDepthInMeters

**numeric**

**no changes**

### minimumElevationInMeters

**numeric**

**no changes**

### municipality

**string**

In [29]:
# Tally the distinct `municipality` values (most frequent first) for manual review.
field = 'municipality'

df = text_field(field)
df.shape

# display_all(df)

(64273, 2)

In [30]:
# `municipality` values that the adjustment pass further down replaces with null.
literals = [
    'none',
    'unknown',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### stateProvince

**string**

In [31]:
# Tally the distinct `stateProvince` values (most frequent first) for manual review.
field = 'stateProvince'

df = text_field(field)
df.shape

# display_all(df)

(24454, 2)

In [32]:
# `stateProvince` values that the adjustment pass further down replaces with null.
literals = [
    'none',
    'none or unknown',
    'unk',
    'unknown',
    'unknown location',
    'unplaced',
    'unplaced state',
    'unrecorded',
    'unspecified',
    'unspecified state',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

### verbatimCoordinateSystem

**string**

**no changes**

### verbatimCoordinates

**string**

**no changes**

### verbatimDepth

**string**

**no changes**

### verbatimElevation

**string**

**no changes**

### waterBody

**string**

In [33]:
# Tally the distinct `waterBody` values (most frequent first) for manual review.
field = 'waterBody'

df = text_field(field)
df.shape

# display_all(df)

(19847, 2)

In [34]:
# `waterBody` values that the adjustment pass further down replaces with null.
literals = [
    'no data',
    'none given',
    'unknown',
]

# One log row per literal; `becomes` is recorded as null.
df = pd.DataFrame({'literal': literals}).assign(becomes=None, field=field)

with sqlite3.connect(OUT_DB) as cxn:
    df.to_sql(name='adjustments', con=cxn, index=False, if_exists='append')

## Adjust Fields

### Get Adjustments

In [35]:
# Columns excluded from the hash. The rehash step depends on this layout:
# the specials come first in ALL_COLS and every column after them is hashed.
SPECIALS = ['coreid', 'hash', 'source']
HASH_COLS = get_columns(specials=SPECIALS)
ALL_COLS = [*SPECIALS, *HASH_COLS]
HASH_COLS

['locality',
 'continent',
 'coordinatePrecision',
 'coordinateUncertaintyInMeters',
 'country',
 'countryCode',
 'county',
 'decimalLatitude',
 'decimalLongitude',
 'geodeticDatum',
 'georeferenceSources',
 'higherGeography',
 'island',
 'islandGroup',
 'locationRemarks',
 'maximumDepthInMeters',
 'maximumElevationInMeters',
 'minimumDepthInMeters',
 'minimumElevationInMeters',
 'municipality',
 'stateProvince',
 'verbatimCoordinateSystem',
 'verbatimCoordinates',
 'verbatimDepth',
 'verbatimElevation',
 'waterBody']

In [36]:
# replace: field -> set of literal values to null out.
# span:    field -> (floor, ceiling) range for numeric fields.
replace = defaultdict(set)
span = {}

# closing() releases the handle; sqlite3's own context manager only scopes
# a transaction, which this read-only pass does not need.
with closing(sqlite3.connect(OUT_DB)) as cxn_out:
    cxn_out.row_factory = dict_factory

    for row in cxn_out.execute('select * from adjustments;'):
        field = row['field']
        literal = row['literal']

        # A row with a literal is a text replacement; a row without one is a
        # numeric range. Compare against None so that a falsy literal such as
        # '' is still treated as a replacement rather than falling through to
        # float(None).
        if literal is not None:
            replace[field].add(literal)
        else:
            # float() normalizes floor/ceiling whatever type SQLite hands back.
            span[field] = (float(row['floor']), float(row['ceiling']))

# print(span)
# replace

### Perform Adjustments and Rehash

In [37]:
# Output table mirrors ALL_COLS; the columns are declared without types.
column_list = ','.join(ALL_COLS)
create = f"""
    create table if not exists gazetteer ({column_list});
"""

with sqlite3.connect(OUT_DB) as cxn_out:
    cxn_out.execute(create)

In [38]:
# Read every input row; write rows back with one placeholder per column.
select = 'select * from gazetteer;'

column_list = ','.join(ALL_COLS)
placeholders = ','.join('?' for _ in ALL_COLS)

insert = f"""
    insert into gazetteer ({column_list})
    values ({placeholders});
"""

In [39]:
# ALL_COLS is SPECIALS followed by HASH_COLS, so the hashed values start right
# after the specials and the digest goes into the `hash` slot. Deriving both
# positions keeps this loop correct if SPECIALS is ever edited.
hash_start = len(SPECIALS)
hash_idx = ALL_COLS.index('hash')

batch = []

# closing() releases both handles when the pass finishes; sqlite3's own
# context manager would only scope a transaction. Commits are explicit below.
with closing(sqlite3.connect(IN_DB)) as cxn_in, \
        closing(sqlite3.connect(OUT_DB)) as cxn_out:
    cxn_in.row_factory = dict_factory

    for in_row in tqdm(cxn_in.execute(select)):
        out_row = []

        for col in ALL_COLS:
            value = in_row[col]

            if col in replace and value in replace[col]:
                # Literal flagged in the adjustments table: null it out.
                value = None

            elif col in span:
                # NOTE(review): assumes stored values in ranged columns are
                # numeric; a text value would raise TypeError here -- confirm.
                floor, ceiling = span[col]
                if value is not None and (value < floor or value > ceiling):
                    value = None

            out_row.append(value)

        # Rehash over the adjusted values so the hash reflects the nulls.
        l_hash = b'|'.join(str(c).encode() for c in out_row[hash_start:])
        out_row[hash_idx] = sha256(l_hash).hexdigest()

        batch.append(out_row)

        # Flush in CHUNK-sized batches to bound memory use.
        if len(batch) >= CHUNK:
            cxn_out.executemany(insert, batch)
            cxn_out.commit()
            batch = []

    # Write whatever is left after the final full batch.
    if batch:
        cxn_out.executemany(insert, batch)
        cxn_out.commit()

16270042it [09:13, 29390.03it/s]


### Delete Rows Where the Decimal Latitude or Longitude Is Null

Nulling out-of-range values may have set a record's decimal latitude or longitude to null. Remove those records.

In [42]:
# Drop every record left without a usable latitude or longitude.
sql = """
    delete from gazetteer
     where decimalLatitude is null
        or decimalLongitude is null;
"""

# closing() releases the handle; the inner `cxn` context commits the delete
# (or rolls it back on error), so no separate commit() call is needed.
with closing(sqlite3.connect(OUT_DB)) as cxn, cxn:
    cxn.execute(sql)

### Create Index on Hash

In [40]:
# Index the hash column for the duplicate search that follows.
# `if not exists` lets the cell be re-run, matching the
# `create table if not exists` used for the gazetteer table.
sql = 'create index if not exists hash on gazetteer (hash);'

# closing() releases the handle; the inner `cxn` context scopes the transaction.
with closing(sqlite3.connect(OUT_DB)) as cxn, cxn:
    cxn.execute(sql)

### Delete Duplicate Gazetteer Records

Because some values were set to null, more records may now hash to the same value. Remove those new duplicate records.

In [44]:
# For every hash that occurs more than once, keep the row with the smallest
# rowid and delete the rest. The keeper is chosen explicitly with min(rowid)
# rather than through a `having min(rowid)` truthiness test.
sql = """
with keeps as (
    select hash, min(rowid) as keeper
      from gazetteer
  group by hash
    having count(*) > 1)
delete from gazetteer
 where rowid in (
    select g.rowid
      from gazetteer as g
      join keeps as k on k.hash = g.hash
     where g.rowid != k.keeper);
"""

# closing() releases the handle; the inner `cxn` context commits the delete
# (or rolls it back on error).
with closing(sqlite3.connect(OUT_DB)) as cxn, cxn:
    cxn.execute(sql)