In [None]:
import geopandas as gpd 
from geopandas import GeoDataFrame
import pandas as pd
import osmnx as ox 
import os 
import typing
import glob


In [None]:
# OSM tag filters, keyed by the category name that is later used for the output
# file name, the "category" column and the database table name.
# Each value is the `tags` mapping passed on to OSMnx for that category.
# NOTE(review): OSMnx documents a multi-key tags dict as the union (OR) of its keys, so
# "territories" presumably matches boundary=administrative OR admin_level=2 - confirm intent.
CATEGORIES = {
    "building": {"building": True},
    "territories": {"boundary": "administrative", "admin_level": "2"},
    "landuse": {"landuse": True},
    "highway": {"highway": True},
    "fire_station": {"amenity": ["fire_station"]},
    "waterway": {"waterway": True},
    "substation": {"power": ["substation"]},
}

In [None]:
# Global OSMnx settings: enable the cache, disable console logging and set
# timeout=720 (presumably seconds - confirm in the OSMnx documentation).
# NOTE(review): ox.config() is deprecated in newer OSMnx releases in favour of
# assigning attributes on ox.settings - check against the installed OSMnx version.
ox.config(use_cache=True, log_console=False, timeout=720)

In [None]:
def _append_poi_category(gdf_poi: GeoDataFrame,
                         tags: typing.Dict[str, list], 
                         col_poi_id: str = "id_poi"
                         ) -> GeoDataFrame: 
    """Append poi category from tag search to POI locations
    Args:
        gdf_poi (GeoDataFrame): POI locations
        tags (typing.Dict[str, list]): mapping containing osm key as key and list of osm values as value
        col_poi_id (str, optional): name of column uniquely identifying each POI. Defaults to "id_poi".
    Returns:
        GeoDataFrame: POI locations with POI category appended
    """    
    for key in tags: 
        if key not in set(gdf_poi.columns): 
            gdf_poi[key] = [None] * gdf_poi.shape[0]
    
    poi_category = pd.concat([gdf_poi.set_index(col_poi_id)[key] for key in tags]).dropna()
    allowed_values = [v for val in tags.values() for v in val]
    poi_category = poi_category[poi_category.isin(allowed_values)]
    poi_category = poi_category.groupby(col_poi_id).first()
    poi_category.name = "poi_cat"
    
    gdf_poi = gdf_poi.merge(poi_category, how="left", left_on=col_poi_id, right_index=True, validate="1:1")
    return gdf_poi


def _postprocess_osm_data(gdf_osm: gpd.GeoDataFrame,
                          city_osm: str,
                          category: str,
                          cols_relevant: typing.Optional[typing.List[str]] = None,
                          )  -> gpd.GeoDataFrame:
    """Postprocess downloaded OSM features.

    Adds the columns ``city``, ``osm_id``, ``longitude``, ``latitude`` and ``category``
    and reprojects the geometries to EPSG:25832.

    Args:
        gdf_osm (GeoDataFrame): OSM features. After ``reset_index`` the frame must expose
            ``element_type`` and ``osmid`` (index levels or columns); they are joined into
            ``osm_id`` as ``"<element_type>/<osmid>"``.
        city_osm (str): value written to the ``city`` column.
        category (str): value written to the ``category`` column.
        cols_relevant (typing.List[str], optional): columns to keep. If None, all columns
            (including the ones added here) are returned. Defaults to None.

    Returns:
        GeoDataFrame: postprocessed features in EPSG:25832, restricted to ``cols_relevant``
        when given. ``longitude``/``latitude`` are taken from a representative point in the
        CRS of the input (presumably EPSG:4326 as delivered by OSMnx - confirm).
    """
    # reset_index/drop return new frames, so the caller's GeoDataFrame is left untouched.
    gdf_osm = gdf_osm.reset_index(drop=False)
    gdf_osm = gdf_osm.drop(columns=["ways", "nodes"], errors="ignore")

    gdf_osm["city"] = city_osm
    gdf_osm["osm_id"] = gdf_osm["element_type"] + "/" + gdf_osm["osmid"].astype(str)

    # Coordinates are read before reprojecting so they stay in the input CRS.
    repr_point = gdf_osm.representative_point()
    gdf_osm["longitude"] = repr_point.x
    gdf_osm["latitude"] = repr_point.y
    gdf_osm["category"] = category

    gdf_osm = gdf_osm.to_crs("EPSG:25832")

    # Decide on the column subset only now: a snapshot of the columns taken earlier
    # would silently drop longitude, latitude and category.
    if cols_relevant is None:
        return gdf_osm
    return gdf_osm[list(cols_relevant)]

def _get_osm_category(category: str,
                      tags: dict,
                      city: str,
                      cols_relevant=["geometry", "osm_id", "category", "longitude", "latitude", "city"]
                      )  -> gpd.GeoDataFrame:
    """Download the OSM features of one category for a city and postprocess them.

    Args:
        category (str): category label, forwarded to ``_postprocess_osm_data``.
        tags (dict): OSM tag filter handed to ``ox.geometries_from_place``.
        city (str): city name; the place query is built as ``"<city>, Germany"``.
        cols_relevant (list, optional): columns to keep, forwarded unchanged to
            ``_postprocess_osm_data``. The default list is shared between calls and
            must not be modified in place.

    Returns:
        GeoDataFrame: result of ``_postprocess_osm_data`` for the downloaded features.
    """
    place_query = f"{city}, Germany"
    raw_features = ox.geometries_from_place(place_query, tags=tags)

    return _postprocess_osm_data(
        raw_features,
        city_osm=city,
        category=category,
        cols_relevant=cols_relevant,
    )


def retrieve_osm_data(categories: dict, 
                      dir_save: str = "./data",
                      city: str = "Essen", 
                      cols_relevant=["geometry", "osm_id", "category", "longitude", "latitude", "city"]): 
    """Download every OSM category for a city and store each one as a GeoJSON file.

    Args:
        categories (dict): maps a category name to the OSM tag filter used to download it.
        dir_save (str, optional): output directory; created if it does not exist.
            Defaults to "./data".
        city (str, optional): city to download data for. Defaults to "Essen".
        cols_relevant (list, optional): columns kept in each output file, forwarded to
            ``_get_osm_category``.

    Returns:
        None. One file ``<dir_save>/<category>.geojson`` is written per category.
    """
    # Create the target directory up front so the first write does not fail on a
    # fresh checkout where the folder is missing.
    os.makedirs(dir_save, exist_ok=True)

    for cat, tags in categories.items():
        # Progress output: one line per category being downloaded.
        print(cat)
        gdf_osm = _get_osm_category(category=cat, tags=tags, city=city, cols_relevant=cols_relevant)
        fullpath_save = os.path.join(dir_save, f"{cat}.geojson")

        gdf_osm.to_file(fullpath_save, driver="GeoJSON")

In [None]:
# NOTE(review): this cell repeats the CATEGORIES definition from the top of the
# notebook with identical content; keep the two in sync or drop one of them.
# Maps each category name to the OSM tag filter used to download it.
CATEGORIES = dict(
    building={"building": True},
    territories={"boundary": "administrative", "admin_level": "2"},
    landuse={"landuse": True},
    highway={"highway": True},
    fire_station={"amenity": ["fire_station"]},
    waterway={"waterway": True},
    substation={"power": ["substation"]},
)

In [None]:
%%time
# Download all categories with the function defaults (city "Essen", output folder
# "./data"); one GeoJSON file is written per category.
retrieve_osm_data(categories=CATEGORIES)

In [None]:
import configparser 
import sqlalchemy
from urllib.parse import quote

# INI file with the database connection settings; a [postgis] section is read from it.
CONFIG = "config.cfg"
# Option names looked up in the [postgis] section.
ENGINE = "engine"
URL = "url"
PORT = "port"
USERNAME = "db_username"
PASSWORD = "password"
DBNAME = "db_name"
DRIVER = "driver" 
# Evaluates to "{engine}://{db_username}:{password}@{url}:{port}/{db_name}", filled via str.format.
FSTR_POSTGIS_CONNECT = f"{{{ENGINE}}}://{{{USERNAME}}}:{{{PASSWORD}}}@{{{URL}}}:{{{PORT}}}/{{{DBNAME}}}"

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of files
# actually parsed, so fail early with a clear message instead of a bare KeyError later.
if not config.read(CONFIG):
    raise FileNotFoundError(f"Database configuration file not found or unreadable: {CONFIG!r}")
if not config.has_section("postgis"):
    raise KeyError(f"Section [postgis] is missing in {CONFIG!r}")

# Percent-encode the credentials so characters such as '@', ':' or '/' in the user name
# or password cannot corrupt the connection URL.
db_params = dict(config["postgis"])
for option in (USERNAME, PASSWORD):
    db_params[option] = quote(db_params[option], safe="")

engine = sqlalchemy.create_engine(FSTR_POSTGIS_CONNECT.format(**db_params), echo=False)

In [None]:
# GeoJSON files to upload to the database.
# NOTE(review): the pattern contains no wildcard, so it matches at most the single file
# ./data/territories.geojson; use "./data/*.geojson" if every category should be uploaded.
osm_files = glob.glob("./data/territories.geojson")

In [None]:
# Target database schema for all uploaded tables.
schema = "deep_dive"

# Engine.execute() was removed in SQLAlchemy 2.x; run the DDL on a connection inside a
# transaction instead (also valid on 1.x). `schema` is a constant defined above, so it
# is interpolated into the statement directly.
with engine.begin() as connection:
    connection.execute(sqlalchemy.text(f"CREATE SCHEMA IF NOT EXISTS {schema};"))

for path_geojson in osm_files:
    gdf_osm = gpd.read_file(path_geojson)
    if gdf_osm.empty:
        # Without rows there is no category value to name the table after.
        print(f"Skipping empty file: {path_geojson}")
        continue

    # The table is named after the category stored in the first row; files written by
    # retrieve_osm_data carry a single category per file.
    table_name = gdf_osm["category"].iloc[0]

    # if_exists="replace" drops and recreates an existing table of the same name.
    gdf_osm.to_postgis(con=engine, name=table_name, schema=schema, if_exists="replace")