In [76]:
import json

import numpy as np
import pandas as pd
from scipy.optimize import leastsq

In [77]:
## Reverse-engineered matching road graph nodes
# The two lists are consumed position by position further down: entry i of
# node_ids is paired with entry i of other_node_ids, so their order matters.
# Each list is laid out in two rows of six; the fit below uses the first six
# pairs and checks the result against the last six.

# Node coordinates probably in Transverse Mercator
node_ids = [
    105, 741, 90, 732, 567, 580,
    924, 6099, 59, 601, 1351, 133,
]
# Node coordinates in WGS84
other_node_ids = [
    6091, 3891, 3815, 4582, 3890, 3808,
    3809, 10529, 6927, 4237, 3835, 4218,
]

In [78]:
## Build other road graph
# CSV exports of the other road graph; both paths are relative to the
# notebook's working directory.
nodes_path = "node_shenzhen.csv"
edges_path = "edge_shenzhen.csv"

# Load the node table and the edge table into DataFrames.
nodes_df, edges_df = pd.read_csv(nodes_path), pd.read_csv(edges_path)

# https://github.com/tsinghua-fib-lab/City-Camera-Trajectory-Data/blob/main/example/usage.ipynb
# nodes = [
#     (
#         int(nid),
#         {
#             "id": int(nid),
#             "x": float(lon),
#             "y": float(lat),
#             "has_camera": bool(hasc)
#         }
#     ) for _, (nid, lon, lat, hasc) in nodes_df.iterrows()
# ]
# edges = [
#     (
#         int(o),
#         int(d),
#         {
#             "id": i,
#             "od": [int(o), int(d)],
#             "class": c,
#             "points": [[float(x) for x in p.split("-")] for p in geo.split("_")],
#             "length": float(l)
#         }
#     ) for i, (o, d, c, geo, l) in edges_df.iterrows()
# ]
#
# for e in edges:
#     e[2]["geometry"] = shp.LineString(e[2]["points"])
#
# road_graph = nx.DiGraph()
# road_graph.add_nodes_from(nodes)
# road_graph.add_edges_from(edges)
# road_graph.graph["crs"] = "WGS84"
#
# # Visualize other road graph
# fig, ax = ox.plot_graph(
#     nx.MultiDiGraph(road_graph),
#     bgcolor="white",
#     node_size=2,
#     node_color="blue",
#     edge_color="black")
#
# for node, data in road_graph.nodes(data=True):
#     if node in other_node_ids:
#         x, y = data["x"], data["y"]
#         ax.text(x, y, str(node), fontsize=1, color="red", ha="center", va="center")
#
# fig.savefig("other_road_graph.svg")
# plt.show()

In [79]:
# Fetch node coordinates

# Matched nodes of the CSV graph: NodeID -> {"lon", "lat"}
other_nodes_dict = {}
for _, row in nodes_df.iterrows():
    node_id = int(row["NodeID"])
    if node_id not in other_node_ids:
        # Not one of the hand-matched nodes; nothing to record.
        continue
    other_nodes_dict[node_id] = dict(lon=float(row["Longitude"]), lat=float(row["Latitude"]))

# Matched nodes of the map.json graph: id -> {"x", "y"}
nodes_dict = {}
dataset_path = "../datasets/UrbanVehicle"
with open(f"{dataset_path}/map.json", mode="r", encoding="utf-8") as file:
    items = json.load(file)

# The parsed items are plain Python objects, so the file can already be closed here.
for item in items:
    node_id = item["id"]
    if item["type"] != "node" or node_id not in node_ids:
        # Skip non-node entries and nodes outside the matched set.
        continue
    nodes_dict[node_id] = dict(x=item["xy"][0], y=item["xy"][1])

In [80]:
# Build two coordinate arrays with one row per matched node. Rows follow the
# order of other_node_ids / node_ids respectively, so row i of gps_points is
# paired with row i of xy_points in the fit below.
gps_points = np.array(
    [
        [other_nodes_dict[nid][key] for key in ("lon", "lat")]
        for nid in other_node_ids
    ]
).astype(np.float64)
xy_points = np.array(
    [
        [nodes_dict[nid][key] for key in ("x", "y")]
        for nid in node_ids
    ]
).astype(np.float64)


## Affine transformation
def affine_transform(params, xy_points):
    """Apply a 2-D affine map to an array of planar points.

    Args:
        params: Six coefficients unpacked as ``(a, b, c, d, e, f)``.
        xy_points: Array indexed as ``[:, 0]`` for x and ``[:, 1]`` for y.

    Returns:
        Array with one ``[lon, lat]`` row per input point, where
        ``lon = a*x + b*y + e`` and ``lat = c*x + d*y + f``.
    """
    a, b, c, d, e, f = params
    xs = xy_points[:, 0]
    ys = xy_points[:, 1]
    # (a, b) and (c, d) are the linear part; e and f are the offsets.
    lon = a * xs + b * ys + e
    lat = c * xs + d * ys + f
    # Stack as two rows, then transpose so that each row is one point.
    return np.transpose(np.vstack((lon, lat)))


def error_func(params, xy_points, gps_points):
    """Return the residuals of the affine fit as a flat vector.

    Args:
        params: Six affine coefficients, passed through to ``affine_transform``.
        xy_points: Planar input points, passed through to ``affine_transform``.
        gps_points: Target coordinates subtracted from the transformed points.

    Returns:
        1-D array of ``transformed - gps_points``, flattened.
    """
    residuals = affine_transform(params, xy_points) - gps_points
    return residuals.flatten()


# Start the optimiser from the identity map (a = d = 1, everything else 0).
initial_guess = [1, 0, 0, 1, 0, 0]
# Fit on the first six matched pairs only; the remaining pairs are held out.
optimal_params, _ = leastsq(
    func=error_func,
    x0=initial_guess,
    args=(xy_points[:6], gps_points[:6]),
)

# Hold-out check: transformed coordinates first, reference GPS coordinates second.
print(affine_transform(optimal_params, xy_points[6:]), gps_points[6:], sep="\n")

[[114.03568464  22.60114097]
 [114.00505659  22.67658253]
 [114.00534317  22.67982599]
 [114.05563285  22.62509997]
 [114.05030638  22.62529856]
 [114.06003314  22.61540274]]
[[114.035699  22.601163]
 [114.005028  22.676499]
 [114.005181  22.679942]
 [114.055657  22.625086]
 [114.050207  22.625298]
 [114.060067  22.615359]]
