In [1]:
from dask.distributed import Client
client = Client("tcp://172.17.0.2:8786")

In [2]:
client

0,1
Client  Scheduler: tcp://172.17.0.2:8786  Dashboard: http://172.17.0.2:8787/status,Cluster  Workers: 1  Cores: 28  Memory: 84.28 GB


In [3]:
import os

import geopandas as gpd
import pandas as pd
import numpy as np
import momepy as mm
import networkx as nx
import dask.dataframe as dd

from sqlalchemy import create_engine

In [35]:
def check(xy):
    
    from itertools import combinations
    import collections

    import pygeos
    import numpy as np
    import pandas as pd
    import geopandas as gpd
    import momepy as mm

    from shapely.ops import polygonize
    from scipy.spatial import Voronoi


    # helper functions
    def get_ids(x, ids):
        return ids[x]


    mp = np.vectorize(get_ids, excluded=["ids"])


    def dist(p1, p2):
        return np.sqrt(((p1[0] - p2[0]) ** 2) + ((p1[1] - p2[1]) ** 2))


    def get_verts(x, voronoi_diagram):
        return voronoi_diagram.vertices[x]


    def _average_geometry(lines, poly=None, distance=2):
        """
        Returns average geometry.


        Parameters
        ----------
        lines : list
            LineStrings connected at endpoints forming a closed polygon
        poly : shapely.geometry.Polygon
            polygon enclosed by `lines`
        distance : float
            distance for interpolation

        Returns list of averaged geometries
        """
        if not poly:
            polygons = list(polygonize(lines))
            if len(polygons) == 1:
                poly = polygons[0]
            else:
                raise ValueError("given lines do not form a single polygon")
        # get an additional line around the lines to avoid infinity issues with Voronoi
        extended_lines = [poly.buffer(distance).exterior] + lines

        # interpolate lines to represent them as points for Voronoi
        points = np.empty((0, 2))
        ids = []

        pygeos_lines = pygeos.from_shapely(extended_lines)
        lengths = pygeos.length(pygeos_lines)
        for ix, (line, length) in enumerate(zip(pygeos_lines, lengths)):
            pts = pygeos.line_interpolate_point(
                line, np.linspace(0.1, length - 0.1, num=int((length - 0.1) // distance))
            )  # .1 offset to keep a gap between two segments
            points = np.append(points, pygeos.get_coordinates(pts), axis=0)
            ids += [ix] * len(pts)

            # here we might also want to append original coordinates of each line
            # to get a higher precision on the corners, but it does not seem to be
            # necessary based on my tests.

        # generate Voronoi diagram
        voronoi_diagram = Voronoi(points)

        # get all rigdes and filter only those between the two lines
        pts = voronoi_diagram.ridge_points
        mapped = mp(pts, ids=ids)

        # iterate over segment-pairs
        edgelines = []
        for a, b in combinations(range(1, len(lines) + 1), 2):
            mask = (
                np.isin(mapped[:, 0], [a, b])
                & np.isin(mapped[:, 1], [a, b])
                & (mapped[:, 0] != mapped[:, 1])
            )
            rigde_vertices = np.array(voronoi_diagram.ridge_vertices)
            verts = rigde_vertices[mask]

            # generate the line in between the lines
            edgeline = pygeos.line_merge(
                pygeos.multilinestrings(get_verts(verts, voronoi_diagram))
            )
            snapped = pygeos.snap(edgeline, pygeos_lines[a], distance)
            edgelines.append(snapped)
        return edgelines


    def consolidate(network, distance=2, epsilon=2, filter_func=None, **kwargs):
        """
        Consolidate edges of a network, takes care of geometry only. No
        attributes are preserved at the moment.

        The whole process is split into several steps:
        1. Polygonize network
        2. Find polygons which are likely caused by dual lines and other
           geometries to be consolidated.
        3. Iterate over those polygons and generate averaged geometry
        4. Remove invalid and merge together with new geometry.

        Step 2 needs work, this is just a first attempt based on shape and area
        of the polygon. We will have to come with clever options here and
        allow their specification, because each network will need different
        parameters.

        Either before or after these steps needs to be done node consolidation,
        but in a way which does not generate overlapping geometries.
        Overlapping geometries cause (unresolvable) issues with Voronoi.

        Parameters
        ----------
        network : GeoDataFrame (LineStrings)

        distance : float
            distance for interpolation

        epsilon : float
            tolerance for simplification

        filter_func : function
            function which takes gdf of polygonized network and returns mask of invalid
            polygons (those which should be consolidated)

        **kwargs
            Additional kwargs passed to filter_func
        """

        # polygonize network
        polygonized = polygonize(network.geometry)
        geoms = [g for g in polygonized]
        gdf = gpd.GeoDataFrame(geometry=geoms, crs=network.crs)

        # filter potentially incorrect polygons
        mask = filter_func(gdf, **kwargs)
        invalid = gdf.loc[mask]

        sindex = network.sindex

        # iterate over polygons which are marked to be consolidated
        # list segments to be removed and the averaged geoms replacing them
        averaged = []
        to_remove = []
        for poly in invalid.geometry:
            real = network.iloc[sindex.query(poly.exterior, predicate="intersects")]
            mask = real.intersection(poly.exterior).type.isin(
                ["LineString", "MultiLineString"]
            )
            real = real[mask]
            lines = list(real.geometry)
            to_remove += list(real.index)

            if lines:
                av = _average_geometry(lines, poly, distance)
                averaged += av

        # drop double lines
        clean = network.drop(set(to_remove))

        # merge new geometries with the existing network
        averaged = gpd.array.from_shapely(averaged, crs=network.crs).simplify(epsilon)
        result = pd.concat([clean, gpd.GeoDataFrame(geometry=averaged[~averaged.is_empty])])
        merge = topology(result)

        return merge


    def roundabouts(gdf, area=5000, circom=0.6):
        """
        Filter out roundabouts
        """

        # calculate parameters
        gdf["area"] = gdf.geometry.area
        gdf["circom"] = mm.CircularCompactness(gdf, "area").series
        # select valid and invalid network-net_blocks
        mask = (gdf["area"] < area) & (gdf["circom"] > circom)
        return mask


    def filter_comp(gdf, max_size=10000, circom_max=0.2):
        """
        Filter based on max size and compactness

        Parameters
        ----------
        gdf : GeoDataFrame
            polygonized network
        max_size : float
            maximum size of a polygon to be considered potentially invalid
        circom_max : float
            maximum circular compactness of a polygon to be considered
            potentially invalid.

        Returns boolean series

        """
        # calculate parameters
        gdf["area"] = gdf.geometry.area
        gdf["circom"] = mm.CircularCompactness(gdf, "area").series
        # select valid and invalid network-net_blocks
        mask = (gdf["area"] < max_size) & (gdf["circom"] < circom_max)
        return mask


    def topology(gdf):
        """
        Clean topology of existing LineString geometry by removal of nodes of degree 2.

        Parameters
        ----------
        gdf : GeoDataFrame
            (Multi)LineString data of street network
        """

        # explode to avoid MultiLineStrings
        # double reset index due to the bug in GeoPandas explode
        df = gdf.reset_index(drop=True).explode().reset_index(drop=True)

        # get underlying pygeos geometry
        geom = df.geometry.values.data

        # extract array of coordinates and number per geometry
        coords = pygeos.get_coordinates(geom)
        indices = pygeos.get_num_coordinates(geom)

        # generate a list of start and end coordinates and create point geometries
        edges = [0]
        i = 0
        for ind in indices:
            ix = i + ind
            edges.append(ix - 1)
            edges.append(ix)
            i = ix
        edges = edges[:-1]
        points = pygeos.points(np.unique(coords[edges], axis=0))

        # query LineString geometry to identify points intersecting 2 geometries
        tree = pygeos.STRtree(geom)
        inp, res = tree.query_bulk(points, predicate="intersects")
        unique, counts = np.unique(inp, return_counts=True)
        merge = res[np.isin(inp, unique[counts == 2])]

        # filter duplications and create a dictionary with indication of components to be merged together
        dups = [item for item, count in collections.Counter(merge).items() if count > 1]
        split = np.split(merge, len(merge) / 2)
        components = {}
        for i, a in enumerate(split):
            if a[0] in dups or a[1] in dups:
                if a[0] in components.keys():
                    i = components[a[0]]
                elif a[1] in components.keys():
                    i = components[a[1]]
            components[a[0]] = i
            components[a[1]] = i

        # iterate through components and create new geometries
        new = []
        for c in set(components.values()):
            keys = []
            for item in components.items():
                if item[1] == c:
                    keys.append(item[0])
            new.append(pygeos.line_merge(pygeos.union_all(geom[keys])))

        # remove incorrect geometries and append fixed versions
        df = df.drop(merge)
        final = gpd.GeoSeries(new).explode().reset_index(drop=True)
        return df.append(
            gpd.GeoDataFrame({df.geometry.name: final}, geometry=df.geometry.name),
            ignore_index=True,
        )

    user = 'martin'
    pwd = 'gdsl2020'
    host = '138.253.73.214'
    port = '5432'

    url = f"postgres+psycopg2://{user}:{pwd}@{host}:{port}/built_env"
    engine = create_engine(url)
    # x, y = xy  # coordinates in epsg 27700
    buffer = 1000  # radius in [m]

    sql = f'SELECT * FROM openroads_200803_topological WHERE ST_DWithin(geometry, ST_SetSRID(ST_Point({xy[0][1:-1]}), 27700), {buffer})'

    df = gpd.read_postgis(sql, engine, geom_col='geometry')

#     areas = range(500, 2501, 500)
#     compactness = np.linspace(0.7, 0.9, 5)
    areas=[500]
    compactness=[0.8]
    results = []

    for area in areas:
        for comp in compactness:
            try:
                topo = consolidate(df, filter_func=roundabouts, area=area, circom=comp)

                G = mm.gdf_to_nx(topo)
                mesh = mm.meshedness(G, radius=None)
                G = mm.subgraph(G, meshedness=True, cds_length=False, mean_node_degree=False, proportion={0: False, 3: False, 4: False}, cyclomatic=False, edge_node_ratio=False, gamma=False, local_closeness=True, closeness_weight=None, verbose=False)
                vals = list(nx.get_node_attributes(G, 'meshedness').values())
                l_mesh_mean = np.mean(vals)
                l_mesh_median = np.median(vals)
                l_mesh_dev = np.std(vals)
                vals = list(nx.get_node_attributes(G, 'local_closeness').values())
                l_close_mean = np.mean(vals)
                l_close_median = np.median(vals)
                l_close_dev = np.std(vals)
                node_density = nx.number_of_nodes(G) / topo.length.sum()

                results += [area, comp, mesh, l_mesh_mean, l_mesh_median, l_mesh_dev, l_close_mean, l_close_median, l_close_dev]
            except Exception:
                results += [None, None, None, None, None, None, None, None, None]
    
    return results

In [5]:
cities = pd.read_csv('https://gist.githubusercontent.com/martinfleis/4148eeb0ea19e7808e761e097a877196/raw/f3001eeb7294eb330c6675d1cab307cd562baa5c/major_cities.csv', index_col=0)

## worker on the same machine as scheduler

In [12]:
ddf = dd.from_pandas(cities.iloc[:28], npartitions=28)

In [13]:
ddf

Unnamed: 0_level_0,geometry
npartitions=27,Unnamed: 1_level_1
Barnsley,object
Basildon,...
...,...
Colchester,...
Coventry,...


In [15]:
vals = ddf.apply(lambda x:  check(x), axis=1, meta=pd.Series(dtype='object'))

In [16]:
computed = vals.compute()

In [17]:
computed

tcity15nm
Barnsley             [500, 0.8, 0.09230769230769231, 0.083749008432...
Basildon             [500, 0.8, 0.0608187134502924, 0.0538392863229...
Basingstoke          [500, 0.8, 0.07091469681397738, 0.064754915482...
Bath                 [500, 0.8, 0.1288805268109125, 0.0971790636111...
Bedford              [500, 0.8, 0.11612903225806452, 0.086294542518...
Birkenhead           [500, 0.8, 0.11755102040816326, 0.086451723996...
Birmingham           [500, 0.8, 0.13355048859934854, 0.103514244459...
Blackburn            [500, 0.8, 0.13704918032786886, 0.103527398220...
Blackpool            [500, 0.8, 0.18566037735849056, 0.130201667412...
Bolton               [500, 0.8, 0.13595166163141995, 0.100769073454...
Bournemouth          [500, 0.8, 0.1659919028340081, 0.1222513564717...
Bracknell            [500, 0.8, 0.07934893184130214, 0.072159803761...
Bradford             [500, 0.8, 0.1520428667113195, 0.1098673229464...
Brighton and Hove    [500, 0.8, 0.11497005988023952, 0.089599155793

## workers on cluster

In [19]:
client

0,1
Client  Scheduler: tcp://172.17.0.2:8786  Dashboard: http://172.17.0.2:8787/status,Cluster  Workers: 2  Cores: 56  Memory: 151.64 GB


In [24]:
ddf = dd.from_pandas(cities.iloc[:28], npartitions=28)

In [25]:
ddf

Unnamed: 0_level_0,geometry
npartitions=27,Unnamed: 1_level_1
Barnsley,object
Basildon,...
...,...
Colchester,...
Coventry,...


In [26]:
vals = ddf.apply(lambda x:  check(x), axis=1, meta=pd.Series(dtype='object'))

In [27]:
computed = vals.compute()

In [28]:
computed

tcity15nm
Barnsley             [500, 0.8, 0.09230769230769231, 0.083749008432...
Basildon             [500, 0.8, 0.0608187134502924, 0.0538392863229...
Basingstoke          [500, 0.8, 0.07091469681397738, 0.064754915482...
Bath                 [500, 0.8, 0.1288805268109125, 0.0971790636111...
Bedford              [500, 0.8, 0.11612903225806452, 0.086294542518...
Birkenhead           [500, 0.8, 0.11755102040816326, 0.086451723996...
Birmingham           [500, 0.8, 0.13355048859934854, 0.103514244459...
Blackburn            [500, 0.8, 0.13704918032786886, 0.103527398220...
Blackpool            [500, 0.8, 0.18566037735849056, 0.130201667412...
Bolton               [500, 0.8, 0.13595166163141995, 0.100769073454...
Bournemouth          [500, 0.8, 0.1659919028340081, 0.1222513564717...
Bracknell            [500, 0.8, 0.07934893184130214, 0.072159803761...
Bradford             [500, 0.8, 0.1520428667113195, 0.1098673229464...
Brighton and Hove    [500, 0.8, 0.11497005988023952, 0.089599155793

Less workers per machine makes it much faster...

In [36]:
ddf = dd.from_pandas(cities, npartitions=28)

In [37]:
ddf

Unnamed: 0_level_0,geometry
npartitions=28,Unnamed: 1_level_1
Barnsley,object
Bedford,...
...,...
Wolverhampton,...
York,...


In [38]:
vals = ddf.apply(lambda x:  check(x), axis=1, meta=pd.Series(dtype='object'))

In [39]:
computed = vals.compute()

In [40]:
computed

tcity15nm
Barnsley         [500, 0.8, 0.09230769230769231, 0.083749008432...
Basildon         [500, 0.8, 0.0608187134502924, 0.0538392863229...
Basingstoke      [500, 0.8, 0.07091469681397738, 0.064754915482...
Bath             [500, 0.8, 0.1288805268109125, 0.0971790636111...
Bedford          [500, 0.8, 0.11612903225806452, 0.086294542518...
                                       ...                        
Woking           [500, 0.8, 0.08414239482200647, 0.062861526938...
Wolverhampton    [500, 0.8, 0.11099020674646355, 0.080441009150...
Worcester        [500, 0.8, 0.0926130099228225, 0.0709719598774...
Worthing         [500, 0.8, 0.07547169811320754, 0.056878655718...
York             [500, 0.8, 0.10531220876048462, 0.089765475222...
Length: 112, dtype: object