In [None]:
import numpy as np
import pyproj
import math
from scipy.spatial.transform import Rotation
from scipy.optimize import minimize, least_squares, Bounds, minimize_scalar, root_scalar
from scipy.stats import linregress
from scipy.interpolate import CubicSpline, splprep
from scipy.integrate import quad

EARTH_RADIUS = 6371e3
DUE_EAST = 90
FOLLOW_DISTANCE = 3.0
CAR = 1
BROADCAST_INTERVAL = 0.1
CAR_LENGTH = 0.779
KPV = 0.3
KDV = 0.5#0.3
# KPH = 0.2
# KDH = 0.6
K = 0.3
WHEELBASE = 0.48
MAX_STEER = 30
geodesic = pyproj.Geod(ellps='WGS84')

### NOTE: cos is usually associated with x and sin with y, but our reference point (the running track) faces directly north-south
# This is perfect because it means we don't have to rotate the car coordinates to match the track frame since the equirectangular projection already assumes
# this orientation. However, when we graph it it means that the long side is the y-axis and the short side is the x-axis.
# Normally in ROS we take x to be forward and y to be left, but with this track orientation forward is actually y and left is x
# so to make the vx and vy consistent with the graph, we have to switch the sin and cos in the equations

# Center of the running track; serves as reference point
import json
with open("/Users/owenburns/workareas/cavrel-platooning/missions/tracks/bus_loop_large.json", 'r') as f:
    track = json.load(f)

center_latitude = track['center']['lat']
center_longitude = track['center']['lon']
center_orientation = DUE_EAST

datapoints = []

def coords_to_local(target_lat, target_lon):
    # Convert lat, lon to radians
    target_lat_rad, target_lon_rad = math.radians(target_lat), math.radians(target_lon)
    current_lat_rad, current_lon_rad = math.radians(center_latitude), math.radians(center_longitude)

    x = EARTH_RADIUS * (target_lon_rad - current_lon_rad) * math.cos((current_lat_rad + target_lat_rad) / 2)
    y = EARTH_RADIUS * (target_lat_rad - current_lat_rad)

    angle = math.radians(center_orientation - DUE_EAST)
    qx = math.cos(angle) * x - math.sin(angle) * y
    qy = math.sin(angle) * x + math.cos(angle) * y
    return qx, qy

def get_goal_motion(ego_car, target_cars, prev_ego_vector):
    targets = []
    for i in range(CAR - 1, -1, -1):
        (lat1, lon1, head1, time1), (lat2, lon2, head2, time2) = target_cars[i][-2:]

        x1, y1 = coords_to_local(lat1, lon1)
        x2, y2 = coords_to_local(lat2, lon2)

        velocity = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)/(time2 - time1)
        heading = head2
        targets.append((x2, y2, heading, velocity, CAR - 1 - i))

    ex, ey = coords_to_local(ego_car[1][0], ego_car[1][1])
    _, _, eh, ev = prev_ego_vector
    tmp = [(x, y, h, v) for x, y, h, v, _ in targets] + [(ex, ey, eh, ev)]
    datapoints.append(tmp)

    def minimization_objective(params):
        v, head = params
        head = np.radians(head)
        x, y = coords_to_local(ego_car[1][0], ego_car[1][1])

        total_cost = 0
        for target in targets:
            x_target, y_target, head_target, v_target, position = target
            head_target = np.radians(head_target)
            position += 1
            goal_follow_distance = FOLLOW_DISTANCE*position + CAR_LENGTH*(position - 1)

            # simulate the motion of the cars
            x_sim_target = x_target + v_target*np.sin(head_target)*BROADCAST_INTERVAL
            y_sim_target = y_target + v_target*np.cos(head_target)*BROADCAST_INTERVAL
            x_sim_ego = x + v*np.sin(head)*BROADCAST_INTERVAL
            y_sim_ego = y + v*np.cos(head)*BROADCAST_INTERVAL

            # determine the point where the following car *should* be to be perfectly maintaining its following distance
            x_goal = x_sim_target - goal_follow_distance*np.sin(head_target)
            y_goal = y_sim_target - goal_follow_distance*np.cos(head_target)

            # the cost is the distance between the actual simulated position of the following car and the ideal position
            total_cost += np.sqrt((x_goal - x_sim_ego)**2 + (y_goal - y_sim_ego)**2)

        return total_cost
    
    # Set the bounds for the optimization, and set the initial guess to the previous ego vector
    bounds = Bounds([0, -360], [10, 360])
    # _, _, eh, es = prev_ego_vector
    # [es, eh]

    # use the motion of the immediately leading target car as the guess
    _, _, head, v, _ = targets[0]

    # Use the 'SLSQP' method with bounds
    res = minimize(minimization_objective, [v, head], method='SLSQP', bounds=bounds)

    return res, targets

def velocity_controller(v, v_ego):
    accel = KPV*(v - v_ego) + KDV*(v - v_ego)/BROADCAST_INTERVAL
    return accel

# def distance_to_line(x0, y0, dx, dy, x, y):
#     lambda_val = ((x - x0) * dx + (y - y0) * dy) / (dx**2 + dy**2)
#     closest_point = np.array([x0 + lambda_val * dx, y0 + lambda_val * dy])
#     distance = np.linalg.norm(closest_point - np.array([x, y]))
#     return distance, closest_point

# def stanley_controller(ego_car, head_ego, v_ego, target_head, targets):
#     points = []
#     initial_guess = None

#     # get the local coordinates of the last two locations of all other cars
#     for i in range(CAR):
#         (lat1, lon1, _, _), (lat2, lon2, _, _) = targets[i][-2:]
#         x1, y1 = coords_to_local(lat1, lon1)
#         x2, y2 = coords_to_local(lat2, lon2)
#         points.append((x1, y1))
#         points.append((x2, y2))

#         if i == CAR - 1: # we use the car closest to the ego v
#             initial_guess = [x1, y1, x2 - x1, y2 - y1]

#     # get the local coordinates of the ego car
#     _, (lat1, lon1) = ego_car
#     ex1, ey1 = coords_to_local(lat1, lon1)

#     def distances_to_line(params, points):
#         x0, y0, dx, dy = params
#         distances = []
#         for (x, y) in points:
#             distance, _ = distance_to_line(x0, y0, dx, dy, x, y)
#             distances.append(distance)
#         return distances
    
#     # compute a line of best fit using the last 2 positions of all other cars
#     # this allows us to accurately calculate the cross track error between where we are and where we should be to be most directly in line with the other vehicles
#     # NOTE: this will start to break as you get more cars going around a curve, at which point we should switch to a curved line of best fit
#     result = least_squares(distances_to_line, initial_guess, args=(points,))

#     x0_opt, y0_opt, dx_opt, dy_opt = result.x
#     heading_diff = target_head - head_ego

#     dist, closest = distance_to_line(x0_opt, y0_opt, dx_opt, dy_opt, ex1, ey1)
#     cte = np.arctan2(K*dist, v_ego)
#     cte = np.rad2deg(cte)

#     print(f"CTE: {cte}, heading diff: {heading_diff}")

#     steer = heading_diff + cte
#     return steer, (x0_opt, y0_opt, dx_opt, dy_opt), closest

def stanley_controller(ego_car, head_ego, v_ego, target_head, targets):
    points = []

    # get the local coordinates of the last two locations of all other cars
    for i in range(CAR):
        for point in targets[i]:
            (lat, lon, _, _) = point
            x, y = coords_to_local(lat, lon)
            points.append((x, y))

    points.sort(key=lambda point: point[0])
    xs, ys = zip(*points)

    cs = CubicSpline(xs, ys)

    _, (lat1, lon1) = ego_car
    ex, ey = coords_to_local(lat1, lon1)

    def distance_to_spline(x):
        spline_y = cs(x)
        return np.sqrt((ex - x)**2 + (ey - spline_y)**2)
    
    res = minimize_scalar(distance_to_spline, bounds=(min(xs), max(xs)), method='bounded')
    closest_x = res.x
    closest_y = cs(closest_x)
    dist = distance_to_spline(closest_x)

    line_x = np.linspace(min(xs)-2, max(xs)+2, 300)
    line_y = cs(line_x)
    line = (line_x, line_y)
    closest = (closest_x, closest_y)

    heading_diff = target_head - head_ego
    cte = np.arctan2(K * dist, v_ego)
    cte = np.rad2deg(cte)

    steer = heading_diff + cte
    return steer, line, closest


In [None]:
def calculate_straight(start, end, seg_dist, target_speed):
    bearing, _, dist = geodesic.inv(start[1], start[0], end[1], end[0])
    seg_count = int(dist/seg_dist)
    segs = [(start[0], start[1], bearing)]
    vectors = [(start[1], start[0], bearing, target_speed)]
    for _ in range(seg_count):
        lat_prev, lon_prev, _ = segs[-1]
        lon_next, lat_next, _ = geodesic.fwd(lon_prev, lat_prev, bearing, seg_dist)
        segs.append((lat_next, lon_next, bearing))
        vectors.append((lat_prev, lon_prev, bearing, target_speed))

    return segs, vectors, bearing

def calculate_turn(start, start_bearing, end, end_bearing, seg_dist, target_speed, direction='left'):
    """Treats the start and end as vertices of the ellipse along its major and minor axes, and calculates the curge
    between them. The curve is calculated by incrementing the angle along the ellipse and calculating the distance.
    This method only works for turns up to 90 degrees."""

    bearing, _, dist = geodesic.inv(start[1], start[0], end[1], end[0])

    # determine starting angle along ellipse
    # compass angle being in degrees, and ellipse in radians
    if direction == 'left':
        c2e = lambda c_bearing: np.deg2rad((360 - c_bearing) % 360)
        e2c = lambda e_angle: (360 - np.rad2deg(e_angle)) % 360
        anglediff = lambda angle_1, angle_2: (angle_2 - angle_1 + np.pi*2) % (np.pi*2)
        increment = 0.001
    else:
        # TODO: implement support for right turns
        raise NotImplementedError("Right turns aren't supported yet")
    
    bearing = c2e(bearing)
    x_mult = abs(np.cos(bearing)*dist)
    y_mult = abs(np.sin(bearing)*dist)

    t_start = c2e(start_bearing)
    t_end = c2e(end_bearing)
    turn = anglediff(t_start, t_end)
    progress = 0

    # print(f"start_bearing: {start_bearing}, end_bearing: {end_bearing}")
    # print(f"t_start: {np.rad2deg(t_start)}, t_end: {np.rad2deg(t_end)}")

    points = [(x_mult*np.sin(t_start), y_mult*np.cos(t_start))]
    full_track = [(*start, start_bearing)]
    vectors = [(*start, start_bearing, target_speed)]
    t = t_start

    while progress < turn:
        t = (t + increment) % (np.pi*2)
        progress += increment

        x = x_mult*np.sin(t)
        y = y_mult*np.cos(t)

        lx, ly = points[-1]
        dist = np.sqrt((x - lx)**2 + (y - ly)**2)
        if dist >= seg_dist:
            points.append((x, y))

            bearing = np.rad2deg(np.arctan2(y - ly, x - lx))
            lon_next, lat_next, _ = geodesic.fwd(full_track[-1][1], full_track[-1][0], bearing, dist)
            full_track.append((lat_next, lon_next, e2c(t)))
            vectors.append((full_track[-1][0], full_track[-1][1], bearing, target_speed))

    final_point = (x_mult*np.sin(t_end), y_mult*np.cos(t_end))
    if points[-1] != final_point:
        dist = np.sqrt((final_point[0] - points[-1][0])**2 + (final_point[1] - points[-1][1])**2)
        bearing = np.rad2deg(np.arctan2(final_point[1] - points[-1][1], final_point[0] - points[-1][0]))
        lon_next, lat_next, _ = geodesic.fwd(full_track[-1][1], full_track[-1][0], bearing, dist)
        full_track.append((lat_next, lon_next, e2c(t_end)))
        vectors.append((full_track[-1][0], full_track[-1][1], bearing, target_speed))

    return full_track, vectors

def calculate_track(straights, turns, broadcast_interval):
    straight_points = []
    straight_vectors = []
    bearings = []
    for straight in straights:
        seg_dist = broadcast_interval*straight['speed']
        start = (straight['start']['lat'], straight['start']['lon'])
        end = (straight['end']['lat'], straight['end']['lon'])
        segs, vectors, bearing = calculate_straight(start, end, seg_dist, straight['speed'])
        straight_points.append(segs)
        straight_vectors.append(vectors)
        bearings.append(bearing)

    bearings = [(bearings[i], bearings[(i+1)%len(bearings)]) for i in range(len(bearings))]
    turn_points = []
    turn_vectors = []
    for turn, (start_bearing, end_bearing) in zip(turns, bearings):
        seg_dist = broadcast_interval*turn['speed']
        start = (turn['start']['lat'], turn['start']['lon'])
        end = (turn['end']['lat'], turn['end']['lon'])
        segs, vectors = calculate_turn(start, start_bearing, end, end_bearing, seg_dist, turn['speed'], turn['direction'])
        turn_points.append(segs)
        turn_vectors.append(vectors)

    full_track = []
    for straight, turn in zip(straight_points, turn_points):
        full_track += straight + turn

    time = 0
    for i in range(len(full_track)):
        full_track[i] = (*full_track[i], time)
        time += broadcast_interval

    ego_vectors = []
    for straight, turn in zip(straight_vectors, turn_vectors):
        ego_vectors += straight + turn

    return full_track, ego_vectors

full_track, target_vectors = calculate_track(track['straights'], track['turns'], 0.1)

In [None]:
ego = [(28.602147, -81.196794) for _ in range(3)]#[(28.608156, -81.195606), (28.608154723971715, -81.19560744596528)]
ego_start_heading, _, _ = geodesic.inv(ego[0][1], ego[0][0], ego[1][1], ego[1][0])
ego_vectors = [(28.602147, -81.196794, 0, 0) for _ in range(3)]#[(ego[0][0], ego[0][1], 180, 0) for _ in range(3)] # lat, lon, heading, velocity
ego_res = []
ego_targets = [(0,0), (0,0), (0,0)]
lines = [((0, 0, 0),(0, 0, 0)), ((0, 0, 0),(0, 0, 0)), ((0, 0, 0),(0, 0, 0))] # lines = [(0,0,0,0)]
# closest_points = [(0,0), (0,0), (0,0)] # closest_points = [(0,0)]

for frame in range(3, len(full_track)):
    ego_car = [ego[-2], ego[-1]]
    target_cars = [[full_track[frame-3], full_track[frame-2], full_track[frame-1], full_track[frame]]]
    res, targets = get_goal_motion(ego_car, target_cars, ego_vectors[-1])
    v, head = res.x
    ego_targets.append((v, head))
    ego_res.append(res)

    # head_ego, _, current_dist = geodesic.inv(ego_car[0][1], ego_car[0][0], ego_car[1][1], ego_car[1][0])
    # v_ego = current_dist/broadcast_int
    _, _, head_ego, v_ego = ego_vectors[-1]

    vel_accel = velocity_controller(v, v_ego)
    delta, line, closest = stanley_controller(ego_car, head_ego, v_ego, head, target_cars)
    lines.append(line)
    # closest_points.append(closest)

    new_heading = head_ego + delta
    new_speed = v_ego + vel_accel*BROADCAST_INTERVAL

    lon_next, lat_next, _ = geodesic.fwd(ego_car[1][1], ego_car[1][0], new_heading, new_speed*BROADCAST_INTERVAL)
    ego.append((lat_next, lon_next))
    ego_vectors.append((ego_car[1][0], ego_car[1][1], new_heading, new_speed))

In [None]:
import matplotlib.pyplot as plt
x = [t[3] for t in full_track[4:]]
fig, ax = plt.subplots(2)
ax[0].plot(x, [x[0] for x in ego_targets[4:]], label='ego speed')
ax[1].plot(x, [x[1]%360 for x in ego_targets[4:]], label='ego heading')
ax[0].plot(x, [x[3] for x in target_vectors[4:]], label='target speed')
ax[1].plot(x, [x[2]%360 for x in target_vectors[4:]], label='target heading')
ax[0].legend()
ax[1].legend()
plt.show()
# plt.xlabel('time')
# plt.ylabel('value')
# plt.legend()

In [None]:
from typing import List
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.animation import FuncAnimation
from matplotlib.text import Annotation
from matplotlib.lines import Line2D
from IPython.display import HTML
import matplotlib
import math
matplotlib.rcParams['animation.embed_limit'] = 2**128

def heading_velocity_to_components(heading, velocity):
    # Convert heading (degrees) to radians
    heading_rad = math.radians(heading)
    # Calculate the x and y components of the velocity
    vx = velocity * math.sin(heading_rad)
    vy = velocity * math.cos(heading_rad)
    return vx, vy

def vector_to_arrow(vector):
    x, y = coords_to_local(vector[0], vector[1])
    vx, vy = heading_velocity_to_components(vector[2], vector[3])
    return (x, y, vx, vy)

# Convert all track points for plotting running track
xy_points = [coords_to_local(lat, lon) for lat, lon, heading, time in full_track]
xs = [point[0] for point in xy_points]
ys = [point[1] for point in xy_points]

# set up the plot and plot the runing track
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(autoscale_on=False, xlim=(-100, 100), ylim=(-100, 100))
ax.plot(xs, ys, 'k-', label='Track Path')  # green dots connected by lines
ax.plot(xs[0], ys[0], 'yo', label='Start/End Point')
ax.set_aspect('equal')

ex, ey, edx, edy = vector_to_arrow(ego_vectors[0])
v, h = ego_targets[0]
# vacc, hacc = ego_controls[0]
tx, ty, tdx, tdy = vector_to_arrow(target_vectors[0])
xsl, ysl = lines[0] # x0_opt, y0_opt, dx_opt, dy_opt = lines[0]
# closest = closest_points[0]
ego_arrow = patches.Arrow(ex, ey, edx, edy, width=2, fc='red', ec='red')
target_arrow = patches.Arrow(tx, ty, tdx, tdy, width=2, fc='blue', ec='blue')
ego_text = Annotation(f"dx: {edx:.2f}, dy: {edy:.2f}\nv: {v}, h: {h}", xy=(ex, ey), xytext=(ex, ey))
# target_text = Annotation(f"dx: {tdx:.2f}, dy: {tdy:.2f}", xy=(tx, ty), xytext=(tx, ty))
line = Line2D(xs, ys, color='green') # line = Line2D([x0_opt, closest[0]], [y0_opt, closest[1]], color='green')
# pt = Line2D([closest[0]], [closest[1]], marker='o', color='green') # pt = Line2D([closest[0]], [closest[1]], marker='o', color='green')


def init_func() -> list[patches.Arrow]:
    print('init_func')
    ax.add_patch(ego_arrow)
    ax.add_patch(target_arrow)
    ax.add_artist(ego_text)
    # ax.add_artist(target_text)
    ax.add_artist(line)
    # ax.add_artist(pt)
    return ego_arrow, target_arrow, ego_text#, target_text

def func(frame, *fargs) -> list[patches.Arrow]:
    global ego_arrow, target_arrow, ego_text#, target_text, line, pt

    ax.patches[ax.patches.index(ego_arrow)].remove()
    ax.patches[ax.patches.index(target_arrow)].remove()

    _, _, e_heading, _ = ego_vectors[frame]
    _, _, t_heading, _ = target_vectors[frame]
    v, h = ego_targets[frame]
    ex, ey, edx, edy = vector_to_arrow(ego_vectors[frame])
    tx, ty, tdx, tdy = vector_to_arrow(target_vectors[frame])
    ego_arrow = patches.Arrow(ex, ey, edx*BROADCAST_INTERVAL, edy*BROADCAST_INTERVAL, width=2, fc='red', ec='red')
    target_arrow = patches.Arrow(tx, ty, tdx*BROADCAST_INTERVAL, tdy*BROADCAST_INTERVAL, width=2, fc='blue', ec='blue')
    ego_text.set_text(f"dx: {edx*BROADCAST_INTERVAL:.2f}, dy: {edy*BROADCAST_INTERVAL:.2f}, heading: {e_heading:.2f}")#\nv: {v:.2f}, t_h: {h:.2f}")#\naccx: {accx:.2f}, accy: {accy:.2f}")
    ego_text.set_position((ex, ey))
    # target_text.set_text(f"dx: {tdx*broadcast_int:.2f}, dy: {tdy*broadcast_int:.2f}, heading: {t_heading:.2f}")
    # target_text.set_position((tx, ty))
    ax.add_patch(ego_arrow)
    ax.add_patch(target_arrow)

    xsl, ysl = lines[frame]# x0_opt, y0_opt, dx_opt, dy_opt = lines[frame]
    # closest = closest_points[frame]
    line.set_data(xsl, ysl) # line.set_data([x0_opt, closest[0]], [y0_opt, closest[1]])
    # pt.set_data([closest[0]], [closest[1]])

    return ego_arrow, target_arrow, ego_text#, target_text

ax.set_xlabel('Meters East of Center')
ax.set_ylabel('Meters North of Center')
ax.set_title('Vehicle Path and Vectors')
ax.legend()
ax.grid()

# Create animation
ani = FuncAnimation(fig, func, range(min(len(ego_vectors), len(target_vectors))), init_func, blit=True, interval=50) # Adjust interval as needed
HTML(ani.to_jshtml())