In [1]:
import pharmalink.code.data as data
import requests as req
import osmium as osm
import docker as docker
import json as json
import time
import os
import geopandas as gpd
import pandas as pd
import aiohttp
import asyncio
import pharmalink.code.mot_eval as mot_eval

# regkey = "09663"
regkey = "09162"
model = data.Data(regkey)

In [None]:
# Goal is to select one of the 3 closest pharmacies for each customer with a probability based on distance

pharmacies = gpd.GeoDataFrame(model.Pharmacies.pharmacies)
customers = gpd.GeoDataFrame(model.Customers.customers)
ph_locs = pharmacies["location"]
cus_locs = customers["location"]

# Convert pharmacy and customer locations to appropriate UTM CRS to calculate accurate distances
utm_ph_locs = ph_locs.to_crs(crs=ph_locs.estimate_utm_crs())
utm_cus_locs = cus_locs.to_crs(crs=cus_locs.estimate_utm_crs())

chosen_pharmacies = []

for cus in utm_cus_locs:
    # Find the 3 pharmacies closest to the customer
    closest = utm_ph_locs.distance(cus).sort_values(axis=0, ascending=True).head(3)

    # Choose from the nearest pharmacies with a probability based on distance
    chosen_pharmacies.append(closest.sample(weights=1 / closest).index[0])

customers["chosen_pharmacy"] = chosen_pharmacies

In [None]:
def build_valhalla_tiles(model, docker_client):

    bounds = model.AreaGeometry.geometry.bounds
    regkey = model.RegKey.regkey

    # Query Overpass API for OSM data within bounds
    query = f"""
    [out: xml];
    (nwr({bounds.miny[0]},{bounds.minx[0]},{bounds.maxy[0]},{bounds.maxx[0]}); );
    (._;>;);
    out;"""

    overpass_url = "https://overpass-api.de/api/interpreter"
    valhalla_raw_data = req.get(overpass_url, params={"data": query})

    # Convert Valhalla raw data to PBF file format
    class FileWriter(osm.SimpleHandler):
        def __init__(self, writer):
            super(FileWriter, self).__init__()
            self.writer = writer

        def node(self, n):
            self.writer.add_node(n)

        def way(self, w):
            self.writer.add_way(w)

        def relation(self, r):
            self.writer.add_relation(r)

    writer = osm.SimpleWriter(f"./pharmalink/valhalla-{regkey}/valhalla-input.osm.pbf")
    handler = FileWriter(writer)

    handler.apply_buffer(valhalla_raw_data.content, "osm")
    writer.close()

    # Pull latest Valhalla Docker image
    docker_client.images.pull("ghcr.io/gis-ops/docker-valhalla/valhalla", tag="latest")

    # Remove any existing Valhalla builder containers
    for container in docker_client.containers.list(
        all=True, filters={"name": f"pharmalink-valhalla-builder"}
    ):
        container.remove(force=True)

    # Run Valhalla container to convert PBF file to Valhalla tiles
    builder = docker_client.containers.run(
        image="ghcr.io/gis-ops/docker-valhalla/valhalla:latest",
        detach=True,
        name="pharmalink-valhalla-builder",
        volumes=[f"{os.getcwd()}/pharmalink/valhalla-{regkey}:/custom_files"],
        healthcheck={
            "test": "curl --fail -s http://localhost:8002/status || exit 1",
            "interval": 5000000000,
            "timeout": 1000000000,
            "retries": 10,
            "start_period": 2000000000,
        },
        environment={
            "build_elevation": True,
            "use_default_speeds_config": True,
            "serve_tiles": False,
        },
    )

    # Wait for Valhalla tile conversion to complete
    while (
        builder.status != "exited"
        and builder.logs(tail=1) != "INFO: Not serving tiles. Exiting."
    ):
        builder.reload()
        time.sleep(1)

    # Remove Valhalla tile builder container
    builder.remove(force=True)

    return


def create_valhalla_container(regkey):

    # Pull latest Valhalla Docker image
    docker_client.images.pull("ghcr.io/gis-ops/docker-valhalla/valhalla", tag="latest")

    # Remove any existing Valhalla server containers
    for container in docker_client.containers.list(
        all=True, filters={"name": "pharmalink-valhalla-server"}
    ):
        container.remove(force=True)

    container = docker_client.containers.run(
        image="ghcr.io/gis-ops/docker-valhalla/valhalla:latest",
        detach=True,
        name=f"pharmalink-valhalla-server",
        ports={8002: None},
        volumes=[f"{os.getcwd()}/pharmalink/valhalla-{regkey}:/custom_files"],
        healthcheck={
            "test": "curl --fail -s http://localhost:8002/status || exit 1",
            "interval": 5000000000,
            "timeout": 1000000000,
            "retries": 10,
            "start_period": 2000000000,
        },
    )

    while container.health != "healthy":
        time.sleep(1)
        container.reload()

    # Pause Valhalla container after successful startup
    container.pause()

    return container


# Check if matching valhalla tile data already exists

valhalla_path = f"{os.getcwd()}/pharmalink/valhalla-{regkey}"

# Connect to Docker daemon
docker_client = docker.from_env()


if os.path.exists(valhalla_path):
    valhalla_tiles = valhalla_path
else:
    os.makedirs(valhalla_path)
    bounds = model.AreaGeometry.geometry.bounds
    valhalla_tiles = build_valhalla_tiles(model, docker_client)

container = create_valhalla_container(regkey)

In [None]:
%autoawait

async def fetch_trip(session, cus_id, locations, mot):
    params = {
        "id": cus_id,
        "locations": locations,
        "costing": mot,
        "units": "kilometers",
        "directions_type": "none",
    }
    async with session.get("/route", json=params) as response:
        return mot, await response.json()


async def process_customer(session, mot_evaluator, cus):
    cus_id = str(cus[0])
    cus_loc = cus[1]
    ph_loc = pharmacies.loc[cus[2]]["location"]

    start = end = {"lat": cus_loc.y, "lon": cus_loc.x}
    target = {"lat": ph_loc.y, "lon": ph_loc.x}

    locations = [start, target, end]

    tasks = []
    for mot in ["auto", "bicycle", "pedestrian"]:
        tasks.append(fetch_trip(session, cus_id, locations, mot))

    # results = {cus[0]: {}}

    results = {}

    for task in asyncio.as_completed(tasks):
        mot, result = await task

        # Check if response id matches customer id
        #if result["id"] == cus_id:
        #    del result["id"]

        if "trip" in result:
            results[mot] = result["trip"]

    avg_length = sum([results[mot]["summary"]["length"] for mot in results]) / len(results)

    # Choose a probable mode of transport based on the average trip length
    chosen_mot = mot_evaluator.evaluate_mot(avg_length, results.keys())

    # Add the chosen mode of transport to the results
    results[chosen_mot]["mot"] = chosen_mot

    return {cus_id: results[chosen_mot]}

async def calculate_trips():

    # Start Valhalla container
    container.unpause()

    # Find Valhalla API endpoint
    valhalla_api = f"http://localhost:{container.ports['8002/tcp'][0]['HostPort']}"

    # Instantiate a mode of transport evaluator
    mot_evaluator = mot_eval.MOTEvaluator()

    trips = {}

    async with aiohttp.ClientSession(base_url=valhalla_api, timeout=aiohttp.ClientTimeout(600), connector=aiohttp.TCPConnector(limit=0)) as session:
        tasks = []
        for cus in customers.itertuples():
            tasks.append(process_customer(session, mot_evaluator, cus))

        for task in asyncio.as_completed(tasks):
            trips.update(await task)

    # Stop Valhalla container
    container.pause()

    return trips

# Run the main function
raw_trips = await calculate_trips()

IPython autoawait is `on`, and set to use `asyncio`


APIError: 500 Server Error for http+docker://localhost/v1.46/containers/899fae61422bc32f4d54067d08b618579c353983d43adf737f5584764c0fe7e7/unpause: Internal Server Error ("Container 899fae61422bc32f4d54067d08b618579c353983d43adf737f5584764c0fe7e7 is not paused")

In [None]:
trips = pd.DataFrame.from_dict(
    raw_trips, orient="index", columns=["locations", "legs", "summary", "mot"]
)
trips.index.name = "cus_id"
trips.index = trips.index.astype(int)
trips = trips.sort_index()
trips = trips.join(pd.json_normalize(trips["summary"]))
trips = trips.drop(
    columns=["summary", "has_time_restrictions", "has_toll", "has_highway", "has_ferry"]
)

In [None]:
total = trips.groupby("mot").sum()
total = total[["time", "length"]]
total["time"] = total["time"] / 3600
total

Unnamed: 0_level_0,time,length
mot,Unnamed: 1_level_1,Unnamed: 2_level_1
auto,489.134302,10448.207
bicycle,108.454661,1713.406
pedestrian,387.828219,1872.799
