# Functional data

This notebook links various functional layers to ET cells across GB.

## Population estimates

Population estimates are linked using area weighted interpolation based on building geometry.

In [None]:
import warnings

import geopandas as gpd
import pandas as pd
import numpy as np
import tobler
from time import time
import xarray
import rioxarray
import rasterstats

from dask.distributed import Client, LocalCluster, as_completed
import dask.dataframe as dd

In [None]:
warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')

In [None]:
population_est = gpd.read_parquet("../../urbangrammar_samba/functional_data/population_estimates/gb_population_estimates.pq")

In [None]:
chunk = gpd.read_parquet("../../urbangrammar_samba/spatial_signatures/tessellation/tess_0.pq")

In [None]:
xmin, ymin, xmax, ymax = chunk.total_bounds

In [None]:
%%time
ests = tobler.area_weighted.area_interpolate(population_est.cx[xmin:xmax, ymin:ymax], chunk.set_geometry("buildings"), extensive_variables=['population'])

In [None]:
for chunk_id in range(103):
    s = time()
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "buildings"]).set_geometry("buildings")
    xmin, ymin, xmax, ymax = chunk.total_bounds
    ests = tobler.area_weighted.area_interpolate(population_est.cx[xmin:xmax, ymin:ymax], chunk, extensive_variables=['population'])
    pop = pd.DataFrame({'hindex': chunk.hindex.values, "population": ests.population.values})
    pop.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk_id}")
    print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")

## Night lights

Night lights are merged using zonal statistics.

In [None]:
nl = xarray.open_rasterio("../../urbangrammar_samba/functional_data/employment/night_lights_osgb.tif")
nl_clip = nl.rio.clip_box(*chunk.total_bounds)
arr = nl_clip.values
affine = nl_clip.rio.transform()

In [None]:
%%time 
stats_nl = rasterstats.zonal_stats(
    chunk.tessellation, 
    raster=arr[0],
    affine=affine,
    stats=['mean'],
    all_touched=True,
    nodata = np.nan,
)

In [None]:
workers = 8
client = Client(LocalCluster(n_workers=workers, threads_per_worker=1))
client

In [None]:
def _night_lights(chunk_id):
    import rioxarray
    
    s = time()
    
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "tessellation"])
    nl = xarray.open_rasterio("../../urbangrammar_samba/functional_data/employment/night_lights_osgb.tif")
    nl_clip = nl.rio.clip_box(*chunk.total_bounds)
    arr = nl_clip.values
    affine = nl_clip.rio.transform()
    stats_nl = rasterstats.zonal_stats(
        chunk.tessellation, 
        raster=arr[0],
        affine=affine,
        stats=['mean'],
        all_touched=True,
        nodata = np.nan,
    )
    chunk["night_lights"] = [x['mean'] for x in stats_nl]
    chunk[["hindex", "night_lights"]].to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk_id}")
    
    return f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds."

In [None]:
inputs = iter(range(103))
futures = [client.submit(_night_lights, next(inputs)) for i in range(workers)]
ac = as_completed(futures)
for finished_future in ac:
    # submit new future 
    try:
        new_future = client.submit(_night_lights, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    print(finished_future.result())

## Worplace population by industry

Worplace population is linked using area weighted interpolation based on building geometry.

In [None]:
wpz = gpd.read_parquet('../../urbangrammar_samba/functional_data/employment/workplace/workplace_by_industry_gb.pq')

In [None]:
for chunk_id in range(103):
    s = time()
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "buildings"]).set_geometry("buildings")
    xmin, ymin, xmax, ymax = chunk.total_bounds
    ests = tobler.area_weighted.area_interpolate(wpz.cx[xmin:xmax, ymin:ymax], chunk, extensive_variables=wpz.columns[1:-1].to_list())
    ests['hindex'] = chunk.hindex.values
    ests.drop(columns="geometry").to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk_id}")
    print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")

## CORINE Land cover

CORINE Land cover is linked using area weighted interpolation based on tessellation geometry.

In [None]:
corine = gpd.read_parquet("../../urbangrammar_samba/functional_data/land_use/corine/corine_gb.pq")

In [None]:
def _dask_binning(corine, cells, n_chunks=512):
    import dask_geopandas as dgpd
    from scipy.sparse import coo_matrix
    
    ids_src, ids_tgt = cells.sindex.query_bulk(corine.geometry, predicate="intersects")
    df = gpd.GeoDataFrame({'clc': corine.geometry.values[ids_src], 'tess': cells.geometry.values[ids_tgt]})
    ddf = dgpd.from_geopandas(df, npartitions=n_chunks)
    areas = ddf.clc.intersection(ddf.tess).area.compute()
    table = coo_matrix(
        (areas, (ids_src, ids_tgt),),
        shape=(corine.shape[0], cells.shape[0]),
        dtype=np.float32,
    )

    table = table.todok()

    return table


def _dask_area_interpolate(corine, cells, n_chunks=512, categorical_variables=None):
    table = _dask_binning(corine, cells, n_chunks)
    
    if categorical_variables:
        categorical = {}
        for variable in categorical_variables:
            unique = corine[variable].unique()
            for value in unique:
                mask = corine[variable] == value
                categorical[f"{variable}_{value}"] = np.asarray(
                    table[mask].sum(axis=0)
                )[0]

        categorical = pd.DataFrame(categorical)
        categorical = categorical.div(cells.area, axis="rows")
    
    return categorical

In [None]:
for chunk_id in range(103):
    s = time()
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "tessellation"])
    xmin, ymin, xmax, ymax = chunk.total_bounds
    ests = _dask_area_interpolate(corine.cx[xmin:xmax, ymin:ymax], chunk, categorical_variables=["Code_18"])
    ests['hindex'] = chunk.hindex.values
    ests.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk_id}.pq")
    print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")

## Retail centres

CDRC Retail centres are linked as a distance to the nearest one.

In [None]:
retail = gpd.read_file("../../urbangrammar_samba/functional_data/retail_centres/Pre Release.zip!Retail_Centres_UK.gpkg")

In [None]:
workers = 16
client = Client(LocalCluster(n_workers=workers, threads_per_worker=1))
client

In [None]:
def measure_nearest(chunk):
    s = time()
    gdf = gpd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk}.pq')
    b = gdf.total_bounds
    
    initial_buffer = 500
    buffered = gdf.tessellation.buffer(initial_buffer)
    distance = []
    for orig, geom in zip(gdf.tessellation, buffered.geometry):
        query = retail.sindex.query(geom, predicate='intersects')
        b = initial_buffer
        while query.size == 0:
            query = retail.sindex.query(geom.buffer(b), predicate='intersects')
            b += initial_buffer

        distance.append(retail.iloc[query].distance(orig).min())
    gdf['nearest_retail_centre'] = distance
    gdf[['hindex', 'nearest_retail_centre']].to_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/retail_centre/retail_{chunk}.pq')
    
    return f"Chunk {chunk} processed sucessfully in {time() - s} seconds."

In [None]:
inputs = iter(range(103))
futures = [client.submit(measure_nearest, next(inputs)) for i in range(workers)]
ac = as_completed(futures)
for finished_future in ac:
    # submit new future 
    try:
        new_future = client.submit(measure_nearest, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    print(finished_future.result())

## Water

Water is measured as a distance to the nearest one.

In [None]:
from sqlalchemy import create_engine
from shapely.geometry import box
from shapely.ops import polygonize

user = os.environ.get('DB_USER')
pwd = os.environ.get('DB_PWD')
host = os.environ.get('DB_HOST')
port = os.environ.get('DB_PORT')

db_connection_url = f"postgres+psycopg2://{user}:{pwd}@{host}:{port}/built_env"

In [None]:
def measure_nearest(chunk):
    s = time()
    gdf = gpd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk}.pq')
    b = gdf.total_bounds
    engine = create_engine(db_connection_url)
    sql = f'SELECT * FROM gb_coastline_2016 WHERE ST_Intersects(geometry, ST_MakeEnvelope({b[0]}, {b[1]}, {b[2]}, {b[3]}, 27700))'
    coastline = gpd.read_postgis(sql, engine, geom_col='geometry')
    sql = f'SELECT * FROM openmap_surfacewater_area_200824 WHERE ST_Intersects(geometry, ST_MakeEnvelope({b[0]}, {b[1]}, {b[2]}, {b[3]}, 27700))'
    water = gpd.read_postgis(sql, engine, geom_col='geometry')
    
    sql = f'SELECT * FROM gb_coastline_2016'
    coastline = gpd.read_postgis(sql, engine, geom_col='geometry')

    polys = polygonize(coastline.geometry)
    land = gpd.GeoSeries(polys, crs=27700)
    sea = box(*land.total_bounds).difference(land.geometry.unary_union)
    
    target = water.geometry
    target.loc[len(water)] = sea
    target = gpd.clip(target, box(*b))
    
    initial_buffer = 500
    buffered = gdf.tessellation.buffer(initial_buffer)
    distance = []
    for orig, geom in zip(gdf.tessellation, buffered.geometry):
        query = target.sindex.query(geom, predicate='intersects')
        b = initial_buffer
        while query.size == 0:
            query = target.sindex.query(geom.buffer(b), predicate='intersects')
            b += initial_buffer

        distance.append(target.iloc[query].distance(orig).min())
    gdf['nearest_water'] = distance
    gdf[['hindex', 'nearest_water']].to_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/water/water_{chunk}.pq')
    
    return f"Chunk {chunk} processed sucessfully in {time() - s} seconds."