In [None]:
import os
import time
import uuid

import contextily as cx
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import shapely
from shapely import Point, LineString, Polygon

# 1. Generating target geometry

In [None]:
# Parameters
AREA = 500_000_000_000  # m^2, area of the circular region of interest
NUM_LANES = 8           # grid positions per period (one hub gap + NUM_LANES - 1 lane gaps)
D_HUBS = 10_000         # m, the large gap that starts every period
D_LANES = 10            # m, the small gap between the remaining positions of a period

# Circular region: a disc of area AREA centred on the origin.
CENTER = Point(0, 0)
RADIUS = np.sqrt(AREA / np.pi)
BOUNDS = CENTER.buffer(RADIUS)

In [None]:
# One grid period: a hub gap followed by NUM_LANES - 1 lane gaps.
period = [D_HUBS] + [D_LANES] * (NUM_LANES - 1)

# Number of whole periods that fit across the diameter of the region.
num_periods = int(np.floor(2 * RADIUS / np.sum(period)))

# Running positions starting at 0, then shifted so the grid is centred on 0.
positions = np.cumsum([0] + period * num_periods)
seq = positions - np.max(positions) / 2

In [None]:
def consecutive_pairs(s):
    """
    Return the adjacent pairs ``(s[i], s[i + 1])`` of an iterable as a list.

    The input is materialised once, so sequences, arrays and one-shot
    iterators all give the same result. Inputs with fewer than two elements
    (including empty ones) yield an empty list instead of raising.
    """
    items = list(s)
    return list(zip(items, items[1:]))

In [None]:
# Full grid of target points: every combination of the grid positions,
# x varying slowest and y fastest.
points = []
for x in seq:
    for y in seq:
        points.append(Point(x, y))

In [None]:
lines = []

# Grid positions padded with +/- 2 * RADIUS so the outermost segments extend
# beyond the circular region.
breakpoints = [-2 * RADIUS, *seq, 2 * RADIUS]

for lo, hi in consecutive_pairs(breakpoints):
    # Vertical segments spanning [lo, hi] at every x position ...
    for x in seq:
        lines.append(LineString(((x, lo), (x, hi))))
    # ... followed by horizontal segments spanning [lo, hi] at every y position.
    for y in seq:
        lines.append(LineString(((lo, y), (hi, y))))

In [None]:
polygons = []

# Cell boundaries along one axis: the grid positions padded with +/- 2 * RADIUS.
# Computed once; the same list of (start, end) pairs is used for both axes
# instead of being rebuilt on every iteration of the outer loop.
cell_spans = consecutive_pairs([-2 * RADIUS, *seq, 2 * RADIUS])

for a, b in cell_spans:
    for c, d in cell_spans:
        # Rectangle spanning [a, b] in x and [c, d] in y.
        polygons.append(Polygon(((a, c), (a, d), (b, d), (b, c))))

In [None]:
def create_gdf(geometry, rotation=45):
    """
    Create a GeoDataFrame of target geometries from a list of shapely geometries.

    Each geometry is clipped to BOUNDS, geometries that are empty after
    clipping are dropped, and the remainder is rotated around CENTER.
    Every remaining row gets a freshly generated UUID string.

    Parameters
    ----------
    geometry : list of shapely geometries
        Input geometries, interpreted in 'epsg:3857'.
    rotation : float, default 45
        Rotation angle passed to ``GeoSeries.rotate`` (degrees with the
        library's default settings).

    Returns
    -------
    geopandas.GeoDataFrame
        Columns 'geometry' and 'uuid', with a fresh 0..n-1 index.
    """

    gdf = gpd.GeoDataFrame(geometry=geometry, crs='epsg:3857')
    gdf.geometry = gdf.geometry.intersection(BOUNDS)

    # Copy after filtering so the assignments below write to an independent
    # frame rather than to a slice of the unfiltered one.
    gdf = gdf[~gdf.geometry.is_empty].copy()
    gdf.geometry = gdf.geometry.rotate(rotation, origin=CENTER)

    # One uuid per row; a plain list avoids a row-wise DataFrame.apply and
    # also works when no geometry survived the clipping.
    gdf['uuid'] = [str(uuid.uuid4()) for _ in range(len(gdf))]

    return gdf.reset_index(drop=True)

In [None]:
# Build the three target layers (clipped, rotated, uuid-tagged) in one go.
gdf_points, gdf_lines, gdf_polygons = map(create_gdf, (points, lines, polygons))

In [None]:
# fig, axs = plt.subplots(ncols=3, figsize=(10, 10))
# gdf_points  .to_crs('epsg:4326').translate(9.902056, 49.843).plot(ax=axs[0], alpha=0.5)
# gdf_lines   .to_crs('epsg:4326').translate(9.902056, 49.843).plot(ax=axs[1], alpha=0.5)
# gdf_polygons.to_crs('epsg:4326').translate(9.902056, 49.843).plot(ax=axs[2], alpha=0.5, color=['red' if i % 2 == 0 else 'blue' for i in gdf_polygons.index])

# for ax in axs:
#     ax.set_xticks([])
#     ax.set_yticks([])
#     cx.add_basemap(ax, crs='epsg:4326', source=cx.providers.OpenStreetMap.Mapnik)

# 2. Generating benchmark points

In [None]:
GPS_ACCURACY = 1 # m
SEED = 42

# Seed NumPy's global random state so the noisy benchmark points drawn with
# np.random below are identical on every Restart & Run All.
np.random.seed(SEED)

# Target points buffered by GPS_ACCURACY.
gdf_benchmark = gpd.GeoDataFrame(geometry=gdf_points.geometry.buffer(GPS_ACCURACY))

In [None]:
def generate_points(points, rng=None):
    """
    Displace 2D coordinates by independent Gaussian noise and return Points.

    Parameters
    ----------
    points : sequence of (x, y) pairs
        Source coordinates; may be empty.
    rng : optional
        Object exposing ``normal(loc, scale, size)`` (for example a
        ``numpy.random.Generator``). When omitted, NumPy's global random
        state is used, as before.

    Returns
    -------
    list of shapely.Point
        One point per input coordinate, offset in x and y by samples from a
        normal distribution with mean 0 and standard deviation GPS_ACCURACY.
    """
    # reshape(-1, 2) keeps an empty input two-dimensional so the addition
    # below broadcasts instead of raising.
    coords = np.asarray(points, dtype=float).reshape(-1, 2)
    sampler = np.random if rng is None else rng
    offsets = sampler.normal((0, 0), (GPS_ACCURACY, GPS_ACCURACY), size=coords.shape)
    return [Point(x, y) for x, y in coords + offsets]

In [None]:
def create_benchmark(src, geometry_func=None, explode_func=None):
    """
    Build a GeoDataFrame of noisy benchmark points from target geometries.

    Parameters
    ----------
    src : geopandas.GeoDataFrame
        Target geometries; must have 'geometry' and 'uuid' columns.
    geometry_func : callable, optional
        Applied to the whole geometry series before sampling. Rows whose
        resulting geometry is empty are dropped.
    explode_func : callable
        Maps one geometry to a list of shapely Points to sample from it.
        Required despite the ``None`` default.

    Returns
    -------
    geopandas.GeoDataFrame
        One row per sampled point in 'epsg:3857'. 'geometry' is the sampled
        point displaced by ``generate_points`` and 'target_uuid' is the uuid
        of the geometry it was sampled from.

    Raises
    ------
    AssertionError
        If ``src`` is not a GeoDataFrame with the required columns or
        ``explode_func`` is missing.
    """
    assert isinstance(src, gpd.GeoDataFrame)
    assert 'geometry' in src.columns and 'uuid' in src.columns
    assert explode_func is not None

    gdf = src.copy()

    if geometry_func is not None:
        gdf.geometry = geometry_func(gdf.geometry)
        # Copy so later column assignments do not target a filtered slice.
        gdf = gdf[~gdf.geometry.is_empty].copy()

    gdf['expl'] = gdf.geometry.apply(explode_func)
    gdf = gdf.explode('expl')
    # explode() leaves NaN where explode_func returned an empty list; those
    # rows carry no point to sample and would break the coordinate access.
    gdf = gdf[gdf['expl'].notna()].copy()

    coords = [(p.x, p.y) for p in gdf['expl']]
    gdf['expl'] = generate_points(coords) if coords else []

    # NOTE(review): these steps are kept in place on purpose; presumably a
    # non-inplace drop of the geometry column would hand back a plain
    # DataFrame without set_geometry. Confirm before converting to a chain.
    gdf.drop(columns=['geometry'], inplace=True)
    gdf.rename(columns={'expl': 'geometry', 'uuid': 'target_uuid'}, inplace=True)
    gdf.reset_index(drop=True, inplace=True)
    gdf.set_geometry('geometry', crs='epsg:3857', inplace=True)

    return gdf

In [None]:
# Point targets: every target point contributes itself as the only sample.
gdf_point_point = create_benchmark(gdf_points, explode_func=lambda point: [point])

len(gdf_point_point)

In [None]:
def _first_segment_midpoint(line):
    """Return a one-element list holding the midpoint of the line's first two coordinates."""
    start, end = line.coords[0], line.coords[1]
    return [Point(np.mean([start[0], end[0]]), np.mean([start[1], end[1]]))]


# Line targets: densify each line, then sample one point per line.
# NOTE(review): only the first segment of the densified line is sampled;
# confirm whether a midpoint for every segment was intended.
gdf_point_line = create_benchmark(
    gdf_lines,
    geometry_func=lambda g: g.segmentize(D_HUBS / (5 * GPS_ACCURACY)),
    explode_func=_first_segment_midpoint,
)

len(gdf_point_line)

In [None]:
# Polygon targets: densify each polygon, shrink it by D_LANES / 4 and sample
# the vertices of the shrunken outline.
gdf_point_polygon = create_benchmark(
    gdf_polygons,
    geometry_func=lambda g: g.segmentize(D_HUBS / (5 * GPS_ACCURACY)).buffer(-D_LANES / 4),
    # exterior.coords is a closed ring whose last coordinate repeats the
    # first; drop it so the first vertex is not sampled twice.
    explode_func=lambda p: [Point(c) for c in p.exterior.coords[:-1]],
)

len(gdf_point_polygon)

# 3. Example joins

In [None]:
def evaluate_join(gdf_target, gdf):
    """
    Time a nearest-neighbour spatial join and print its accuracy.

    ``gdf`` is joined to ``gdf_target`` with ``gdf.sjoin_nearest(gdf_target,
    how='left')``. A joined row counts as correct when its 'uuid' (from
    ``gdf``) equals its 'target_uuid' (from ``gdf_target``).

    Parameters
    ----------
    gdf_target : GeoDataFrame
        Frame carrying a 'target_uuid' column (the benchmark points).
    gdf : GeoDataFrame
        Frame carrying a 'uuid' column (the target geometries).

    Prints the elapsed join time in seconds and the fraction of correct
    rows; returns nothing.
    """
    assert 'target_uuid' in gdf_target.columns and 'uuid' in gdf.columns

    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # durations; wall-clock time can jump if the system clock is adjusted.
    started = time.perf_counter()
    result = gdf.sjoin_nearest(gdf_target, how='left', distance_col='distance')
    elapsed = time.perf_counter() - started

    print(f'Time: {elapsed:.2f} sec')

    correct = (result.uuid == result.target_uuid)
    print(f'Accuracy: {correct.mean()}')

In [None]:
# Benchmark points sampled from the point targets vs. the point targets.
evaluate_join(gdf_target=gdf_point_point, gdf=gdf_points)

In [None]:
# Benchmark points sampled from the line targets vs. the line targets.
evaluate_join(gdf_target=gdf_point_line, gdf=gdf_lines)

In [None]:
# Benchmark points sampled from the polygon targets vs. the polygon targets.
evaluate_join(gdf_target=gdf_point_polygon, gdf=gdf_polygons)

# 4. Export

In [None]:
# Write every target layer to its own directory as a shapefile, reprojected
# to 'epsg:4326'. `os` is imported with the other imports at the top.
export_layers = (
    ('points', gdf_points),
    ('lines', gdf_lines),
    ('polygons', gdf_polygons),
)

for layer_name, layer_gdf in export_layers:
    layer_dir = f'shape/{layer_name}'
    os.makedirs(layer_dir, exist_ok=True)
    layer_gdf.to_crs('epsg:4326').to_file(f'{layer_dir}/{layer_name}.shp')