This notebook provides an example schema to follow, when merging data to the gazetteer with feature records. In this script we are merging data from Geonames. The same script can be used to load data from other sources such as OpenStreetMap. Change the field names accordingly to your datasource when loading data from other source.

### Connect to the database

In [None]:
import psycopg2

conn = psycopg2.connect(
    database = "biowhere-gazetteer",
    user = "postgres",
    host = "127.0.0.1",
    password = "<password>",
    port = 1234)

cur = conn.cursor()

### Load geonames to postgresql table using CSV
Geonames data for New Zealand can be downloaded from [here](https://download.geonames.org/export/dump/)

In [None]:
import geopandas as gpd
from sqlalchemy import create_engine
from geoalchemy2 import Geometry, WKTElement

# Load the CSV file into a GeoDataFrame and generate the 'geometry' column
gdf = gpd.read_file('./path/to/geonames.csv')
gdf['geometry'] = gpd.points_from_xy(gdf.longitude, gdf.latitude, crs="EPSG:4326")

# Define database connection details
db_url = "postgresql://postgres:<password>@localhost:<port>/biowhere-gazetteer"
table_name = "geonames"

# Create the database engine
engine = create_engine(db_url)

# Use GeoAlchemy2 to prepare geometries for PostGIS
gdf['geometry'] = gdf['geometry'].apply(lambda geom: WKTElement(geom.wkt, srid=4326))

# Rename the 'geometry' column to match PostGIS conventions
gdf = gdf.rename(columns={'geometry': 'geom'})

# Write the GeoDataFrame to PostGIS
gdf.to_sql(
    table_name,
    engine,
    if_exists='replace',
    index=False,
    dtype={'geom': Geometry('POINT', srid=4326)}
)

print(f"Data successfully written to the '{table_name}' table in PostGIS.")

### Normalize geonames feature codes to detect duplicates

In [None]:
# Mapping GeoNames feature codes to more common feature types with associated feature classes
GEONAMES_FEATURE_CODE_MAPPING = {
    # Administrative (A)
    'ADM1': 'First-order Administrative Division',
    'ADM2': 'Second-order Administrative Division',
    'ADM3': 'Third-order Administrative Division',
    'ADM4': 'Fourth-order Administrative Division',
    'ADMD': 'Administrative Division',

    # Populated places (P)
    'PPL': 'Populated Place',
    'PPLA':'Seat of a First-order Administrative Division',
    'PPLC': 'Capital of a Political Entity',
    'PPLF': 'Farm Village',

    # Hydrographic (H)
    'STM': 'Stream',
    'LKE': 'Lake',
    'PND': 'Pond',
    'CNLA': 'Artificial Canal',

    # Terrain (T)
    'MT': 'Mountain',
    'HLL': 'Hill',
    'PK': 'Peak',
    'VAL': 'Valley',

    # Spot features (S)
    'AIRP': 'Airport',
    'SCH': 'School',
    'HSP': 'Hospital',
    'CH': 'Church',
}

# Mapping feature class codes to normalized names
GEONAMES_FEATURE_CLASS_MAPPING = {
    'A': 'Administrative Region',
    'H': 'Hydrographic',
    'L': 'Area',
    'P': 'Populated Place',
    'R': 'Road / Railroad',
    'S': 'Spot Feature',
    'T': 'Terrain',
    'U': 'Undersea',
    'V': 'Vegetation'
}

def normalize_feature_code(feature_code, feature_class=None):
    """
    Normalize a GeoNames feature code into a human-readable feature type.
    If the feature code is not found, use the feature class to derive the normalized class.

    Parameters:
        feature_code (str): GeoNames feature code.
        feature_class (str, optional): GeoNames feature class code.

    Returns:
        str: Normalized feature description
    """
    if feature_code in GEONAMES_FEATURE_CODE_MAPPING:
        return GEONAMES_FEATURE_CODE_MAPPING.get(feature_code)
    elif feature_class:
        return GEONAMES_FEATURE_CLASS_MAPPING.get(feature_class)
    else:
        return feature_code

### Identify the duplicates records with current gazetteer entries


In [None]:
def query_nearby_geometries(cur, geometry_wkt):
    """
    Queries all spatial tables for geometries within radius_km of the given WKT geometry.
    """
    radius_km = 2
    tables = [
        "SpatialGeometryRepresentation_point",
        "SpatialGeometryRepresentation_line",
        "SpatialGeometryRepresentation_polygon",
    ]
    all_results = []
    for table in tables:
        query = f"""
            SELECT *
            FROM {table}
            WHERE ST_Distance(
                ST_Centroid(geometry)::geography,
                ST_Centroid(ST_GeomFromText(%s, 4326))::geography
            ) < %s
        """
        cur.execute(query, (geometry_wkt, radius_km * 1000))
        all_results.extend(cur.fetchall())
    return all_results


In [None]:
def fetch_feature_infor_from_name(cur, name):
    records = []
    cur.execute("""
        SELECT feature_id FROM FeatureName WHERE featurename = %s
    """, (name,))
    feature_ids = cur.fetchall()
    for feature_id in feature_ids:
        feature_id = feature_id[0]
        cur.execute("""
            SELECT featureclass FROM FeatureType WHERE feature_id = %s
        """, (feature_id,))
        feature_type_row = cur.fetchone()
        feature_class = feature_type_row[0] if feature_type_row else None
        cur.execute("""
            SELECT id FROM SpatialGeometryRepresentation WHERE feature_id = %s
        """, (feature_id,))
        geometry_id = cur.fetchone()[0]
        geometry = None

        cur.execute("""
            SELECT geometry FROM SpatialGeometryRepresentation_point WHERE spatialgeometryrepresentation_id = %s
        """, (geometry_id,))
        result = cur.fetchone()
        if result and result[0]:
            geometry = load_wkb(bytes.fromhex(result[0]))

        if geometry is None:
            cur.execute("""
                SELECT geometry FROM SpatialGeometryRepresentation_polygon WHERE spatialgeometryrepresentation_id = %s
            """, (geometry_id,))
            result = cur.fetchone()
            if result and result[0]:
                geometry = load_wkb(bytes.fromhex(result[0]))

        if geometry is None:
            cur.execute("""
                SELECT geometry FROM SpatialGeometryRepresentation_line WHERE spatialgeometryrepresentation_id = %s
            """, (geometry_id,))
            result = cur.fetchone()
            if result and result[0]:
                geometry = load_wkb(bytes.fromhex(result[0]))
        if geometry:
            centroid = geometry.centroid
            coords = (centroid.y, centroid.x)
            latitude = coords[0]
            longitude = coords[1]
            records.append((feature_id, feature_class, geometry, latitude, longitude))
    # print(f"records: {records}")
    return records

In [None]:
def fetch_feature_info(cur, geometry_id):
        """
        Fetches feature_id, name, and category info for a given geometry ID.
        """
        cur.execute("""
            SELECT feature_id FROM SpatialGeometryRepresentation WHERE id = %s
        """, (geometry_id,))
        feature_id = cur.fetchone()[0]

        cur.execute("""
            SELECT featureClass FROM FeatureType WHERE feature_id = %s
        """, (feature_id,))
        feature_class = cur.fetchone()[0]

        cur.execute("""
            SELECT featureName FROM FeatureName WHERE feature_id = %s
        """, (feature_id,))
        feature_name = cur.fetchone()[0]

        return feature_id, feature_class, feature_name

In [None]:
import subprocess
import os
from shapely.wkb import loads as load_wkb
from geopy.distance import geodesic
import pandas as pd


def check_duplicates_with_geonames(rows, batch_num):
    """
    For each geonames record, queries nearby spatial geometries within a specified radius,
    extracts attributes, and performs duplicate detection using a downstream prediction script.
    """
    all_predictions_df = pd.DataFrame()
    all_duplicates_df = pd.DataFrame()
    data_records = []
    for count, row in enumerate(rows, start=1):
        print(f"Processing row {count}: {row}")

        geonameid, name, alternatenames, feature_class, feature_code, geometry_hex, latitude, longitude = row
        left_geometry = load_wkb(bytes.fromhex(geometry_hex))
        left_coords = (latitude, longitude)
        left_category = normalize_feature_code(feature_code, feature_class)

        with conn.cursor() as cur:
            nearby_records = query_nearby_geometries(cur, left_geometry.wkt)
            print(f"Found {len(nearby_records)} nearby records.")

            for record in nearby_records:
                record_geometry = load_wkb(bytes.fromhex(record[2]))
                centroid = record_geometry.centroid
                right_coords = (centroid.y, centroid.x)

                distance_km = (
                    geodesic(left_coords, right_coords).kilometers
                    if None not in left_coords and None not in right_coords
                    else None
                )

                feature_id, right_category, right_name = fetch_feature_info(cur, record[5])

                data_records.append({
                    "geonameid": geonameid,
                    "feature_id": feature_id,
                    "left_name": name,
                    "left_category": left_category,
                    "left_geometry": left_geometry,
                    "left_latitude": latitude,
                    "left_longitude": longitude,
                    "right_name": right_name,
                    "right_category": right_category,
                    "right_geometry": record_geometry,
                    "right_latitude": right_coords[0],
                    "right_longitude": right_coords[1],
                    "distance_km": distance_km
                })

            existing_keys = set(
                (r["feature_id"], r["right_name"])
                for r in data_records
            )
            similar_name_records = fetch_feature_infor_from_name(cur, name)
            print(f"Found {len(similar_name_records)} similar name records.")
            for record in similar_name_records:
                feature_id, right_category, right_geometry, right_latitude, right_longitude = record
                right_coords = (right_latitude, right_longitude)
                distance_km = (
                    geodesic(left_coords, right_coords).kilometers
                    if None not in left_coords and None not in right_coords
                    else None
                )

                data_records.append({
                    "geonameid": geonameid,
                    "feature_id": feature_id,
                    "left_name": name,
                    "left_category": left_category,
                    "left_geometry": left_geometry,
                    "left_latitude": latitude,
                    "left_longitude": longitude,
                    "right_name": name,
                    "right_category": right_category,
                    "right_geometry": right_geometry,
                    "right_latitude": right_latitude,
                    "right_longitude": right_longitude,
                    "distance_km": distance_km
                })

    # Save input data
    df_input = pd.DataFrame(data_records)
    # print(df_input)
    input_pkl = f"tmp/{batch_num}_input.pkl"
    df_input.to_pickle(input_pkl)
    # df_input.to_csv(f"tmp/{batch_num}_input.csv")

    # Run the prediction script
    output_pkl = f"tmp/{batch_num}_output.pkl"
    predict_command = (
        f"python predict.py "
        f"--input_file {input_pkl} "
        f"--output_file {output_pkl} "
        f"--input_crs epsg:4326 "
        f"--use_crs epsg:3857 "
        f"--run_att_aff "
        f"--device cpu "
        f"--language_model bert"
    )
    try:
        # Run the command with subprocess and capture output
        result = subprocess.run(
            predict_command,
            check=True,
            timeout=1000,
            capture_output=True,
            text=True
        )
    except subprocess.TimeoutExpired:
        print(f"Batch {batch_num}: Prediction script timed out after 360 seconds")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Batch {batch_num}: Prediction script failed with exit code {e.returncode}")
        print("Error output:")
        print(e.stderr)
        return False
    except Exception as e:
        print(f"Batch {batch_num}: Unexpected error: {str(e)}")
        return False


    # Load and process prediction results
    if os.path.exists(output_pkl):
        df_output = pd.read_pickle(output_pkl)
        all_predictions_df = pd.concat([all_predictions_df, df_output], ignore_index=True)

        duplicates_df = df_output[df_output['prediction'] == 1]
        if duplicates_df.empty:
            print("No duplicates found.")
        else:
            print("Duplicates detected.")
            all_duplicates_df = pd.concat([all_duplicates_df, duplicates_df], ignore_index=True)
    else:
        print("Prediction output file not found.")

    # Save results
    file_to_save = f"tmp/{batch_num}_prediction.csv"
    all_duplicates_df.to_csv(file_to_save, index=False)
    return None

In [None]:
BATCH_SIZE = 1000

# Step 1: Execute a single query to get up to 10,000 rows in order
cur.execute("""
    SELECT geonameid, name, alternatenames, feature_class, feature_code, geom, latitude, longitude
    FROM geonames_manawatu
    ORDER BY geonameid
""")
batch_num = 1

# Step 2: Process in batches of 1,000
# Processing as batches due to the limitations in duplicate detection algorithm
while True:
    batch = cur.fetchmany(BATCH_SIZE)
    if batch_num != 13:
        batch_num += 1
        continue
    if not batch:
        break
    # batch = batch[:-1]
    print(f"Processing the {(batch_num)} batch...")
    check_duplicates_with_geonames(batch, batch_num)
    batch_num += 1
    break

print("Finished processing.")
cur.close()
conn.close()

# Merge csvs with duplicates together
df1 = pd.read_csv('tmp/1_predictions.csv')
df2 = pd.read_csv('tmp/2_predictions.csv')
df3 = pd.read_csv('tmp/3_predictions.csv')
duplicates = pd.concat([df1, df2, df3])

### Merge records from Geonames to the gazetteer

#### Load records from GeoNames

In [None]:
import pandas as pd
from sqlalchemy import create_engine

# Connection setup
engine = create_engine("postgresql://postgres:<password>@127.0.0.1:<port>/biowhere-gazetteer")

# Read the geonames source table into a DataFrame
geonames_df = pd.read_sql("SELECT * FROM geonames", engine)

#### Filter the new records and records that refer to existing features in the gazetteer

In [None]:
duplicate_ids = set(duplicates['geonameid'])
duplicate_ids = set(str(x) for x in duplicate_ids)

print(f"Number of geonames records: {len(geonames_df)}")

df_duplicates = geonames_df[geonames_df['geonameid'].isin(duplicate_ids)]
df_new = geonames_df[~geonames_df['geonameid'].isin(duplicate_ids)]

print(f"Number of duplicates: {len(df_duplicates)}")
print(f"Number of new records: {len(df_new)}")

#### Merge data from Geonames to the gazetteer

In [None]:
import requests
import xml.etree.ElementTree as ET

# Get the Maori name of a place if present
def get_geonames_maori_name(geonames_id):
    base_url = "http://api.geonames.org/get"
    feature_id = geonames_id
    username = "quaketext"
    style = "FULL"
    params = {"lang": "en"}
    url = f"{base_url}?geonameId={feature_id}&username={username}&style={style}"
    if params:
        url += "&" + "&".join(f"{key}={value}" for key, value in params.items())

    response = requests.get(url)
    mi_name = ''
    if response.status_code == requests.codes.ok:
        xml_data = response.text
        root = ET.fromstring(xml_data)
        mi_name = root.find(".//alternateName[@lang='mi']")
        if mi_name!=None:
            mi_name = mi_name.text
    else:
        print(f"Error: {response.status_code} - {response.reason}")
    return mi_name

##### Add new features from Geonames

In [None]:
def add_geonames_data(df_batch):
    for index, row in df_batch.iterrows():
        print(f"Processing row {index} with geonameid {row['geonameid']}")
        # Add feature
        cur.execute("INSERT INTO Feature (featureDescription, inferredFlag, lastUpdateDate, lastUpdateUser) VALUES (NULL, NULL, current_timestamp, 'AF') RETURNING id")
        feature_id = cur.fetchone()[0]
        print(f"  Inserted Feature with id: {feature_id}")

        # Set feature name
        maori_name = get_geonames_maori_name(row['geonameid'])
        if maori_name:
            cur.execute("INSERT INTO FeatureName (featureName, language, lastUpdateDate, lastUpdateUser, feature_id) VALUES (%s, 'mi', current_timestamp, 'AF', %s) " "RETURNING id", (maori_name, feature_id))
            print(f"  Inserted Maori FeatureName: {maori_name}")
            feature_name_id = cur.fetchone()[0]
        elif row['name'] != maori_name:
            cur.execute("INSERT INTO FeatureName (featureName, language, lastUpdateDate, lastUpdateUser, feature_id) VALUES (%s, NULL, current_timestamp, 'AF', %s) " "RETURNING id", (row['name'], feature_id))
            print(f"  Inserted default FeatureName: {row['name']}")
            feature_name_id = cur.fetchone()[0]

        # set geometry
        cur.execute("INSERT INTO SpatialGeometryRepresentation (lastUpdateDate, lastUpdateUser, timePeriod, spatialAccuracy, feature_id, localityDescription_id) VALUES (current_timestamp, 'AF', NULL, NULL, %s, NULL) RETURNING id", ( feature_id,))
        spatial_geometry_representation_id = cur.fetchone()[0]
        print(f"  Inserted SpatialGeometryRepresentation with id: {spatial_geometry_representation_id}")

        sql = f"INSERT INTO spatialgeometryrepresentation_point (geodeticReferenceSystem, geometry, lastUpdateDate, lastUpdateUser, spatialGeometryRepresentation_id) VALUES (%s, %s, current_timestamp, 'AF', %s)"
        cur.execute(sql, ('EPSG 4326', row['geom'], spatial_geometry_representation_id))
        print("  Inserted geometry with EPSG 4326")

        # type
        print(f"feature class : {row['feature_class']}, feature code: {row['feature_code']}")
        fclass = row['feature_class']
        fcode = row['feature_code']
        feature_type = normalize_feature_code(fcode, fclass)
        cur.execute("INSERT INTO FeatureType (classificationScheme, featureClass, feature_id, lastUpdateDate, lastUpdateUser) VALUES ('geonames_feature_class', %s, %s, current_timestamp, 'AF') RETURNING id", (feature_type, feature_id))
        feature_type_id = cur.fetchone()[0]
        print(f"  Inserted FeatureType with class: {feature_type}")


        # source
        cur.execute("INSERT INTO Source (externalSourceId, source, spatialGeometryRepresentation_id, featureType_id, featureName_id, localityDescription_id, lastUpdateDate, lastUpdateUser) VALUES (%s, 'Geonames', %s, %s, %s, NULL, current_timestamp, 'AF')", (row['geonameid'], spatial_geometry_representation_id, feature_type_id, feature_name_id))
        print(f"  Inserted Source for geonameid: {row['geonameid']}")

        print(f"Finished processing row {index}\n")
    conn.commit()
    print("changes commited to the database.\n")

In [None]:
add_geonames_data(df_new)


#### Merge data from geonames to the existing features

In [None]:
def add_geonames_data_duplicate(df_duplicates):
    for index, row in df_duplicates.iterrows():
        try:
            print(f"---\nProcessing row {index} with geonameid {row['geonameid']}")
            geoname_id = row['geonameid']
            feature_name_id = None

            duplicate_entry = duplicates[duplicates['geonameid'] == int(geoname_id)]

            if duplicate_entry.empty:
                print(f"Warning: No matching feature found for geonameid {geoname_id}")
                continue

            feature_id = str(duplicate_entry.iloc[0]['feature_id'])
            print(f"Found existing feature_id: {feature_id}")

            # Check for existing feature name
            cur.execute("SELECT featurename FROM FeatureName WHERE feature_id = %s", (str(feature_id),))
            featurename_row = cur.fetchone()
            print(f"Existing FeatureName row: {featurename_row}")

            if not featurename_row or featurename_row[0] != row['name']:
                print(f"Inserting new FeatureName: {row['name']}")
                cur.execute("""
                    INSERT INTO FeatureName (featureName, language, lastUpdateDate, lastUpdateUser, feature_id)
                    VALUES (%s, NULL, current_timestamp, 'AF', %s)
                    RETURNING id
                """, (row['name'], feature_id))
                feature_name_id = cur.fetchone()[0]
            else:
                print("FeatureName already exists and matches the name. Skipping insert.")

            # Add Māori name if available
            maori_name = get_geonames_maori_name(geoname_id)
            if maori_name:
                cur.execute("""
                    SELECT id FROM FeatureName
                    WHERE featureName = %s AND feature_id = %s
                """, (maori_name, feature_id))
                maori_name_existed = cur.fetchone()
                if not maori_name_existed:
                    print(f"Inserting Māori name: {maori_name}")
                    cur.execute("""
                        INSERT INTO FeatureName (featureName, language, lastUpdateDate, lastUpdateUser, feature_id)
                        VALUES (%s, 'mi', current_timestamp, 'AF', %s)
                        RETURNING id
                    """, (maori_name, feature_id))
                    feature_name_id = cur.fetchone()[0]
                else:
                    print("Māori name already exists. Skipping insert.")

            # Insert FeatureType
            normalize_feature_code = normalize_feature_code(row['feature_code'], row['feature_class'])
            print(f"Inserting FeatureType: {normalize_feature_code}")
            cur.execute("""
                INSERT INTO FeatureType (classificationScheme, featureClass, feature_id, lastUpdateDate, lastUpdateUser)
                VALUES ('geonames_feature_class', %s, %s, current_timestamp, 'AF')
                RETURNING id
            """, (normalize_feature_code, feature_id))
            feature_type_id = cur.fetchone()[0]

            # Check and insert SpatialGeometryRepresentation if needed
            print("Checking for existing spatial geometry...")
            cur.execute("""
                SELECT sgr.id
                FROM SpatialGeometryRepresentation_point sgrp
                JOIN SpatialGeometryRepresentation sgr
                    ON sgrp.spatialGeometryRepresentation_id = sgr.id
                WHERE sgr.feature_id = %s AND sgrp.geometry = %s
            """, (feature_id, row['geom']))
            spatial_geometry_representation_id = None
            existing_geometry = cur.fetchone()

            if existing_geometry:
                print("Spatial geometry already exists.")
            else:
                print("Inserting new spatial geometry representation.")
                cur.execute("""
                    INSERT INTO SpatialGeometryRepresentation
                    (lastUpdateDate, lastUpdateUser, timePeriod, spatialAccuracy, feature_id, localityDescription_id)
                    VALUES (current_timestamp, 'AF', NULL, NULL, %s, NULL)
                    RETURNING id
                """, (feature_id,))
                spatial_geometry_representation_id = cur.fetchone()[0]

                cur.execute("""
                    INSERT INTO SpatialGeometryRepresentation_point
                    (geodeticReferenceSystem, geometry, lastUpdateDate, lastUpdateUser, spatialGeometryRepresentation_id)
                    VALUES (%s, %s, current_timestamp, 'AF', %s)
                """, ('EPSG 4326', row['geom'], spatial_geometry_representation_id))

            # Insert into Source
            print("Inserting Source entry.")
            cur.execute("""
                INSERT INTO Source
                (externalSourceId, source, spatialGeometryRepresentation_id, featureType_id, featureName_id, localityDescription_id, lastUpdateDate, lastUpdateUser)
                VALUES (%s, 'Geonames', %s, %s, %s, NULL, current_timestamp, 'AF')
            """, (geoname_id, spatial_geometry_representation_id, feature_type_id, feature_name_id))

        except Exception as e:
            print(f"Error processing row {index} (geonameid {row['geonameid']}): {e}")
    conn.commit()

In [None]:
add_geonames_data_duplicate(df_duplicates)
cur.close()
conn.close()