<a href="https://colab.research.google.com/github/shivamtrip/mpc-drone-tracking/blob/main/Figure_Eight_Challenge_Shivam_Tripathy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Skydio Take Home Challenge - Figure Eight

**Goal** - Write a planning policy for a 2D drone that can:
1. Track a moving subject at a desired range.
2. Stay in front of the subject's velocity vector (aka film them from the front).
3. Avoid a moving obstacle / keep out zone.

The function you write will command an acceleration at each time step given the drone, subject, and obstacle states. The drone is a simple double integrator with maximum acceleration and velocity limits as set in the scenario definition. All quantities are in a consistent global frame.

**Metric** - We provide an error metric that scores your policy against these goals. All error terms are smooth - for example, we penalize a quadratic error for the drone's (x, y) location being inside the radius of the obstacle at each time step. There is no way to get an error of zero!

Iterate on your policy to get the lowest error you can. As a rough guideline, 800 is a poor score and 250 is a good score.

**Submission** - Make a copy of this notebook and fill out the code and writeup. Feel free to download and run locally if helpful. Share the completed Google Colab link with us by setting the permission to *Anyone with the link can view*.

In [None]:
import numpy as np
import plotly.express as px
import time
import typing as T
import cvxpy as cvx
import scipy

## Simulation Loop

In [None]:
def simulate_drone_dynamics(
    dt : float,
    position : np.ndarray,
    velocity : np.ndarray,
    accel_command : np.ndarray,
    max_accel : float,
    max_speed : float
  ) -> np.ndarray:
    """
    Simulate the drone's dynamics forward given a control input.
    """
    def saturated(vec : np.ndarray, max_mag : float) -> np.ndarray:
        mag = np.sqrt(np.sum(vec**2) + 1e-6)
        return vec * np.minimum(mag, max_mag) / mag

    # Basic forward euler integration
    accel_next = saturated(accel_command, max_accel)
    velocity_next = saturated(velocity + dt * accel_next, max_speed)
    position_next = position + dt * velocity_next

    return position_next, velocity_next, accel_next

In [None]:
def run_simulation(
    times : np.ndarray,
    subject_positions : np.ndarray,
    obstacle_positions : np.ndarray,
    planning_policy : T.Any,
    params : T.Dict[str, T.Any]
  ) -> T.Tuple[np.ndarray]:
    """
    Compute the drone's trajectory by stepping along time, computing a commmand,
    and simulating the drone's dynamics.
    """
    policy = planning_policy(params=params)
    drone_positions = [params['drone_initial_position']]
    drone_velocities = [np.array([0, 0])]
    drone_accels = [np.array([0, 0])]
    subject_velocities = [np.array([0, 0])]

    for i in range(len(times) - 1):

        # Get current state
        t, t_next = times[i], times[i + 1]
        dt = t_next - t
        drone_position, drone_velocity = drone_positions[i], drone_velocities[i]
        subject_position = subject_positions[i]
        subject_velocity = (subject_positions[i + 1] - subject_position) / dt
        obstacle_position = obstacle_positions[i]
        obstacle_velocity = (obstacle_positions[i + 1] - obstacle_positions[i]) / dt

        # Compute the command
        accel_command = policy.compute_accel(
            t=t,
            drone_position=drone_position,
            drone_velocity=drone_velocity,
            subject_position=subject_position,
            subject_velocity=subject_velocity,
            obstacle_position=obstacle_position,
            obstacle_velocity=obstacle_velocity
        )

        # Simulate the next drone position
        drone_position_next, drone_velocity_next, drone_accel_next =\
            simulate_drone_dynamics(
                dt=t_next - t,
                position=drone_position,
                velocity=drone_velocity,
                accel_command=accel_command,
                max_accel=params['drone_max_accel'],
                max_speed=params['drone_max_speed']
            )
        drone_positions.append(drone_position_next)
        drone_velocities.append(drone_velocity_next)
        drone_accels.append(drone_accel_next)
        subject_velocities.append(subject_velocity)

    return np.vstack(drone_positions), np.vstack(drone_velocities),\
        np.vstack(drone_accels), np.vstack(subject_velocities)

## Error Metric

In [None]:
def mod_2_pi(x : np.ndarray) -> np.ndarray:
    """
    Convert an angle in radians into range [-pi, pi]
    """
    return np.mod(x + np.pi, 2 * np.pi) - np.pi

def compute_error_for_scenario(
    subject_positions : np.ndarray,
    subject_velocities : np.ndarray,
    drone_positions : np.ndarray,
    drone_velocities : np.ndarray,
    drone_accels : np.ndarray,
    obstacle_positions : np.ndarray,
    params : T.Dict[str, T.Any],
    verbose : bool = True
  ):
    """
    Score the planning policy's performance with a multi-objective metric,
    where lower is better.

    Current cost terms:
      #1) Drone is at a desired range from the subject
      #2) Drone is at a desired angle relative to the subject's velocity vector
      #3) Drone is penalized in a soft way for being inside the obstacle
    """
    error = 0.0

    # Error term for the range of the drone from the subject
    range_cost = params['desired_range_to_subject_cost']
    diff = drone_positions - subject_positions
    actual_range = np.linalg.norm(diff, axis=1)
    range_residual = params['desired_range_to_subject'] - actual_range
    range_error = 0.5 * params['dt'] * range_cost * range_residual.dot(range_residual)
    error += range_error
    if verbose:
        print(f'Subject range error: {range_error:0.1f}')

    # Error term for the angle to the subject
    angle_cost = params['desired_angle_to_subject_velocity_cost']
    actual_angle = np.arctan2(diff[:, 1], diff[:, 0])
    subject_vel_angle = np.arctan2(subject_velocities[:, 1], subject_velocities[:, 0] + 1e-6)
    angle_residual = mod_2_pi(params['desired_angle_to_subject_velocity'] +\
                              subject_vel_angle - actual_angle)
    angle_error = 0.5 * params['dt'] * angle_cost * angle_residual.dot(angle_residual)
    error += angle_error
    if verbose:
        print(f'Subject angle error: {angle_error:0.1f}')

    # Error term for going inside a moving obstacle radius
    obstacle_cost = params['obstacle_cost']
    range_to_obstacle = np.linalg.norm(drone_positions - obstacle_positions, axis=1)
    infringement = np.maximum(0, params['obstacle_radius'] - range_to_obstacle)
    obstacle_error = 0.5 * params['dt'] * obstacle_cost * infringement.dot(infringement)
    error += obstacle_error
    if verbose:
        print(f'Obstacle error: {obstacle_error:0.1f}')

    print(f'Total error: {error:0.1f}')
    return error

## Scenario Definitions

In [None]:
class Scenario:
    """
    Base class for describing the test scenario.
    """

    def __init__(self):
        pass

    def params(self):
        """
        Return static parameters for the scenario
        """
        return dict(
            dt = 0.025,  # [s]
            total_time = 30.0,  # [s]

            drone_initial_position = np.array([-3.0, 0.0]),  # [m]
            drone_max_speed = 10.0,  # [m/s]
            drone_max_accel = 2.5,  # [m/s^2]

            obstacle_radius = 2.0,  # [m]

            desired_range_to_subject_cost = 10.0,
            desired_range_to_subject = 3.0,  # [m]

            desired_angle_to_subject_velocity_cost = 10.0,
            desired_angle_to_subject_velocity = 0.0 * np.pi,  # [rad]

            obstacle_cost = 500.0
        )

    def subject_position(self, t : float) -> np.ndarray:
        """
        Return the subject's (x, y) position given a time.
        """
        return np.array([0.0, 0.0])

    def obstacle_position(self, t : float) -> np.ndarray:
        """
        Return the obstacle's (x, y) position given a time.
        """
        return np.array([10.0, 10.0])

In [None]:
class FigureEightCosineObstacle(Scenario):
    """
    The subject moves along a figure eight curve:
        https://mathworld.wolfram.com/EightCurve.html

    Obstacle moves along a cosine wave.
    """

    def __init__(self,
                 loop_time : float = 25.0,  # [s]
                 loop_radius : float = 5.0,  # [m]
                 obstacle_period = 4.14):
        self.loop_time = loop_time
        self.loop_radius = loop_radius
        self.obstacle_period = obstacle_period

    def subject_position(self, t : float) -> np.ndarray:
        t_angle = t / self.loop_time * (2 * np.pi)
        x = self.loop_radius * np.sin(t_angle)
        y = x * np.cos(t_angle)
        return np.array([x, y]) + np.array([0, 1])

    def obstacle_position(self, t : float) -> np.ndarray:
        return np.array([11, -1]) + (t/25) * np.array([-21.1, 0]) +\
           np.cos(t * 2 * np.pi / self.obstacle_period) * np.array([0, 5])

## Animated Plot Function

In [None]:
def plot_animation(scenario : Scenario, result : T.Dict[str, T.Any],
                   range_dims : T.Tuple[float, float] = (10, 6),
                   drawing_dt : float = 0.5) -> None:
    """
    Draw the experiment results as an interactive widget.
    """
    title = f'{scenario.__class__.__name__} (error = {result["error"]:0.1f})'
    times = result['times']
    subject_positions = result['subject_positions']
    drone_positions = result['drone_positions']
    obstacle_positions = result['obstacle_positions']
    obstacle_radius = scenario.params()['obstacle_radius']

    # Downsample for drawing sanity, also grabbing last frame
    dt = times[1] - times[0]
    step = int(drawing_dt / dt)
    indices = np.array(list(range(len(times))[::step]) + [len(times) - 1])

    # Assemble the data frame by stacking subject, drone, obstacle data
    data = dict()
    data['t'] = np.hstack([times[indices], times[indices], times[indices]])
    data['x'] = np.hstack([subject_positions[indices, 0],
                           drone_positions[indices, 0],
                           obstacle_positions[indices, 0]])
    data['y'] = np.hstack([subject_positions[indices, 1],
                           drone_positions[indices, 1],
                           obstacle_positions[indices, 1]])
    data['type'] = ['subject'] * len(indices) +\
                   ['drone'] * len(indices) +\
                   ['obstacle'] * len(indices)
    data['size'] = [5] * len(indices) + [5] * len(indices) +\
                   [53 * obstacle_radius] * len(indices)

    # Make the animated trace
    fig = px.scatter(
        data,
        x='x',
        y='y',
        animation_frame='t',
        animation_group='type',
        color='type',
        category_orders={'type': ['obstacle', 'subject', 'drone']},
        color_discrete_sequence=('#FF5555', '#CCCC00', '#5555FF'),
        size='size',
        size_max=data['size'][-1],
        hover_name='type',
        template='plotly_dark',
        range_x=(-range_dims[0], range_dims[0]),
        range_y=(-range_dims[1], range_dims[1]),
        height=700,
        title=title
      )

    # Make equal one meter grid
    fig.update_xaxes(
        dtick=1.0,
        showline=False
    )
    fig.update_yaxes(
      scaleanchor = "x",
      scaleratio = 1,
      showline=False,
      dtick=1.0
    )

    # Draw full curve of the subject's path
    subject_line = px.line(
        x=subject_positions[:, 0],
        y=subject_positions[:, 1]
      ).data[0]
    subject_line.line['color'] = '#FFFF55'
    subject_line.line['width'] = 1
    fig.add_trace(subject_line)

    # Draw full curve of the drone's path
    drone_line = px.line(
        x=drone_positions[:, 0],
        y=drone_positions[:, 1]
      ).data[0]
    drone_line.line['color'] = '#AAAAFF'
    drone_line.line['width'] = 1
    fig.add_trace(drone_line)

    # Draw full curve of the obstacle's path
    drone_line = px.line(
        x=obstacle_positions[:, 0],
        y=obstacle_positions[:, 1]
      ).data[0]
    drone_line.line['color'] = '#FFAAAA'
    drone_line.line['width'] = 1
    fig.add_trace(drone_line)

    fig.show()

## Run Scenario Function


In [None]:
def run_scenario_using_planning_policy(scenario : Scenario,
                                       planning_policy : T.Any) -> T.Dict:
    """
    Simulate the given planning policy and score its performance.
    """
    params = scenario.params()
    times = np.arange(0, params['total_time'], params['dt'])

    # Compute the subject and obstacle trajectory
    subject_positions = np.array([scenario.subject_position(t) for t in times])
    obstacle_positions = np.array([scenario.obstacle_position(t) for t in times])

    # Compute the drone trajectory
    start_time = time.time()
    drone_positions, drone_velocities, drone_accels, subject_velocities =\
        run_simulation(
            times=times,
            subject_positions=subject_positions,
            obstacle_positions=obstacle_positions,
            planning_policy=planning_policy,
            params=params
        )
    print(f'\nScenario took {time.time() - start_time:6.2f} s.')

    # See how well the policy did
    error = compute_error_for_scenario(
        subject_positions=subject_positions,
        subject_velocities=subject_velocities,
        drone_positions=drone_positions,
        drone_velocities=drone_velocities,
        drone_accels=drone_accels,
        obstacle_positions=obstacle_positions,
        params=params,
        verbose=True
    )

    return dict(
        times=times,
        subject_positions=subject_positions,
        drone_positions=drone_positions,
        obstacle_positions=obstacle_positions,
        error=error
    )

## Create and Test Planning Policy <font color='red'><-- your code goes here</font>

In [None]:
class MyPlanningPolicy:
    """
    Command accelerations to the drone to follow the subject according to the
    desired settings and avoid the obstacle.
    """

    def __init__(self, params):
      self.params = params

    def compute_accel(
        self,
        t: float,
        drone_position : np.ndarray,
        drone_velocity : np.ndarray,
        subject_position : np.ndarray,
        subject_velocity : np.ndarray,
        obstacle_position : np.ndarray,
        obstacle_velocity : np.ndarray
      ) -> np.ndarray:

        print("T: ", t)

        # Step 1 - Initializing variables and weights
        des_range_from_subject = self.params['desired_range_to_subject']
        des_angle_from_subject_velocity = self.params['desired_angle_to_subject_velocity']

        nx = 4              # number of state variables
        nu = 2              # number of input variables

        dt = self.params['dt']

        Q = np.diag(np.ones(nx))
        R = np.diag(np.ones(nu))

        Q[0, 0] *= 20             # increasing gain to converge to goal faster in x position
        Q[1, 1] *= 20             # increasing gain to converge to goal faster in y position

        N_mpc = 10                # MPC horizon size

        repulsion_weight =  1.5            # weight of repulsion force for obstacle avoidance
        safety_distance = 2                # Additional safety distance for obstacle avoidance

        subject_traj = np.zeros((nx, N_mpc))          # matrix to store subject trajectory
        obstacle_traj = np.zeros((nx, N_mpc))           # matrix to store obstacle trajectory
        desired_drone_traj = np.zeros((nx, N_mpc))      # matrix to store drone trajectory

        subject_pred_position = subject_position      # subject initial position
        obstacle_pred_position = obstacle_position      # obstacle initial position


        # Step 2 - Predicting trajectories for the subject and obstacle over a horizon
        for i in range(N_mpc):
          subject_pred_position += dt * subject_velocity      # Assuming subject travels at a constant velocity for N_mpc timesteps
          subject_traj[:, i] = np.hstack((subject_pred_position, subject_velocity))

          obstacle_pred_position += dt * obstacle_velocity      # Assuming obstacle travels at a constant velocity for N_mpc timesteps
          obstacle_traj[:, i] = np.hstack((obstacle_pred_position, obstacle_velocity))


        # Step 3 - Initializing drone trajectory (based on subject and obstacle trajectories)
        desired_drone_heading_angle = np.arctan2(subject_velocity[1], subject_velocity[0]) - des_angle_from_subject_velocity        # drone heading angle
        desired_drone_range = des_range_from_subject * np.array([np.cos(desired_drone_heading_angle), np.sin(desired_drone_heading_angle)])   # drone range from subject

        desired_drone_traj[:, 1] = subject_traj[:, 1]
        obstacle_repulsion_force = repulsion_weight * (1 / np.linalg.norm(desired_drone_traj[0:2, 1] - obstacle_traj[0:2, 1], 2))

        if obstacle_repulsion_force <= 0:
          obstacle_repulsion_force = max(-(self.params['obstacle_radius'] + safety_distance), obstacle_repulsion_force)
        else:
          obstacle_repulsion_force = min(self.params['obstacle_radius'] + safety_distance, obstacle_repulsion_force)

        desired_drone_traj[:2, 1] += desired_drone_range + obstacle_repulsion_force         # setting initial value of drone trajectory


        # Step 4 - Generating drone trajectory (based on drone range, drone angle and obstacle avoidance)
        for i in range(2, N_mpc):     # starting at 2 since drone tracks subject better after subject has moved by a certain distance (upon testing)
          obstacle_repulsion_force = repulsion_weight * (1 / np.linalg.norm(desired_drone_traj[0:2, i - 1] - obstacle_traj[0:2, i], 2))

          if obstacle_repulsion_force <= 0:
            obstacle_repulsion_force = max(-(self.params['obstacle_radius'] + safety_distance), obstacle_repulsion_force)
          else:
            obstacle_repulsion_force = min(self.params['obstacle_radius'] + safety_distance, obstacle_repulsion_force)

          desired_drone_position = subject_traj[0:2, i] + desired_drone_range + obstacle_repulsion_force
          desired_vel_temp = (desired_drone_position - desired_drone_traj[:2, i - 1]) / dt
          desired_vel = np.clip(desired_vel_temp, -self.params['drone_max_speed'], self.params['drone_max_speed'])
          desired_drone_traj[:, i] = np.hstack((desired_drone_position, desired_vel))


        # Step 5 - Using MPC to control the drone to track the generated trajectory
        X = cvx.Variable((nx, N_mpc))
        U = cvx.Variable((nu, N_mpc - 1))

        cost = 0        # initializing cost function
        X_init = np.hstack((drone_position, drone_velocity))        # initial condition

        for i in range(1, N_mpc - 1):
          xk = X[:, i]
          uk = U[:, i]
          xk_ref = desired_drone_traj[:, i]
          cost += 0.5 * cvx.quad_form(xk - xk_ref, Q)
          cost += 0.5 * cvx.quad_form(uk, R)

        xn = X[:, N_mpc - 1] - subject_traj[:, N_mpc - 1]

        cost += 0.5 * cvx.quad_form(xn, Q)                  # terminal cost

        objective = cvx.Minimize(cost)

        constraints = [X[:, 0] == X_init]

        for i in range(N_mpc - 1):
          constraints += [X[2:,i+1] == X[2:,i] + U[:,i] * dt]                             # double integrator velocity at next time step
          constraints += [X[:2,i+1] == X[:2,i] + X[2:,i] * dt]                            # double integrator position at next time step
          constraints += [(U[:, i])**2 <= (self.params['drone_max_accel'])**2]            # input acceleration constraints

        prob = cvx.Problem(objective, constraints)

        result = prob.solve()

        acc_command = U.value[:, 0]

        return acc_command

In [None]:
# Run one scenario for debugging
# scenario = FigureEightCosineObstacle(
#     loop_time=25,
#     loop_radius=5.0,
#     obstacle_period=28
# )

scenario = FigureEightCosineObstacle(loop_time=30, loop_radius=4.4, obstacle_period=15)

result = run_scenario_using_planning_policy(scenario, MyPlanningPolicy)

T:  0.0
T:  0.025
T:  0.05
T:  0.07500000000000001
T:  0.1
T:  0.125
T:  0.15000000000000002
T:  0.17500000000000002
T:  0.2
T:  0.225
T:  0.25
T:  0.275
T:  0.30000000000000004
T:  0.325
T:  0.35000000000000003
T:  0.375
T:  0.4
T:  0.42500000000000004
T:  0.45
T:  0.47500000000000003
T:  0.5
T:  0.525
T:  0.55
T:  0.5750000000000001
T:  0.6000000000000001
T:  0.625
T:  0.65
T:  0.675
T:  0.7000000000000001
T:  0.7250000000000001
T:  0.75
T:  0.775
T:  0.8
T:  0.8250000000000001
T:  0.8500000000000001
T:  0.875
T:  0.9
T:  0.925
T:  0.9500000000000001
T:  0.9750000000000001
T:  1.0
T:  1.0250000000000001
T:  1.05
T:  1.075
T:  1.1
T:  1.125
T:  1.1500000000000001
T:  1.175
T:  1.2000000000000002
T:  1.225
T:  1.25
T:  1.2750000000000001
T:  1.3
T:  1.3250000000000002
T:  1.35
T:  1.375
T:  1.4000000000000001
T:  1.425
T:  1.4500000000000002
T:  1.475
T:  1.5
T:  1.5250000000000001
T:  1.55
T:  1.5750000000000002
T:  1.6
T:  1.625
T:  1.6500000000000001
T:  1.675
T:  1.7000000000000002

In [None]:
# Visualize
plot_animation(scenario, result)

# Evaluate Score

In [None]:
# Create five scenarios with small variations to prevent overfitting
scenarios = [
  FigureEightCosineObstacle(loop_time=25, loop_radius=5.0, obstacle_period=28),
  FigureEightCosineObstacle(loop_time=24, loop_radius=4.9, obstacle_period=21),
  FigureEightCosineObstacle(loop_time=28, loop_radius=5.2, obstacle_period=15),
  FigureEightCosineObstacle(loop_time=30, loop_radius=4.4, obstacle_period=15),
  FigureEightCosineObstacle(loop_time=30, loop_radius=6.0, obstacle_period=23)
]

In [None]:
# Evaluate to obtain final score
results = [run_scenario_using_planning_policy(s, MyPlanningPolicy)
           for s in scenarios]
mean_error = np.mean([r['error'] for r in results])

from IPython.display import display, HTML
display(HTML(f'<h2 style="color:blue">** Mean Error: {mean_error:6.1f} **</h2>'))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
T:  25.625
T:  25.650000000000002
T:  25.675
T:  25.700000000000003
T:  25.725
T:  25.75
T:  25.775000000000002
T:  25.8
T:  25.825000000000003
T:  25.85
T:  25.875
T:  25.900000000000002
T:  25.925
T:  25.950000000000003
T:  25.975
T:  26.0
T:  26.025000000000002
T:  26.05
T:  26.075000000000003
T:  26.1
T:  26.125
T:  26.150000000000002
T:  26.175
T:  26.200000000000003
T:  26.225
T:  26.25
T:  26.275000000000002
T:  26.3
T:  26.325000000000003
T:  26.35
T:  26.375
T:  26.400000000000002
T:  26.425
T:  26.450000000000003
T:  26.475
T:  26.5
T:  26.525000000000002
T:  26.55
T:  26.575000000000003
T:  26.6
T:  26.625
T:  26.650000000000002
T:  26.675
T:  26.700000000000003
T:  26.725
T:  26.75
T:  26.775000000000002
T:  26.8
T:  26.825000000000003
T:  26.85
T:  26.875
T:  26.900000000000002
T:  26.925
T:  26.950000000000003
T:  26.975
T:  27.0
T:  27.025000000000002
T:  27.05
T:  27.075000000000003
T:  27.1
T:  27.125
T: 

In [None]:
# Browse output
plot_animation(scenarios[0], results[0])

## Discussion <font color='red'><-- your writeup goes here</font>


My Approach -
1. First, I predict the subject and obstacle trajectories for a fixed horizon by assuming that they will both travel at a constant velocity throughout the horizon.
2. Using the predicted trajectories of the subject and the obstacle, I then generate a trajectory for the drone. This trajectory takes into account the drone's angle from the subject, the drone's range from the subject and the drone's distance from obstacles.
3. Finally, I use an MPC controller to find the optimal acceleration inputs in order for the drone to track the generated trajectory. I use the drone's acceleration limits and double integrator dynamics as constraints.

I spent around 5 - 6 hours on this task.


## Stretch Goal (optional)

In this section, extend the challenge in any interesting or creative way you like! Some possibilities:

* Create a new exciting scenario to demonstrate your planner's agility.
* Use optimization in your planning policy over a fixed horizon.
* Find the optimal "offline" solution using ground truth subject and obstacle trajectories, and see how low the error can get by commanding its results in your online policy. The globally optimal score is much lower using this information.
* Refactor the drone's state and dynamics to be more realistic by adding an orientation and providing accel and angular accel commands in the body frame. Optionally use an ODE solver to simulate the dynamics.
