In [17]:
import requests
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point, Polygon
from scipy.spatial import Voronoi
import matplotlib.pyplot as plt
import numpy as np
from geopy.distance import geodesic
import folium

In [None]:
def get_maps_df(tag, search, south, west, north, east):
    """
    query overpass for osm pois matching a tag/value in a bounding box
    returns a dataframe with lat, lon, and name
    """
    # build overpass query using bounding box
    query = f"""[out:json];
    node["{tag}"="{search}"]({south},{west},{north},{east});
    out center;
    """
    url = "https://overpass-api.de/api/interpreter"

    # send request to overpass api
    response = requests.get(url, params={'data': query})
    print(response)

    # parse json response into dataframe
    data = pd.DataFrame(response.json()["elements"])

    # extract name from tags dict if present
    data["name"] = data["tags"].apply(
        lambda x: x.get("name") if isinstance(x, dict) else None
    )

    # drop unused osm metadata columns
    data.drop(columns=["tags", "type", "id"], inplace=True)
    return data


def import_pop_data(filename="data/processed_pop.csv"):
    """
    load population block data from csv
    """
    df = pd.read_csv("data/out.csv")
    return df


def get_local_data(pop_data, tag, search, south, west, north, east):
    """
    filter population data to a bounding box and fetch existing pois
    """
    # get existing osm pois in the same bounding box
    existing = get_maps_df(tag, search, south, west, north, east)

    # extract latitude and longitude series
    lats = pop_data["lat"]
    lons = pop_data["lon"]

    # build bounding box mask
    mask = (
        (lats >= south) &
        (lats <= north) &
        (lons >= west) &
        (lons <= east)
    )

    # keep only population blocks inside box
    pop_data = pop_data.loc[mask]

    return existing, pop_data


def haversine_matrix(a, b):
    """
    compute pairwise haversine distances between two sets of lat/lon points
    returns distances in km
    """
    R = 6371.0  # earth radius in km

    # convert degrees to radians
    a = np.radians(a)
    b = np.radians(b)

    # pairwise coordinate differences
    dlat = a[:, None, 0] - b[None, :, 0]
    dlon = a[:, None, 1] - b[None, :, 1]

    # haversine formula
    h = (
        np.sin(dlat / 2) ** 2
        + np.cos(a[:, None, 0])
        * np.cos(b[None, :, 0])
        * np.sin(dlon / 2) ** 2
    )

    return 2 * R * np.arcsin(np.sqrt(h))


def calculate_rating(point, existing, localpop):
    """
    estimate market size for a candidate point using a gravity-style model
    """
    # population block coordinates and populations
    block_coords = localpop[["lat", "lon"]].to_numpy()
    block_pop = localpop["Person"].to_numpy()

    # existing poi coordinates
    existing_coords = existing[["lat", "lon"]].to_numpy()

    # distances from blocks to existing pois
    d_existing = haversine_matrix(block_coords, existing_coords)

    # avoid division by zero
    d_existing = np.maximum(d_existing, 1e-6)

    # total competitive draw from existing pois
    existing_draw = np.sum(1 / d_existing, axis=1)

    # distances from blocks to candidate point
    d_point = haversine_matrix(
        block_coords,
        np.array(point).reshape(1, 2)
    ).ravel()
    d_point = np.maximum(d_point, 1e-6)

    # candidate draw
    draw = 1 / d_point

    # allocate population proportionally between candidate and competitors
    contributions = block_pop * draw / (draw + existing_draw)

    return contributions.sum()


def compare_candidates(candidates: list[tuple[float, float]], existing, pop_data):
    """
    compute market size for each candidate location and rank them
    """
    # initialise results dataframe
    results = pd.DataFrame({
        "candidate_location": candidates
    })

    # apply rating function to each candidate
    results["market_size"] = results["candidate_location"].apply(
        calculate_rating,
        args=(existing, pop_data)
    )

    # split tuple coordinates into separate columns
    lat = [x[0] for x in candidates]
    lon = [x[1] for x in candidates]
    results["lat"] = lat
    results["lon"] = lon

    # drop tuple column now that lat/lon exist
    results.drop(columns=["candidate_location"], inplace=True)

    # return sorted by market size descending
    return results.sort_values(
        "market_size",
        ascending=False
    )[["lat", "lon", "market_size"]]


def plot_pois(data, map_center):
    """
    plot poi markers on a folium map
    """
    m = folium.Map(location=map_center, zoom_start=15)

    for row in data.itertuples(index=False):
        folium.Marker(
            location=(float(row.lat), float(row.lon)),
            popup=getattr(row, "name", None),
            icon=folium.Icon(color="blue", icon="info-sign")
        ).add_to(m)

    return m


def normalise_size(number):
    """
    convert population size to a bounded marker radius
    """
    return max(10 - 100 / (number + 10), 1)


def perc_to_col(x):
    """
    map a normalised score to a discrete folium icon colour
    """
    if x > 0.9:
        return "blue"
    if x > 0.5:
        return "purple"
    return "pink"


def plot_candidates(candidates_rated, existing, pop_data, map_centre):
    """
    plot existing pois, population blocks, and candidate locations on a map
    """
    m = folium.Map(location=map_centre, zoom_start=15)

    # plot existing competitors
    for row in existing.itertuples(index=False):
        folium.CircleMarker(
            location=(row.lat, row.lon),
            radius=4,
            fill=True,
            fill_opacity=1,
            color="orange"
        ).add_to(m)

    # plot population blocks with size-scaled markers
    for row in pop_data.itertuples(index=False):
        folium.CircleMarker(
            location=(row.lat, row.lon),
            radius=normalise_size(row.Person),
            color="green",
            fill=True,
            fill_opacity=0.4,
            popup=f"Population = {row.Person}"
        ).add_to(m)

    # copy to avoid mutating caller dataframe
    candidates_rated = candidates_rated.copy()

    # normalise market sizes for colouring
    max_msize = max(candidates_rated["market_size"])
    candidates_rated["market_norm"] = (
        candidates_rated["market_size"] / max_msize
    )

    # plot candidate locations
    for row in candidates_rated.itertuples(index=False):
        folium.Marker(
            location=(row.lat, row.lon),
            icon=folium.Icon(
                color=perc_to_col(row.market_norm),
                icon="info-sign"
            ),
            popup=f"Est. Market Size = {round(row.market_size, 1)}"
        ).add_to(m)

    return m


def calc_and_plot(candidates, tag, search, south, west, north, east, filename):
    """
    full pipeline: load data, score candidates, plot, and save map
    """
    pop_data = import_pop_data()

    # restrict data to area of interest
    existing, pop_data = get_local_data(
        pop_data,
        tag,
        search,
        south=south,
        west=west,
        north=north,
        east=east
    )

    # evaluate candidate market sizes
    candsdf = compare_candidates(candidates, existing, pop_data)

    # compute map centre from bounding box
    centre = ((south + north) * 0.5, (east + west) * 0.5)

    # build and save map
    m = plot_candidates(candsdf, existing, pop_data, centre)
    m.save(filename)

    return candsdf


In [19]:
south, west, north, east = -37.825, 144.94, -37.80, 145.00 # define the corners of the area we want to consider

# example locations we might have to choose from (these examples were chosen somewhat randomly)
candidates = [
    (-37.810, 144.963), # la trobe st, cbd
    (-37.810, 144.978), # corner of albert and landowne st, east melb
    (-37.804, 144.984), # smith street, collingwood
    (-37.820, 144.990), # punt road, richmond
    (-37.816, 144.964)  # cnr elizabeth and collins st, cbd
              ]

In [20]:
"""
fast food chains example
open the file to see the map
key:
    green = population blocks
    orange = existing locations (existing fast food restaurants)
    blue/purple/pink = candidates (colour indicates market size, blue being best, pink worst)
returns df containing the estimated market size of each location
"""
calc_and_plot(candidates, tag="amenity", search="fast_food", south=south,west=west,north=north,east=east, filename="fast_food_map.html")

<Response [200]>


Unnamed: 0,lat,lon,market_size
2,-37.804,144.984,564.205275
0,-37.81,144.963,507.345417
3,-37.82,144.99,463.122508
4,-37.816,144.964,419.089015
1,-37.81,144.978,416.086775


In [22]:
"""
car repair example
open the file to see the map
key:
    green = population blocks
    orange = existing locations (existing mechanics)
    blue/purple/pink = candidates (colour/size indicates market size, blue being best)
returns df containing the estimated market size of each location
market size is much bigger than for fast food, as there are far fewer mechanic shops, meaning the total market is less split up
"""
calc_and_plot(candidates, tag="shop", search="car_repair", south=south,west=west,north=north,east=east, filename="car_repair_map.html")

<Response [200]>


Unnamed: 0,lat,lon,market_size
0,-37.81,144.963,21642.300163
4,-37.816,144.964,18138.554416
1,-37.81,144.978,12769.161519
2,-37.804,144.984,11448.562047
3,-37.82,144.99,9328.496646
