In [None]:
import numpy as np
import pandas as pd
import yaml
from geospark.register import GeoSparkRegistrator
from pyspark import SparkConf, SparkContext
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, LongType
from pyspark.sql.window import Window
from tqdm import tqdm

# Load the notebook configuration. The context manager closes the file handle,
# which a bare ``open(...)`` passed straight to ``yaml.load`` never does explicitly.
# NOTE(review): yaml.FullLoader can build more than plain scalars/containers;
# if the config holds only plain data, yaml.safe_load is the safer choice — confirm.
with open(r"config/testconfig.yaml") as configFile:
    config = yaml.load(configFile, Loader=yaml.FullLoader)

# Spark settings are taken from the "sparkConf" mapping of the config file.
conf = SparkConf().setAll(config["sparkConf"].items())

spark = (
    SparkSession.builder.appName("OSMDistribution")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

# Register GeoSpark on this session before any table is queried.
GeoSparkRegistrator.registerAll(spark)

%load_ext autoreload
%autoreload 2

from lib.osmFeatures import *
from lib.distribution import *
from lib.utils import *

## Initialization

In [11]:
# OSM feature columns used by this notebook: they are passed to getOsmObjects,
# used as the null-drop subset, and aggregated one at a time per grid cell.
# The order of the entries is kept exactly as before.
osmFeatureSpecification = (
    # features with the "Build" suffix
    [
        "lowLvlBuild", "middleLvlBuild", "highLvlBuild",
        "lvlBuild", "allBuild", "adminBuild",
    ]
    # features with the "Obj" suffix
    + [
        "industrialObj", "publicTransportObj", "residentObj",
        "medsObj", "hotelObj", "entertainObj",
        "retailObj", "religiousObj", "histObj",
        "parkObj", "sportObj", "campObj",
        "beachObj", "foodObj", "militaryObj",
        "prisonObj", "parkingObj", "roadObj",
    ]
)

tableConf = config["tableConf"]

# Grid table named by the "gridBuffers" config key.
grid = spark.table(tableConf["gridBuffers"])

# Both OSM source tables are restricted to the configured load date.
onConfiguredLoadDate = col("load_date") == tableConf["osmLoadDate"]
osmGeometry = spark.table(tableConf["osmGeometryTableName"]).filter(onConfiguredLoadDate)
osmTag = spark.table(tableConf["osmTagTableName"]).filter(onConfiguredLoadDate)

# Key columns handed to saveTableOverwritePartition for both OSM object tables.
osmID = ["osm_id", "osm_type"]
allOsmObjectsTbl = f"{dbName}.{prefixName}_all_osm_objects"
osmObjectsTbl = f"{dbName}.{prefixName}_osm_objects"

# Step 1: build the OSM objects with their feature columns and persist them.
allOsmObjects = getOsmObjects(osmGeometry, osmTag, osmFeatureSpecification)
saveTableOverwritePartition(allOsmObjects, 50, osmID, allOsmObjectsTbl)

# Step 2: re-read the persisted table and drop rows where every feature column is null.
osmObjects = spark.table(allOsmObjectsTbl).na.drop(
    how="all", subset=osmFeatureSpecification
)
saveTableOverwritePartition(osmObjects, 50, osmID, osmObjectsTbl)

# The features table is the filtered OSM objects table written earlier in this cell,
# so reuse its name instead of re-deriving the same string a second time.
featuresTable = osmObjectsTbl

# Column names of the geometries and of the grid cell identifier.
featuresGeometry = "osm_geom_wkt"
gridID = "gid"
gridGeometry = "geom_wkt"

categoricalFeatures = None

# Buffer radii (km) for which a lookup table and an output table exist.
# Both dictionaries below are generated from this single list so that they
# cannot drift out of step with each other.
bufferDistancesKm = [1, 2, 3, 4, 5]

# Lookup tables recording which cell's buffer intersects which cell, mapped to
# the name of the buffer column. The leading "" -> None entry is the
# "no buffer" case (the plain grid) and must stay first.
buffersDicts = {
    "": None,
    **{
        spark.table(f"{dbName}.{prefixName}_grid_dict_buffer_{km}km"): f"buffer_{km}km"
        for km in bufferDistancesKm
    },
}

# Tables for writing and their aliases, in the same order as buffersDicts:
# the plain grid first, then one table per buffer radius.
tblAndAliases = {
    f"{dbName}.{prefixName}_grid_osm_grid": "grid",
    **{
        f"{dbName}.{prefixName}_grid_osm_{km}km": f"{km}km"
        for km in bufferDistancesKm
    },
}

toWriteTbls = list(tblAndAliases.keys())

## Calculation

In [None]:
# Each buffer lookup table is paired with an output table purely by position.
# Check the pairing up front so a mismatch fails immediately instead of
# surfacing as an IndexError after the Spark work for a buffer has been built.
if len(buffersDicts) != len(toWriteTbls):
    raise ValueError(
        f"buffersDicts has {len(buffersDicts)} entries "
        f"but toWriteTbls has {len(toWriteTbls)}; they must match one-to-one"
    )

for (bufferData, bufferColumn), localityTbl in zip(buffersDicts.items(), toWriteTbls):
    # bufferData is "" (with bufferColumn None) for the plain grid,
    # otherwise the buffer lookup DataFrame.
    print(bufferData)

    # Per-buffer accumulator: one outer join per feature on the grid id.
    gridDistribLocality = None

    for feature in osmFeatureSpecification:

        numericalFeatures = [feature]

        # Deduplicated OSM objects with a positive value for this feature,
        # reduced to the feature column and the geometry.
        dataFeatures = (
            spark.table(featuresTable)
            .dropDuplicates(["osm_id", "osm_type"])
            .filter(col(feature) > 0)
            .select(col(feature), col(featuresGeometry))
        )

        gridDistrib = GridDistribution(
            grid,
            gridID,
            gridGeometry,
            dataFeatures,
            featuresGeometry,
            numericalFeatures,
            categoricalFeatures,
            categoricalFunctions=None,
            numericalFunctions=["sum"],
        )

        featuresAndGrid = gridDistrib.featuresByGrid()

        gridDistribFeatureLocality = gridDistrib.gridFeaturesAgg(
            featuresAndGrid, bufferData, bufferColumn
        )

        if gridDistribLocality is None:
            gridDistribLocality = gridDistribFeatureLocality
        else:
            # Outer join keeps grid cells that have only some of the features.
            gridDistribLocality = gridDistribLocality.join(
                gridDistribFeatureLocality, gridID, "outer"
            )

    saveTableOverwritePartition(gridDistribLocality, 50, [gridID], localityTbl)

toWriteTbl = f"{dbName}.{prefixName}_geo_grid_osm"

# NOTE(review): gridDistrib here is the instance left over from the last loop
# iteration; joining() is presumed not to depend on that iteration's feature —
# confirm against lib.distribution.
OSMJoining = gridDistrib.joining(tblAndAliases)

saveTableOverwritePartition(
    OSMJoining,
    135,
    [gridID],
    toWriteTbl,
)

## Visualisation

In [None]:
import geopandas
import keplergl
from shapely import wkt

### Take only one region

In [None]:
# Region to visualise. The bare placeholder used to be a syntax error, so the
# whole cell could not even be parsed; a None default plus an explicit check
# gives a clear, actionable message instead.
regionID = None  # insert your value
if regionID is None:
    raise ValueError("Set regionID to the region_id you want to visualise")

featuresDF = spark.table(f"{dbName}.{prefixName}_geo_grid_osm")
gridLocal = grid.filter(col("region_id") == regionID)

# Keep only the grid cells of the chosen region and expose the cell geometry
# under the column name "geometry" for the conversion to geopandas.
featuresDF = (
    featuresDF
    .join(gridLocal, [gridID], "inner")
    .withColumnRenamed(gridGeometry, "geometry")
)

# Row count, printed before the result is collected to the driver.
print(featuresDF.count())

df = featuresDF.toPandas()

### Transform to geopandas and add to map

In [None]:
# Cast every non-geometry column to an integer dtype.
# NOTE(review): astype("int") raises on NaN/None — confirm the joined table
# has no nulls in these columns, or fill them before this cell.
for column in df.columns:
    if column != "geometry":
        df[column] = df[column].astype("int")

# Parse the WKT text into shapely geometries. A column-wise map replaces the
# row-wise apply(axis=1): same result per row, and it also works on an empty frame.
df["geometry"] = df["geometry"].map(lambda geom: wkt.loads(str(geom)))

# The {"init": "epsg:4326"} dict form is deprecated; pass the authority string instead.
poly_sectors_gdf = geopandas.GeoDataFrame(
    df, crs="EPSG:4326", geometry="geometry"
)

map_1 = keplergl.KeplerGl(height=900)
map_1.add_data(
    data=poly_sectors_gdf
)
map_1