In [None]:
# check if we can connect to the postgres database

import os
import psycopg2

PASSWORD = os.getenv("POSTGRES_PASSWORD")

conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    port=15432,
    password=PASSWORD
)

# list all tables in the database
cursor = conn.cursor()
cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public'")
tables = cursor.fetchall()
print(tables)

# install postgis extension
cursor.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
conn.commit()


[('geography_columns',), ('geometry_columns',), ('spatial_ref_sys',), ('places',)]


In [2]:
# import pydantic

# class Place(pydantic.BaseModel):
#     names: list[str]
#     geometry: bytearray
#     type: str
#     id: str
#     hierarchy: list[str]
#     properties: dict

In [3]:
from sqlalchemy import Column, String, Text, JSON, ARRAY
from sqlalchemy.orm import declarative_base
from geoalchemy2 import Geometry

Base = declarative_base()

class PlaceORM(Base):
    __tablename__ = "places"

    id = Column(String, primary_key=True)
    names = Column(ARRAY(Text))
    type = Column(Text)
    hierarchy = Column(ARRAY(Text))
    geometry = Column(Geometry(geometry_type='GEOMETRY', srid=4326))
    properties = Column(JSON)

    def to_dict(self    ):
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}



In [4]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert

engine = create_engine(f"postgresql://postgres:{PASSWORD}@localhost:15432/postgres")
Session = sessionmaker(bind=engine)
session = Session()

# drop the table if it exists
Base.metadata.drop_all(engine)

# create the table if it doesn't exist
Base.metadata.create_all(engine)

# def insert_places(places: list[PlaceORM]):
#     # Get the table from your model
#     table = PlaceORM.__table__

#     # Prepare insert statement with conflict handling for PostgreSQL
#     stmt = insert(table).values([place.to_dict() for place in places])  # Assuming `to_dict()` converts ORM objects to dicts
#     stmt = stmt.on_conflict_do_update(
#         index_elements=['id'],  # Conflict on 'id'
#         set_={col.name: getattr(stmt.excluded, col.name) for col in table.columns if col.name != 'id'}
#     )

#     # Execute the statement
#     session.execute(stmt)
#     session.commit()


def insert_places(places: list[PlaceORM]):
    # Use bulk_save_objects to efficiently perform the bulk upsert
    session.bulk_save_objects(places)
    session.commit()



In [None]:
import duckdb
import time
from geoalchemy2.shape import from_shape
from shapely import wkb
from pprint import pprint

# Define your file paths
file1 = '../data/divisions.parquet'
file2 = '../data/division_areas.parquet'

def process_paginated_data(file1, file2, page_size=1000):
    con = duckdb.connect()

    # Function to process a chunk of data based on LIMIT and OFFSET
    def fetch_and_process_data(offset):
        query = f"""
        SELECT f1.id, f1.names, f1.subtype, f1.hierarchies, f1.country, f2.geometry
        FROM '{file1}' AS f1
        LEFT JOIN '{file2}' AS f2
        ON f1.id = f2.division_id
        ORDER BY f1.id
        LIMIT {page_size} OFFSET {offset}
        """

        # Run the query and get a DataFrame
        start_time = time.time()
        result_df = con.execute(query).df()
        end_time = time.time()
        print(f"Time taken to execute query: {end_time - start_time} seconds")

        if result_df.empty:
            return False  # No more data to process

        start_time = time.time()
        places = []
        for index, row in result_df.iterrows():
            orig_names = row.get("names")
            names = []
            if orig_names:
                primary_name = orig_names.get("primary")
                common_english_name = (orig_names.get("common") or {}).get("en")
                names = [name for name in [primary_name, common_english_name] if name]
            if len(row["hierarchies"]) > 0:
                hierarchies = row["hierarchies"][0]
                hierarchy = [h["name"] for h in hierarchies]

            geometry = from_shape(wkb.loads(bytes(row['geometry'])), srid=4326)

            place = PlaceORM(
                names=names,
                geometry=geometry,
                type=row['subtype'],
                id=row['id'],
                hierarchy=hierarchy,
                properties={
                    "country": row['country'],
                })
            places.append(place)
            # pprint(place.to_dict())
        end_time = time.time()
        print(f"Time taken to process data: {end_time - start_time} seconds")

        start_time = time.time()
        insert_places(places)
        end_time = time.time()
        print(f"Time taken to insert data: {end_time - start_time} seconds")
        return True

    # Initial offset
    offset = 0

    while True:
        # Process data in chunks
        print(f"Processing chunk {offset} - {offset + page_size}")
        if not fetch_and_process_data(offset):
            break
        offset += page_size  # Move to the next chunk

# Call the function with your files and desired page size
process_paginated_data(file1, file2, page_size=10000)


Processing chunk 0 - 10000


Time taken to execute query: 3.1809241771698 seconds
Time taken to process data: 0.8589038848876953 seconds
Time taken to insert data: 2.394068479537964 seconds
Processing chunk 10000 - 20000
Time taken to execute query: 3.076491355895996 seconds
Time taken to process data: 0.8016703128814697 seconds
Time taken to insert data: 2.1773884296417236 seconds
Processing chunk 20000 - 30000
Time taken to execute query: 3.3585617542266846 seconds
Time taken to process data: 0.8011825084686279 seconds
Time taken to insert data: 2.2265303134918213 seconds
Processing chunk 30000 - 40000
Time taken to execute query: 3.3278515338897705 seconds
Time taken to process data: 0.792792558670044 seconds
Time taken to insert data: 2.4132275581359863 seconds
Processing chunk 40000 - 50000
Time taken to execute query: 3.586871385574341 seconds
Time taken to process data: 0.8485774993896484 seconds
Time taken to insert data: 2.5634658336639404 seconds
Processing chunk 50000 - 60000
Time taken to execute query

RuntimeError: Query interrupted

In [9]:
# show one place and its geometry as geojson

from sqlalchemy import select, func

stmt = select(
    PlaceORM.id,
    PlaceORM.names,
    func.ST_AsGeoJSON(PlaceORM.geometry).label("geometry_json")
).limit(1)

results = session.execute(stmt).all()

pprint(results)

# save as geojson

with open("places.geojson", "w") as f:
    f.write(results[0].geometry_json)


[('085000003fffffff0126c753d6cd5249', ['La Gaile'], None)]


TypeError: write() argument must be str, not None

In [7]:
session.commit()

In [17]:
query = "New York"

# search for the query in the database

results = session.execute(select(PlaceORM).where(PlaceORM.names.any(query))).all()

pprint([result[0].to_dict() for result in results])

[{'geometry': None,
  'hierarchy': ['United States', 'Missouri', 'Caldwell County', 'New York'],
  'id': '0850c31cffffffff012426d7a157b4aa',
  'names': ['New York'],
  'properties': {'country': 'US'},
  'type': 'locality'},
 {'geometry': None,
  'hierarchy': ['Sierra Leone',
                'Western Area',
                'Western Area Rural',
                'New York'],
  'id': '08514493bfffffff01deb75ea9daa7f0',
  'names': ['New York'],
  'properties': {'country': 'SL'},
  'type': 'neighborhood'},
 {'geometry': None,
  'hierarchy': ['United States', 'Texas', 'Henderson County', 'New York'],
  'id': '0852ab47bfffffff01bb8c3ae3deb03e',
  'names': ['New York'],
  'properties': {'country': 'US'},
  'type': 'locality'},
 {'geometry': None,
  'hierarchy': ['Colombia', 'Guaviare', 'El Retorno', 'New York'],
  'id': '085196583fffffff0109b8c0db78b88c',
  'names': ['New York'],
  'properties': {'country': 'CO'},
  'type': 'locality'},
 {'geometry': None,
  'hierarchy': ['Sierra Leone', 'South