# Particle Tracking

**RULES:** As usual, **`OpenCV`** is banned in this repository.


## Introduction


In [None]:
import cv2
import numpy as np
import time
import imageio
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output, HTML
import base64
from PIL import Image, ImageDraw
from skimage import io, color
from particle_filter_utils import *
from functools import partial

def _read_video_frames(path):
    """Decode every frame of the video at ``path`` into one stacked array.

    Args:
        path: Location of the video file handed to ``imageio.get_reader``.

    Returns:
        ``np.ndarray`` whose first axis indexes the frames in decoding order.
        NOTE(review): presumably each frame is (height, width, channels) —
        confirm against the input videos.
    """
    reader = imageio.get_reader(path)
    try:
        return np.array([np.array(frame) for frame in reader])
    finally:
        # Release the underlying file handle even when decoding fails midway.
        reader.close()


# Both clips are decoded fully into memory so later cells can index frames directly.
president_video = _read_video_frames("./input/pres_debate.avi")
blonde_video = _read_video_frames("./input/pedestrians.avi")


In [None]:
class DynamicModel:
    """Brownian-motion dynamics: particles drift by zero-mean Gaussian noise."""

    def __init__(self, std) -> None:
        # Standard deviation of the Gaussian jitter added to every coordinate.
        self.std = std

    def predict(self, particles):
        """Perturb ``particles`` in place with Gaussian noise of scale ``self.std``.

        Args:
            particles: Array of particle states; it is modified in place and
                nothing is returned.
        """
        jitter = np.random.normal(loc=0, scale=self.std, size=particles.shape)
        # Write the sum back into the caller's array so the update is in place.
        np.add(particles, jitter, out=particles)


class MeasurementModel:
    """Scores particles by comparing the image window around each one to a template.

    Args:
        std: Stored on the instance; not read by any method of this class.
        template: Object exposing ``model`` (the reference patch array) and the
            patch size as ``w`` / ``h``.
        measure_func: Callable ``measure_func(window, template.model)`` that
            returns a similarity score (larger means more similar).
        alpha: Blend factor used when refreshing the template; ``0.0`` keeps
            the template fixed.
        update_thres: The template is refreshed only when the best normalised
            weight exceeds ``update_thres / num_particles``.
    """

    def __init__(self, std, template, measure_func, alpha=0.0, update_thres=2) -> None:
        self.std = std
        self.template = template
        self.alpha = alpha
        self.measure_func = measure_func
        self.update_thres = update_thres

    @staticmethod
    def _crop(frame, left, top, width, height):
        """Return the part of the requested window that lies inside ``frame``."""
        # Clamp at zero: a negative slice bound would otherwise wrap around and
        # silently pick pixels from the opposite border of the image. Overshoot
        # on the right/bottom is already truncated by NumPy slicing.
        return frame[
            max(top, 0) : max(top + height, 0),
            max(left, 0) : max(left + width, 0),
        ]

    def measure(self, particles, frame):
        """Compute normalised importance weights for ``particles`` on ``frame``.

        Args:
            particles: ``(N, 2)`` array; column 0 is x, column 1 is y of each
                window centre.
            frame: Image array indexed as ``frame[row, column, ...]``.

        Returns:
            Float array of ``N`` weights summing to 1. When no particle gets a
            usable score (the scores do not add up to a positive finite value)
            uniform weights are returned and the template is left untouched.
        """
        num_particles = particles.shape[0]
        if num_particles == 0:
            return np.empty(0, dtype=np.float64)

        # Top-left corner of the template-sized window centred on each particle.
        win_height, win_width = self.template.model.shape[:2]
        start_x = (particles[:, 0] - win_width / 2).astype(int)
        start_y = (particles[:, 1] - win_height / 2).astype(int)

        # Force a float array: if every score were the integer 0 the in-place
        # normalisation below could not store its result.
        weights = np.array(
            [
                self.measure_func(
                    self._crop(frame, left, top, win_width, win_height),
                    self.template.model,
                )
                for left, top in zip(start_x, start_y)
            ],
            dtype=np.float64,
        )

        total = weights.sum()
        if not np.isfinite(total) or total <= 0:
            # Nothing informative was measured; fall back to a flat distribution
            # instead of producing NaN weights.
            return np.full(num_particles, 1.0 / num_particles)
        weights /= total

        # Only update template if very certain.
        best_index = np.argmax(weights)
        if weights[best_index] > self.update_thres / num_particles:
            self.update_template(frame, particles[best_index, :])

        return weights

    def update_template(self, frame, states):
        """Blend the window centred at ``states`` into the template.

        The template becomes ``alpha * window + (1 - alpha) * template``. The
        update is skipped when the window does not lie fully inside ``frame``
        (its shape then differs from the template's).

        Args:
            frame: Image array to crop the new appearance from.
            states: Sequence whose first two entries are the x and y centre.
        """
        w = self.template.w
        h = self.template.h
        x = int(states[0] - w / 2)
        y = int(states[1] - h / 2)
        # NOTE(review): w / h are cast for slicing in case the template stores
        # them as floats — confirm against the Template implementation.
        best_model = self._crop(frame, x, y, int(w), int(h))
        if best_model.shape == self.template.model.shape:
            self.template.model = (
                self.alpha * best_model + (1 - self.alpha) * self.template.model
            )


def mean_squared_error(window, template, mse_std):
    """Gaussian similarity derived from the squared error between two patches.

    Args:
        window: Candidate image patch.
        template: Reference patch.
        mse_std: Spread of the Gaussian that turns the error into a score.

    Returns:
        ``exp(-err / (2 * mse_std**2))`` where ``err`` is the summed squared
        difference divided by ``rows * columns`` of ``window``; ``0`` when the
        two shapes differ.
    """
    if window.shape == template.shape:
        # Subtract in float64 so unsigned inputs cannot wrap around.
        residual = np.subtract(window, template, dtype=np.float64)
        pixel_count = float(window.shape[0] * window.shape[1])
        per_pixel_error = np.sum(residual**2) / pixel_count
        return np.exp(-per_pixel_error / (2 * mse_std**2))
    return 0


def cosine_similarity(window, template):
    """Cosine similarity between two patches, rescaled to ``[0, 1]``.

    Both patches are treated as flat vectors. The cosine of the angle between
    them (in ``[-1, 1]``) is mapped to ``0.5 * (cos + 1)``.

    Args:
        window: Candidate image patch.
        template: Reference patch.

    Returns:
        The rescaled similarity, or ``0`` when the two shapes differ.

    Raises:
        AssertionError: If either patch has zero norm.
    """
    if window.shape != template.shape:
        return 0
    # Promote to float64 first: products and squares of uint8 pixels would
    # otherwise wrap around modulo 256 and corrupt the result.
    win = np.asarray(window, dtype=np.float64)
    ref = np.asarray(template, dtype=np.float64)
    dividend = np.sum(win * ref)
    divisor = np.sqrt(np.sum(win**2)) * np.sqrt(np.sum(ref**2))
    assert divisor != 0, f"divisor has 0 in it. {divisor}"
    return 0.5 * (dividend / divisor + 1)


def chi_squared(window, template, num_bins=8, std=15):
    """Histogram similarity based on the chi-squared distance.

    A normalised intensity histogram over the value range ``[0, 256)`` is built
    for each channel of both patches. The chi-squared distance between the
    histograms is then mapped to a score with a Gaussian kernel.

    Args:
        window: Candidate image patch, ``(H, W, C)`` or single-channel ``(H, W)``.
        template: Reference patch with the same shape as ``window``.
        num_bins: Number of histogram bins per channel.
        std: Spread of the Gaussian kernel; the default reproduces the
            previously hard-coded value.

    Returns:
        ``exp(-distance / (2 * std**2))``, or ``0`` when the shapes differ.
    """
    if window.shape != template.shape:
        return 0

    # Give grayscale patches an explicit channel axis so one code path serves both.
    win = np.atleast_3d(window)
    ref = np.atleast_3d(template)

    def _channel_hist(patch, channel):
        counts, _ = np.histogram(patch[:, :, channel], bins=num_bins, range=(0, 256))
        counts = counts.astype(np.float64)
        # The epsilon keeps an all-out-of-range channel from dividing by zero.
        counts /= counts.sum() + 1e-10
        return counts

    channels = range(win.shape[2])
    hist1 = np.array([_channel_hist(win, c) for c in channels])
    hist2 = np.array([_channel_hist(ref, c) for c in channels])

    # Compute the Chi-Squared distance
    chi_squared_distance = 0.5 * np.sum(
        ((hist1 - hist2) ** 2) / (hist1 + hist2 + 1e-10)
    )
    return np.exp(-chi_squared_distance / (2 * std**2))


class NaiveParticleFilter:
    """Minimal particle filter tracking an (x, y) position.

    Each ``update`` resamples the particles from the current weights, moves
    them with the dynamic model, re-weights them with the measurement model,
    and reports the highest-weight particle as the state estimate.

    Args:
        search_height: Extent of the search area along y (image rows).
        search_width: Extent of the search area along x (image columns).
        dynamic_model: Object whose ``predict(particles)`` moves the particles
            in place.
        measure_model: Object whose ``measure(particles, frame)`` returns one
            weight per particle.
        num_particles: Number of particles to maintain (at least 1).

    Raises:
        ValueError: If ``num_particles`` is smaller than 1.
    """

    def __init__(
        self,
        search_height,
        search_width,
        dynamic_model: "DynamicModel",
        measure_model: "MeasurementModel",
        num_particles=100,
    ):
        if num_particles < 1:
            raise ValueError("num_particles must be at least 1")

        self.search_height = search_height
        self.search_width = search_width
        self.num_particles = num_particles
        self.dynamic_model = dynamic_model
        self.measure_model = measure_model

        # Spread the particles uniformly over the search area (x first, then y).
        xs = np.random.uniform(0, self.search_width, self.num_particles)
        ys = np.random.uniform(0, self.search_height, self.num_particles)

        self.particles = np.column_stack((xs, ys))  # (N, 2): x, y
        self.weights = np.ones(self.num_particles) / self.num_particles  # (N, ): weight
        self.states = self.particles[0, :]

    def update(self, frame):
        """Advance the filter by one frame: resample, predict, measure, estimate."""
        self.resample_particles()
        self.dynamic_model.predict(self.particles)
        self.weights = self.measure_model.measure(self.particles, frame)
        self.update_states()

    def _sampling_distribution(self):
        """Return ``self.weights`` as a valid probability vector for resampling."""
        weights = np.asarray(self.weights, dtype=np.float64)
        total = weights.sum()
        if not np.all(np.isfinite(weights)) or np.any(weights < 0) or total <= 0:
            # Degenerate weights (NaN, negative, or all zero) carry no
            # information; sample uniformly instead of letting
            # np.random.choice raise.
            return np.full(self.num_particles, 1.0 / self.num_particles)
        return weights / total

    def resample_particles(self):
        """Draw a new particle set with replacement, proportionally to the weights.

        Afterwards the particles are clipped to the search area and the
        weights are reset to uniform.
        """
        sw, sh = self.search_width, self.search_height

        # sample new particle indices using the distribution of the weights
        j = np.random.choice(
            np.arange(self.num_particles),
            self.num_particles,
            replace=True,
            p=self._sampling_distribution(),
        )
        self.particles = np.array(self.particles[j])
        assert self.particles.shape[0] == self.num_particles
        # clip particles in case the window goes out of the image limits
        self.particles[:, 0] = np.clip(self.particles[:, 0], 0, sw - 1)
        self.particles[:, 1] = np.clip(self.particles[:, 1], 0, sh - 1)
        self.weights = np.ones(self.num_particles) / self.num_particles

    def update_states(self):
        """Set the state estimate to the particle with the largest weight."""
        best_index = np.argmax(self.weights)
        self.states = self.particles[best_index, :]

In [None]:
# patch = president_video[0, ...]
# x, y, w, h = 320.8751, 175.1776, 103.5404, 129.0504
# template = Template(patch, x, y, w, h)

# fig, ax = plt.subplots(figsize=(6, 4))  # Adjust the figure size as needed
# plt.axis("off")
# ax.imshow(template.model)
# plt.show()
# print(
#     "template.model: ",
#     template.model.shape,
#     template.model.dtype,
#     np.min(template.model),
#     np.max(template.model),
# )


# tracker = NaiveParticleFilter(
#     patch.shape[1],
#     patch.shape[0],
#     DynamicModel(std=10),
#     MeasurementModel(
#         std=10,
#         template=template,
#         measure_func=partial(mean_squared_error, mse_std=16),
#         # measure_func=partial(chi_squared, num_bins=16),
#         # measure_func=partial(cosine_similarity),
#         alpha=0.0075,
#     ),
#     num_particles=100,
# )

# for i in range(1, president_video.shape[0]):
#     start_time = time.time()

#     frame = president_video[i, ...]

#     tracker.update(frame)

#     frame = visualize_particle_filter(
#         frame, tracker.particles, tracker.states, tracker.measure_model.template
#     )

#     delay = int(25 - (time.time() - start_time))
#     if cv2.waitKey(delay) & 0xFF == ord("q"):
#         break
#     cv2.imshow("pres_debate", frame[:, :, ::-1])  # Display the resulting frame
#     # ax.imshow(frame)
#     # display(fig)q
#     # clear_output(wait=True)
#     # time.sleep(1)

# cv2.destroyAllWindows()

In [None]:
# Track a pedestrian through pedestrians.avi with the naive particle filter.
patch = blonde_video[0, ...]
# Initial target box in the first frame.
# NOTE(review): presumably (x, y) is the top-left corner and (w, h) the box
# size — confirm against the Template implementation.
x, y, w, h = 211.0000, 36.0000, 100.0000, 293.0000
template = Template(patch, x, y, w, h)

fig, ax = plt.subplots(figsize=(6, 4))  # Adjust the figure size as needed
plt.axis("off")
ax.imshow(template.model)
plt.show()
print(
    "template.model: ",
    template.model.shape,
    template.model.dtype,
    np.min(template.model),
    np.max(template.model),
)


# Frames are indexed [row, column, ...]: shape[0] is the height and shape[1]
# the width. Keyword arguments keep the two from being swapped.
tracker = NaiveParticleFilter(
    search_height=patch.shape[0],
    search_width=patch.shape[1],
    dynamic_model=DynamicModel(std=10),
    measure_model=MeasurementModel(
        std=10,
        template=template,
        measure_func=partial(mean_squared_error, mse_std=16),
        # measure_func=partial(chi_squared, num_bins=16),
        # measure_func=partial(cosine_similarity),
        alpha=0.0075,
    ),
    num_particles=100,
)

frame_period_ms = 25  # target display interval per frame
for i in range(1, blonde_video.shape[0]):
    start_time = time.time()

    frame = blonde_video[i, ...]

    tracker.update(frame)

    frame = visualize_particle_filter(
        frame, tracker.particles, tracker.states, tracker.measure_model.template
    )

    # Show the frame before waiting for a key so the window exists on the
    # first iteration; reverse the channel axis for OpenCV's BGR convention.
    cv2.imshow("pedestrians", frame[:, :, ::-1])

    # time.time() is in seconds while waitKey expects milliseconds. Never pass
    # 0, which would block until a key is pressed.
    elapsed_ms = (time.time() - start_time) * 1000.0
    delay = max(1, int(frame_period_ms - elapsed_ms))
    if cv2.waitKey(delay) & 0xFF == ord("q"):
        break
    # Notebook-inline alternative to the OpenCV window:
    # ax.imshow(frame)
    # display(fig)
    # clear_output(wait=True)
    # time.sleep(1)

cv2.destroyAllWindows()