Cartogram
=========
Prepares a network of origin-destination (OD) pairs for time cartograms, to be used as input for the QGIS cartogram plugin.
(Under development, not ready yet)

In [38]:
import snman
import networkx as nx
from snman import osmnx_customized as oxc
import geopandas as gpd

# Perimeter to process; it selects the perimeter to load and the process sub-folder
PERIMETER = 'zollikerberg'

# Set these paths according to your own setup
data_directory = 'C:/Users/lballo/polybox/Research/SNMan/SNMan Shared/data/'
inputs_path = f'{data_directory}inputs/'
export_path = f'{data_directory}outputs/_active/'
process_path = f'{data_directory}process/{PERIMETER}/'

In [39]:
# =====================================================================================
# LOAD DATA
# =====================================================================================

# Perimeters, restricted to the one named by PERIMETER
print('Load perimeters')
perimeters = snman.load_perimeters(
    f'{inputs_path}perimeters/perimeters.shp',
    filter=[PERIMETER]
)

# Street graph, read from the edge and node files in the process folder
print('Load street graph')
G = snman.io.load_street_graph(
    f'{process_path}edges_all_attributes.gpkg',
    f'{process_path}nodes_all_attributes.gpkg'
)

# Points of interest; the loaded perimeters are passed on to the loader
print('Load POIs')
poi = snman.io.load_poi(
    f'{inputs_path}poi/poi.gpkg',
    perimeter=perimeters
)

Load perimeters
Load street graph
Load POIs


In [40]:
def calculate_link_cost(mode, length, grade, speed_limit, cost_factor):
    """
    Calculate the generalized cost of traversing one link with a given mode.

    Parameters
    ----------
    mode : str
        Transport mode. Only 'cycling' is implemented.
    length : float
        Length of the link.
    grade : float
        Grade of the link (currently not used in the calculation).
    speed_limit : float or None
        Speed limit on the link (currently not used in the calculation).
    cost_factor : float
        Multiplier applied to the length.

    Returns
    -------
    float
        For 'cycling': length * cost_factor.

    Raises
    ------
    NotImplementedError
        If the mode is anything other than 'cycling'.
    """
    if mode == 'cycling':
        return length * cost_factor

    # Raising a plain string is itself a TypeError in Python 3 and hides the
    # message, so a proper exception class is raised instead
    raise NotImplementedError(f'Mode not implemented: {mode}')


def build_mode_specific_graph(G, mode, forward_key, backward_key):
    """
    Build a directed routing graph for one mode from the street graph.

    Every edge (u, v, k) of G is turned into up to two directed edges in the
    result: u -> v based on the lane offer stored under forward_key, and
    v -> u based on the lane offer stored under backward_key. A direction is
    only added if snman.lanes._lane_properties reports the offer as valid.

    Parameters
    ----------
    G : networkx multigraph
        Street graph whose edges carry 'length', 'grade' and the two offer attributes.
    mode : str
        Transport mode passed on to calculate_link_cost. For 'cycling', the
        cost factor is taken from the lane properties; otherwise it is 1.
    forward_key : str
        Edge attribute holding the lane offer in the u -> v direction.
    backward_key : str
        Edge attribute holding the lane offer in the v -> u direction.

    Returns
    -------
    nx.MultiDiGraph
        New graph with copies of G's graph attributes and nodes, and one edge
        per valid direction carrying 'offer', 'cost_factor' and 'cost'.
    """

    H = nx.MultiDiGraph()
    # Copy graph attributes and nodes instead of aliasing G's containers, so that
    # H owns its data and later changes to either graph do not leak into the other
    H.graph.update(G.graph)
    H.add_nodes_from(G.nodes(data=True))

    # NOTE(review): forward edges are costed at half of the calculated link cost
    # while backward edges use the full cost. This asymmetry is kept unchanged from
    # the previous implementation -- confirm whether it is intended.
    forward_cost_multiplier = 0.5
    backward_cost_multiplier = 1

    for (u, v, k), data in G.edges.items():

        length = data.get('length')
        grade = data.get('grade')

        directions = (
            (u, v, forward_key, forward_cost_multiplier),
            (v, u, backward_key, backward_cost_multiplier),
        )

        for tail, head, offer_key, cost_multiplier in directions:
            offer = data.get(offer_key)
            properties = snman.lanes._lane_properties(offer)

            cost_factor = 1
            if mode == 'cycling':
                cost_factor = properties.cycling_cost_factor

            if properties.valid:
                cost = calculate_link_cost(mode, length, grade, None, cost_factor) * cost_multiplier
                # Both directions carry the same set of attributes
                H.add_edge(tail, head, offer=offer, cost_factor=cost_factor, cost=cost)

    return H

# Routing graph for cycling, built from the cycling lane offers of the street graph
G_cycling = build_mode_specific_graph(
    G,
    mode='cycling',
    forward_key='cycling_forward',
    backward_key='cycling_backward',
)

In [41]:
# Snap POIs to the network: every POI is represented by its nearest graph node
# (search radius max_distance=100, in CRS units), indexed by that node's id
nodes = oxc.graph_to_gdfs(G_cycling, edges=False)
nodes['node_geom'] = nodes.geometry

poi_snapped = (
    gpd.sjoin_nearest(poi, nodes, how='inner', max_distance=100)
    [['index_right', 'node_geom']]
    .rename(columns={'node_geom': 'geometry'})
    .set_geometry('geometry')
    .set_index('index_right')
)

# Several POIs can snap to the same node. Keep every node only once, otherwise the
# index is not unique and the OD pairs built from it contain origin == destination
poi_snapped = poi_snapped[~poi_snapped.index.duplicated(keep='first')]

In [43]:
# Cost List

import itertools as it
import shapely as shp

# All ordered origin-destination pairs between the snapped POI nodes
ods = it.permutations(poi_snapped.index, 2)
a = gpd.GeoDataFrame(ods, columns=['origin', 'destination'])

# Point geometry of both ends and the straight line between them
a['origin_geom'] = poi_snapped.merge(a, left_index=True, right_on='origin')['geometry']
a['destination_geom'] = poi_snapped.merge(a, left_index=True, right_on='destination')['geometry']
a['od_line_geom'] = a.apply(lambda row: shp.LineString([row['origin_geom'], row['destination_geom']]), axis=1).set_crs(2056)

# Route every OD pair once. With a target given, single_source_dijkstra returns
# (cost, node sequence), so cost and path geometry always stem from the same route
# and the graph is not searched twice per pair.
routes = [
    nx.single_source_dijkstra(G_cycling, source=origin, target=destination, weight='cost')
    for origin, destination in zip(a['origin'], a['destination'])
]
a['cycling_cost'] = [cost for cost, _ in routes]
a['od_path_geom'] = gpd.GeoSeries(
    [shp.LineString(nodes.loc[path]['geometry']) for _, path in routes],
    index=a.index,
    crs=2056
)

# Keep the full table for inspection and export only the routed result
a_complete = a
a = a[['origin', 'destination', 'cycling_cost', 'od_path_geom']]
a = a.set_geometry('od_path_geom')
a.to_file(export_path + 'od.gpkg')

# The list gets its own file name: 'od.csv' is written by the OD matrix export,
# which would otherwise overwrite this list
a[['origin', 'destination', 'cycling_cost']].to_csv(export_path + 'od_list.csv')


In [44]:
# List of Points

# Export the snapped POI nodes as a point layer
poi_snapped.to_file(f'{export_path}cartogram_points.gpkg')



In [45]:
# OD Matrix

import numpy as np
import pandas as pd

# Square matrix over the snapped POI nodes: rows are origins, columns are destinations
M = pd.DataFrame(index=poi_snapped.index, columns=poi_snapped.index)
M.index.name = None

for origin, destination, cost in zip(a['origin'], a['destination'], a['cycling_cost']):
    M.loc[origin, destination] = cost
    # The diagonal (a node to itself) is set to zero cost
    M.loc[origin, origin] = 0
    M.loc[destination, destination] = 0

M.to_csv(f'{export_path}od.csv')