# Initialisation

Imports

In [1]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
matplotlib.use("TkAgg")

# Trajectories generation
from scipy.interpolate import lagrange
from scipy.ndimage import gaussian_filter1d

# Class discrimination
from scipy.special import gammaln
from scipy.optimize import minimize

Constants

In [2]:
SIM_DURATION = 1000

WINDOW_SIZE = 20
DISTANCE = 5
BEAM_ANGLE = 0.5

RESOLUTION = 0.01
RADIUS_DECREASE_COEF = 0.5
RADIUS_INCREASE_COEF = 2
SPEED_RATIO = 2 # How quickly the sensor updates compared to the AUV

STEP_COEF = 1.25    # < (1 + RADIUS_INCREASE_COEF)/np.sqrt(5) for optimal surface coverage

MU = 0
SIGMA = 0.01

NUMBER_OF_TRAJ = 100       # Number of trajectories
MAX_DEG_X = 4  # Maximum degree for x(t)
MAX_DEG_Y = 3  # Maximum degree for y(t)

Global variables

In [None]:
beam_radius = 0

sensor_position = np.array([WINDOW_SIZE/2, WINDOW_SIZE/2])  # Initial position of beam's center
tracking_position = np.copy(sensor_position)    # Current position of beam's center
target_position = np.array([0, 0])

is_tracking = False # To decide whether the sensor is in the searching or tracking phase


# Spiral parameters
axis = 0        # 0: x-axis      ;   1: y-axis
direction = 1   # 1: increasing  ;  -1: decreasing
initial_axis = axis



# Plot initialisation
fig, ax = plt.subplots()
ax.set_xlim(0, WINDOW_SIZE)
ax.set_ylim(0, WINDOW_SIZE)
target_line, = ax.plot([], [], 'r--', alpha=0.5, label="Trajectoire cible")
tracking_line, = ax.plot([], [], 'b--', alpha=0.5, label="Trajectoire suivi")
sensor_dot, = ax.plot([], [], 'go', markersize=8, label="Capteur")
target_dot, = ax.plot([], [], 'ro', markersize=8, label="Cible")
tracking_dot, = ax.plot([], [], 'bo', markersize=8, label="Tracking")
ax.legend()
ax.set_title("Simulation Pointing, Acquisition, and Tracking (PAT)")

Tool functions

In [4]:
def calculate_radius(distance, angle):
    """Computes the beam radius at a certain distance from its source"""
    radius = distance * np.tan(angle)
    return radius


def calculate_distance(point1, point2):
    """Computes the distance between two points"""
    distance = np.linalg.norm(point1 - point2)
    return distance


def detect_target():
    """Tests whether the target is within the beam"""
    distance = calculate_distance(tracking_position, target_position)
    return distance <= beam_radius

# 1. Trajectories generation

In [5]:
def sample_z():
    """Renvoie une variable aléatoire de loi demi-cercle"""
    while True:
        z = np.random.uniform(-np.sqrt(2/np.pi), np.sqrt(2/np.pi))
        p_z = np.sqrt((2/np.pi) - z**2)
        if np.random.uniform(0, np.sqrt(2/np.pi)) <= p_z:
            return z


def generate_lagrange_polynomial(max_degree):
    """Renvoie un polynome interpolé de degré <=max_degree"""
    degree = np.random.choice(range(1, max_degree + 1))  # Randomly select the polynomial degree
    t_points = np.linspace(0, 500, degree + 1)
    values = WINDOW_SIZE * (np.pi / np.sqrt(2)) * np.array([sample_z() for _ in range(degree + 1)])
    values = (values + 2*WINDOW_SIZE) / 4
    return lagrange(t_points, values)


def add_noise_to_trajectories(trajectories, mu, sigma):
    """Ajoute un bruit gaussien à une trajectoire"""
    noisy_trajectories = []
    for trajectory in trajectories:
        t, x_t, y_t = trajectory
        noise_x = np.random.normal(mu, sigma, size=x_t.shape)
        noise_y = np.random.normal(mu, sigma, size=y_t.shape)
        x_t_noisy = x_t + noise_x
        y_t_noisy = y_t + noise_y
        noisy_trajectories.append((t, x_t_noisy, y_t_noisy))
    return noisy_trajectories


def generate_trajectories(number_of_trajectories, max_deg_x, max_deg_y):
    """Génère un polynôme pour x et un pour y et renvoie x(t) et y(t)"""
    trajectories = []
    for _ in range(number_of_trajectories):
        poly_x = generate_lagrange_polynomial(max_deg_x)
        poly_y = generate_lagrange_polynomial(max_deg_y)
        t = np.linspace(0, 500, SIM_DURATION)
        x_t = poly_x(t)
        y_t = poly_y(t)
        trajectories.append((t, x_t, y_t))
    return trajectories


def plot_trajectories(trajectories):
    """Affiche la trajectoire"""
    n = len(trajectories)
    rows = int(np.ceil(np.sqrt(n)))
    cols = int(np.ceil(n / rows))
    
    fig, axes = plt.subplots(rows, cols, figsize=(cols * 5, rows * 5))
    axes = np.array(axes).flatten()
    
    for i, (t, x_t, y_t) in enumerate(trajectories):
        ax = axes[i]
        ax.plot(x_t, y_t)
        ax.set_xlabel("x(t)")
        ax.set_ylabel("y(t)")
        ax.set_title(f"Curve {i+1}")
        ax.grid()
    
    for j in range(n, len(axes)):
        fig.delaxes(axes[j])
    
    plt.tight_layout()
    plt.show()

# 2. PAT

A. Generation of a trajectory using part 1.

In [None]:
trajectory = add_noise_to_trajectories(generate_trajectories(1, MAX_DEG_X, MAX_DEG_Y), 0, 0.01)

x0 = trajectory[0][1][0]
y0 = trajectory[0][2][0]
target_position = np.array([x0, y0])


target_history = [target_position.copy()]
tracking_history = [tracking_position.copy()]

B. Movement of AUV and sensor

In [7]:
def initialise_spiral_parameters(ax, dir):
    """Initialises parameters to start a spiral movement"""
    global initial_axis, axis, direction, side_length, current_length, spiral_count
    initial_axis = ax
    axis = ax
    direction = dir
    side_length = 1
    current_length = 0
    spiral_count = -1


def move_sensor():
    """Moves the sensor to its next position along the spiral"""
    global tracking_position, axis, direction, side_length, current_length, spiral_count
    point = tracking_position.copy()
    step = STEP_COEF * beam_radius * 0.5
    
    if current_length == side_length - 1:
        spiral_count += 1

    if current_length < side_length: # if the spiral side is not finished, the sensor keeps following it
        current_length += 1
    else: # if the sensor reaches a spiral angle, it turns
        if axis != initial_axis:
            direction *= -1
            side_length += 1
        axis = 1 - axis
        current_length = 1

    point[axis] += direction * step
    tracking_position = np.clip(point.copy(), 0, WINDOW_SIZE)


def move_AUV(frame, trajectory):
    """Updates the AUV's position along its trajectory"""
    global target_position
    new_x = trajectory[0][1][frame]
    new_y = trajectory[0][2][frame]
    target_position = np.array([new_x, new_y])
    target_position = np.clip(target_position, 0, WINDOW_SIZE)

    # Free history to erase the beginning of the trajectory displayed
    if len(target_history) > 50:
        target_history.pop(0)
    target_history.append(target_position.copy())


C. PAT functions

In [8]:
def search_target():
    """Updates the beam's position during the searching phase"""
    global tracking_position, is_tracking, beam_radius, spiral_count, current_length, side_length, axis
    
    if len(tracking_history) > 50:
        tracking_history.pop(0)
    tracking_history.append(tracking_position.copy())
    
    if spiral_count == 5:
        current_length -= 1
        move_sensor()
        if len(tracking_history) > 50:
            tracking_history.pop(0)
        tracking_history.append(tracking_position.copy())
        side_length -= 2
        beam_radius *= RADIUS_INCREASE_COEF
        spiral_count = 0
    
    if detect_target():
        spiral_count = 0
        if beam_radius <= calculate_radius(DISTANCE, RESOLUTION):
            is_tracking = True
            print("found")
        else:
            beam_radius *= RADIUS_DECREASE_COEF
            initialise_spiral_parameters(axis, direction)
    else:
        move_sensor()
    

def check_neighbourhood():
    """Looks for the AUV in the neighbourhood of the current position, and returns the position closest to it"""
    center = tracking_position
    closest_point = center.copy()
    min_dist = calculate_distance(closest_point, target_position)

    for dx in [-1, 0, 1]:
        for dy in [-1, 0, 1]:
            center = tracking_position.copy()
            directional_vector = np.array([dx, dy])
            temp_position = center + beam_radius * directional_vector
            distance = calculate_distance(temp_position, target_position)
            if distance < min_dist:
                closest_point = temp_position.copy()
                min_dist = distance
    return closest_point


def track_target():
    """Updates the beam's position during the tracking phase"""
    global tracking_position, beam_radius
    
    if len(tracking_history) > 50:
        tracking_history.pop(0)
    tracking_history.append(tracking_position.copy())
    
    if not detect_target():
        beam_radius *= RADIUS_INCREASE_COEF
    else:
        if beam_radius > calculate_radius(DISTANCE, RESOLUTION):
            beam_radius *= RADIUS_DECREASE_COEF
        tracking_position = check_neighbourhood()


def update(frame):
    """Runs on every frame of the animation to compute the frame's state and update what is displayed"""
    global tracking_position, target_position
    
    if frame % SPEED_RATIO == 0:  # to adapt the AUV speed to the sensor's
        move_AUV(frame//SPEED_RATIO, trajectory)

    if not is_tracking:
        search_target()           
    else :
        track_target() 


    # Updates the animation's plot

    target_x, target_y = zip(*target_history)
    tracking_x, tracking_y = zip(*tracking_history)

    target_line.set_data(target_x, target_y)
    tracking_line.set_data(tracking_x, tracking_y)
    target_dot.set_data([target_position[0]], [target_position[1]])
    sensor_dot.set_data([sensor_position[0]], [sensor_position[1]])
    tracking_dot.set_data([tracking_position[0]], [tracking_position[1]])

    return target_line, target_dot, sensor_dot, tracking_line, tracking_dot,

D. Execution

In [9]:

beam_radius = calculate_radius(DISTANCE, BEAM_ANGLE)
initialise_spiral_parameters(axis, direction)

ani = animation.FuncAnimation(fig, update, frames=SIM_DURATION, interval=30, blit=True)
plt.show()

# 3. Taking water diffusion into consideration: class-discrimination algorithms


A. Generation of class discrimination histograms and COR curve

In [10]:
def log_likelihood(mean, alpha, yi):
    """Computes the log-likelihood"""
    P = len(yi)
    return (P * alpha * np.log(alpha / mean)
        - P * gammaln(alpha)
        + (alpha - 1) * np.sum(np.log(yi))
        - (alpha / mean) * np.sum(yi))


def max_log_likelihood(alpha, yi):
    """Returns the mean that maximises the log-likelihood"""
    result = minimize(lambda m: -log_likelihood(m, alpha, yi), [1.0], method='BFGS')
    return result.x[0]


def generate_data(alpha, P, n, m0, r):
    """Generates data necessary for plotting class discrimination histograms"""
    m1 = m0 * (1 + r)
    y0 = np.random.gamma(alpha, m0 / alpha, P * n)
    y1 = np.random.gamma(alpha, m1 / alpha, P * n)
    l0_0 = np.array([log_likelihood(m0, alpha, y0[i * P : (i + 1) * P]) for i in range(n)])
    l1_0 = np.array([log_likelihood(m1, alpha, y0[i * P : (i + 1) * P]) for i in range(n)])
    l0_1 = np.array([log_likelihood(m0, alpha, y1[i * P : (i + 1) * P]) for i in range(n)])
    l1_1 = np.array([log_likelihood(m1, alpha, y1[i * P : (i + 1) * P]) for i in range(n)])
    return l0_0, l1_0, l0_1, l1_1


def plot_histograms(l0, l1):
    """Plots the class discrimination histograms"""
    diff_0 = l1[0] - l0[0]
    diff_1 = l1[1] - l0[1]
    plt.figure(figsize=(8, 6))
    plt.hist([diff_0, diff_1], bins=50, density=True, color=['blue', 'red'], alpha=0.7,
             label=["$l_{1,0} - l_{0,0}$", "$l_{1,1} - l_{0,1}$"])
    plt.axvline(0, color='black', linestyle='--', linewidth=1, label='Zero')
    plt.xlabel("Value")
    plt.ylabel("Frequency")
    plt.grid()
    plt.legend()
    plt.show()


def plot_roc(diff_0, diff_1, num_thresholds=1000):
    """Plots the Reiciever Operating Caracteristic curve"""
    thresholds = np.linspace(min(diff_0.min(), diff_1.min()), max(diff_0.max(), diff_1.max()), num_thresholds)
    p_fa = [(diff_0 >= t).mean() for t in thresholds]
    p_d = [(diff_1 >= t).mean() for t in thresholds]
    plt.figure(figsize=(8, 6))
    plt.plot(p_fa, p_d, color='blue', label='ROC Curve')
    plt.xscale('log')
    plt.xlabel(r'$ P_{fa} $')
    plt.ylim([0, 1.05])
    plt.xlim([1e-5, 1])
    plt.ylabel(r'$ P_{de} $')
    plt.grid()
    plt.legend()
    plt.show()

B. Code execution

In [None]:
# Generate data and plot results
l0_0, l1_0, l0_1, l1_1 = generate_data(2, 10, 10000, 5, 0.5)
plot_histograms([l0_0, l0_1], [l1_0, l1_1])
plot_roc(l1_0 - l0_0, l1_1 - l0_1)

# 4. Kalman filter

A. Trajectories filtering using Kalman filter

In [12]:
def kalman_filter_trajectories(trajectories, meas_var_x, meas_var_y, process_var_x, process_var_y):
    """Uses Kalman filter to filter trajectories"""
    filtered_trajectories = []
    dt = 0.4  # Time interval between two successive measurements
    A = np.array([[1, dt], [0, 1]])  # States matrix
    H = np.array([[1, 0]])  # Observation matrix

    for t, x_noisy, y_noisy in trajectories:
        x = np.array([[x_noisy[0]], [0]])
        y = np.array([[y_noisy[0]], [0]])
        P = np.eye(2)
        Q_x = np.array([[process_var_x, 0], [0, process_var_x]])
        Q_y = np.array([[process_var_y, 0], [0, process_var_y]])
        R_x = meas_var_x
        R_y = meas_var_y

        x_filtered, y_filtered = [], []

        for i in range(len(t)):
            x = A @ x
            P = A @ P @ A.T + Q_x

            y = A @ y
            P = A @ P @ A.T + Q_y

            K_x = P @ H.T / (H @ P @ H.T + R_x)
            x = x + K_x * (x_noisy[i] - H @ x)
            P = (np.eye(2) - K_x @ H) @ P
            x_filtered.append(x[0, 0])

            K_y = P @ H.T / (H @ P @ H.T + R_y)
            y = y + K_y * (y_noisy[i] - H @ y)
            P = (np.eye(2) - K_y @ H) @ P
            y_filtered.append(y[0, 0])

        filtered_trajectories.append((t, np.array(x_filtered), np.array(y_filtered)))

    return filtered_trajectories

B. Optimisation of parameters

In [13]:
def estimate_noise_variances(trajectories, smooth_sigma):
    """Estimates noise parameters on noisy trajectories"""
    meas_vars_x, meas_vars_y = [], []
    process_vars_x, process_vars_y = [], []
    for t, x_t_noisy, y_t_noisy in trajectories:
        x_smooth = gaussian_filter1d(x_t_noisy, sigma=smooth_sigma)
        y_smooth = gaussian_filter1d(y_t_noisy, sigma=smooth_sigma)

        meas_vars_x.append(np.var(x_t_noisy - x_smooth))
        meas_vars_y.append(np.var(y_t_noisy - y_smooth))

        vx = np.gradient(x_smooth, t)
        vy = np.gradient(y_smooth, t)
        ax = np.gradient(vx, t)
        ay = np.gradient(vy, t)

        process_vars_x.append(np.var(ax))
        process_vars_y.append(np.var(ay))

    meas_var_x = np.mean(meas_vars_x)
    meas_var_y = np.mean(meas_vars_y)
    process_var_x = np.mean(process_vars_x)
    process_var_y = np.mean(process_vars_y)

    return meas_var_x, meas_var_y, process_var_x, process_var_y


def calculate_trajectory_error(trajectories, estimates):
    """Computes the total quadratic error between real and estimated trajectories"""    
    total_error = 0
    for (x_true, y_true), (x_est, y_est) in zip(trajectories, estimates):
        error = np.sum((x_true - x_est)**2 + (y_true - y_est)**2)
        total_error += error
    return total_error


def compute_error_for_sigma(trajectories_true, trajectories_noisy, sigma_smooth):
    """Computes error for a particular smoothing sigma_smooth"""
    meas_var_x, meas_var_y, process_var_x, process_var_y = estimate_noise_variances(trajectories_noisy, sigma_smooth)
    filtered_trajectories = kalman_filter_trajectories(trajectories_noisy, meas_var_x, meas_var_y, process_var_x, process_var_y)
    error = calculate_trajectory_error(trajectories_true, filtered_trajectories)
    return error


def grid_search(trajectories_true, trajectories_noisy, sigma_values):
    """Searches for the best sigma_smooth to minimise the error"""
    best_sigma = None
    best_error = float('inf')
    step = 0
    for sigma in sigma_values:
        step += 1
        print(step)
        error = compute_error_for_sigma(trajectories_true, trajectories_noisy, sigma)
        if error < best_error:
            best_error = error
            best_sigma = sigma
    return best_sigma, best_error

C. Application

In [None]:
# Generate real trajectories
real_trajectories = generate_trajectories(NUMBER_OF_TRAJ, MAX_DEG_X, MAX_DEG_Y)

# Add noise to trajectories
trajectories_noisy = add_noise_to_trajectories(real_trajectories, MU, SIGMA)

# Use grid search to optimise sigma
sigma_values = np.linspace(8, 12, 100)
best_sigma, best_error = grid_search(real_trajectories, trajectories_noisy, sigma_values)

# Estimate noise parameters using the best sigma
meas_var_x, meas_var_y, process_var_x, process_var_y = estimate_noise_variances(trajectories_noisy, best_sigma)

# Filter trajectories using Kalman filter
filtered_trajectories = kalman_filter_trajectories(trajectories_noisy, meas_var_x, meas_var_y, process_var_x, process_var_y)

# Display trajectories
t_true, x_true, y_true = real_trajectories[0]
t_noisy, x_noisy, y_noisy = trajectories_noisy[0]
t_filtered, x_filtered, y_filtered = filtered_trajectories[0]
plt.figure(figsize=(10, 6))
plt.plot(x_true, y_true, label='Trajectoire vraie (y)', color='g')
plt.scatter(x_noisy, y_noisy, label='Trajectoire bruitée (y)', color='r', s=5)
plt.plot(x_filtered, y_filtered, label='Trajectoire filtrée (y)', color='b')
plt.legend()
plt.title(f"Trajectoire")
plt.show()

# Complements: previous way of moving the AUV using differential equations

In [None]:
# target_position = np.array([np.random.uniform(WINDOW_SIZE/4, 3*WINDOW_SIZE/4), np.random.uniform(WINDOW_SIZE/4, 3*WINDOW_SIZE/4)])
# target_speed = np.array([np.random.uniform(-0.1, 0.1), np.random.uniform(-0.1, 0.1)])

# DT = 1
# NOISE = 0.1
# 
# def move_AUV():
#     """Met à jour les position et vitesse de la cible"""
#     global target_position
#     global target_speed

#     x, y = target_position
#     vx, vy = target_speed

#     new_x = x + vx*DT + random.gauss(0, NOISE*vx*DT)
#     new_y = y + vy*DT + random.gauss(0, NOISE*vy*DT)
#     new_vx = (new_x - x) / DT
#     new_vy = (new_y - y) / DT
    
#     target_position = np.array([new_x, new_y])
#     target_speed = np.array([new_vx, new_vy])
#     target_position = np.clip(target_position, 0, WINDOW_SIZE)
#     target_speed = np.clip(target_speed, 0, WINDOW_SIZE)

#     # Limiter la taille de l'historique pour éviter une surcharge mémoire
#     if len(target_history) > 50:
#         target_history.pop(0)
#     target_history.append(target_position.copy())
