In [1]:
# Install the third-party packages imported in the next cell into the kernel's
# environment. No versions are pinned here.
%pip install h3
%pip install tqdm
%pip install psycopg[binary]

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import asyncio
import json
import os

import h3
import numpy as np
import pandas as pd
import psycopg
import tqdm

### Calculate Transit Accessibility Scores
#### Cells
* `-1`: geo cell, no station here
* ` 0`: bus stop
* ` 1`: tram / ferry stop
* ` 2`: metro / s-tog station
* ` 3`: train station

#### About
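This notebook performs the transit accessibility score calculation on top of a PostgreSQL database. Scores are propagated outward from each station to its neighboring H3 cells and are then aggregated per `h3_4` region and exported as JSON for visualization.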

In [2]:
# Tunable parameters for the score propagation and aggregation.
TRANSIT_TYPE = 2    # stop type processed in this run; passed to the station query and to calc_score
MAX_DISTANCE = 24   # cutoff for the propagation distance, counted in neighbor steps
HOURS = 7 * 24      # length of the per-cell score vector summed during aggregation
H3_RES = 11         # NOTE(review): not referenced in the visible cells - presumably the H3 resolution of the stored cells; confirm

In [3]:
# The connection URI is read from the TRANSIT_DB_CONN environment variable so that
# no password is stored in the notebook. When the variable is unset, the default
# URI carries no password and libpq falls back to its standard mechanisms
# (PGPASSWORD or ~/.pgpass).
# NOTE(review): a password was previously committed in this cell and should be rotated.
DB_CONN = os.environ.get(
    "TRANSIT_DB_CONN",
    "postgresql://postgres@srv-captain--postgres-db-db/postgres",
)

In [133]:
# reset scores in database
async with await psycopg.AsyncConnection.connect(DB_CONN) as conn:
    async with conn.cursor() as cursor:
        await cursor.execute(f"""
        update transit
            set scores = array []::float4[], 
            visitors = array []::text[]
            where cardinality(visitors) != 0
        """)

        await conn.commit()

In [4]:
# Development aid: optionally restrict the station query to a single h3_4 region.
SELECTED_H3_4 = "841f059ffffffff"
# To enable the filter, use this assignment instead of the empty one below:
# SELECTED_H3_4_QUERY = f"and h3_4 = '{SELECTED_H3_4}'"
SELECTED_H3_4_QUERY = ""  # empty SQL fragment = no region filter

#### Load entrypoint (station) data

In [5]:
# load all stops for a specific type from the database
async with await psycopg.AsyncConnection.connect(DB_CONN) as conn:
    async with conn.cursor() as cursor:
        await cursor.execute(f"""
        select * from transit
            where type = {TRANSIT_TYPE}
            {SELECTED_H3_4_QUERY}
        """)

        stations = await cursor.fetchall()

In [6]:
# add all stations to a queue
queue = asyncio.Queue()
for station in stations:
    # (station origin h3 cell, current h3 cell, scores array, distance)
    await queue.put((station[0], station[0], station[3], 0))

#### Main Functions

In [7]:
# define formula here
async def calc_score(origin_scores, distance, type):
    """Derive the scores handed on to a cell from the scores of the cell before it.

    Every score greater than 1 is reduced by 1 as long as ``distance`` is below
    ``MAX_DISTANCE``; every other score becomes 0. ``type`` is accepted but not
    used by the current formula.

    Returns a new list with one entry per entry of ``origin_scores``.
    """
    return [
        score - 1 if score > 1 and distance < MAX_DISTANCE else 0
        for score in origin_scores
    ]

In [8]:
async def update_scores(origin_h3, current_h3, scores, distance):
    """Push the scores of one station one step further, from ``current_h3`` to its neighbors.

    Parameters:
        origin_h3: h3 cell of the station the scores originate from; it is recorded
            in ``visitors`` of every cell that is updated.
        current_h3: h3 cell whose direct neighbors are processed.
        scores: scores that arrived at ``current_h3``.
        distance: number of steps taken from ``origin_h3`` to ``current_h3``.

    For every neighbor that exists in the ``transit`` table and has not yet been
    visited for ``origin_h3``, the scores computed by ``calc_score`` are appended to
    the neighbor's ``scores`` and ``origin_h3`` is appended to its ``visitors``.
    While ``distance`` is below ``MAX_DISTANCE`` the neighbor is put on the global
    ``queue`` for further propagation. Returns ``None``.
    """
    print(f"\rcalculating {current_h3}, dist={distance + 1}")

    # The new scores depend only on the incoming scores and the new distance, so
    # they are computed once instead of once per neighbor.
    score = await calc_score(scores, distance + 1, TRANSIT_TYPE)
    if not sum(score) > 0:
        # Nothing left to hand on: no row would be updated and nothing queued,
        # so the database round trip is skipped.
        return

    # get the h3 k-ring (k=1) around current_h3, without current_h3 itself;
    # h3-py 4.x renamed k_ring() to grid_disk() and returns a list instead of a set
    if hasattr(h3, "grid_disk"):
        ring = h3.grid_disk(current_h3, 1)
    else:
        ring = h3.k_ring(current_h3, k=1)
    neighbors = set(ring) - {current_h3}

    async with await psycopg.AsyncConnection.connect(DB_CONN) as conn:
        async with conn.cursor() as cursor:
            # neighbors that exist in the DB and were not yet visited for this origin
            await cursor.execute("""
                select h3 from transit
                    where h3 = any(%s)
                    and %s != all(visitors)
            """, [list(neighbors), origin_h3])

            db_neighbors = await cursor.fetchall()

            for (neighbor,) in db_neighbors:
                # values are bound as parameters instead of being formatted into the SQL text
                await cursor.execute("""
                    update transit
                        set scores = array_cat(scores, %s::float4[]),
                        visitors = array_append(visitors, %s::text)
                        where h3 = %s
                """, [score, origin_h3, neighbor])

                if distance < MAX_DISTANCE:
                    await queue.put((origin_h3, neighbor, score, distance + 1))

        await conn.commit()

#### Execution Loop

In [None]:
while not queue.empty():
    current = await queue.get()
    await asyncio.gather(
        update_scores(current[0], current[1], current[2], current[3])
    )

#### Aggregate results and export for visualization

In [140]:
# get all h3_4 regions and perform the aggregation on a per-region basis
async with await psycopg.AsyncConnection.connect(DB_CONN) as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("""
        select distinct(h3_4) from transit
        """)

        h3_4_regions = await cursor.fetchall()

In [141]:
# Unwrap the single-column result rows into plain h3_4 values. Entries that are
# already unwrapped are kept as they are, so running this cell a second time does
# not reduce each region id to its first character.
h3_4_regions = [row[0] if isinstance(row, tuple) else row for row in h3_4_regions]

In [142]:
async def aggregate_region(h3_4):
    """Aggregate the scores of one h3_4 region and export them as JSON.

    Loads all rows of the region that have at least one visitor, groups them by
    their ``h3_10`` value and, per group, sums every visitor's slice of ``scores``
    plus the row's ``freq``. The sum is divided by the number of rows in the group.

    The result is written to ``../docs/h3/<h3_4>.json`` as a list of records with
    the keys ``h3``, ``type`` (largest ``type`` within the group) and ``freq``
    (the averaged score vector). Returns ``None``.
    """
    # Fetch the rows first and release the connection before the aggregation and
    # the file write; the region id is bound as a query parameter.
    async with await psycopg.AsyncConnection.connect(DB_CONN) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("""
            select h3_10, freq, scores, cardinality(visitors), type from transit
                where h3_4 = %s
                and cardinality(visitors) > 0
                order by h3_10
            """, [h3_4])

            hexagons = await cursor.fetchall()

    cells = pd.DataFrame(hexagons, columns=['h3', 'freq', 'scores', 'cardinality', 'type'])

    result = []  # one entry per h3_10 value (res 10)
    for parent_h3, children in cells.groupby('h3'):
        score_sum = np.zeros(HOURS, dtype=int)

        # one row per res 11 cell
        for freq, scores, visitor_count in zip(
            children['freq'], children['scores'], children['cardinality']
        ):
            # scores holds one equally sized slice per visitor; overlapping
            # visitors are added up
            for visitor_scores in np.array_split(scores, visitor_count):
                score_sum = np.add(score_sum, visitor_scores)
            # frequency of the res 11 cell itself (if it is a stop)
            score_sum = np.add(score_sum, freq)

        # add aggregate result to overall result
        result.append([parent_h3, max(children['type']), list(score_sum / len(children))])

    pd.DataFrame(result, columns=['h3', 'type', 'freq']).to_json(f"../docs/h3/{h3_4}.json", orient='records')


In [143]:
# export loop
for h3_4 in h3_4_regions:
    await asyncio.gather(
        aggregate_region(h3_4)
    )