### Particle filters with Python

Фильтры частиц (Particle filters) представляют собой широкую семью алгоритмов последовательного метода Монте-Карло (Sequential Monte Carlo, SMC) для приближённого вывода в частично наблюдаемых марковских цепях. Цель фильтра частиц — оценить апостериорную плотность вероятности переменных состояния с учётом наблюдаемых переменных. Универсальный фильтр частиц оценивает апостериорное распределение скрытых состояний, используя процесс измерения наблюдений.

Сравнение с гистограммными фильтрами и фильтрами Калмана: фильтры частиц обычно работают в непрерывном пространстве состояний, могут представлять произвольные мультимодальные распределения и, как гистограммные фильтры и фильтры Калмана, являются приближёнными методами. Основное преимущество фильтров частиц заключается в том, что их довольно просто программировать.

В текущем посте рассматривается фильтр частиц, используемый для задачи локализации в непрерывном пространстве. Пример кода на Python был представлен Себастьяном Труном в его лекции о «Фильтрах частиц» в онлайн-курсе Udacity. Здесь он объясняется подробно и дополняется инструментами визуализации. Полный код реализации фильтра частиц доступен по ссылке.

Шаги алгоритма:

   • Инициализация: Создаётся множество случайных частиц, каждая из которых представляет возможное состояние системы.

   • Предсказание: Каждая частица обновляется на основе модели движения системы (например, модель движения робота).

   • Обновление весов: Каждой частице присваивается вес на основе того, насколько хорошо она соответствует текущим наблюдениям.

   • Ресэмплинг: Частицы с низкими весами удаляются, а частицы с высокими весами копируются. Это позволяет сосредоточиться на наиболее вероятных состояниях.

   • Этот процесс повторяется при каждом новом наблюдении.

4. Результат:
   Итоговое распределение частиц даёт представление о вероятности нахождения системы в разных состояниях.

В этом примере робот живёт в мире размером 100 на 100. В мире есть 8 точек-ориентиров

In [1]:
import math
import os
import random
import sys
from copy import copy
from dataclasses import dataclass

import matplotlib.pyplot as plt

WORLD_SIZE = 100


@dataclass(frozen=True)
class Point:
    x: float = 0.
    y: float = 0.

    def __post_init__(self) -> None:
        if not 0 <= self.x < WORLD_SIZE:
            raise ValueError(f'x = {self.x} is out of bounds')
        if not 0 <= self.y < WORLD_SIZE:
            raise ValueError(f'y = {self.y} is out of bounds')


@dataclass(frozen=True)
class Noise:
    forward: float = 0.
    turn: float = 0.
    sense: float = 0.


LANDMARKS = (Point(20., 20.), Point(20., 80.),
             Point(20., 50.), Point(50., 20.),
             Point(50., 80.), Point(80., 80.),
             Point(80., 20.), Point(80., 50.))

Робот может поварачиваться, двигаться прямо после поворота, измерять дистанции до восьми ориентиров. RoboState будет моделировать состояние робота. Начальное положеие - праметр шума

In [2]:
class RobotState:

    def __init__(self, point: Point = None, angle: float = None,
                 noise: Noise = None) -> None:
        self.point = point if point else Point(random.random() * WORLD_SIZE,
                                               random.random() * WORLD_SIZE)
        self._noise = noise if noise else Noise(0., 0., 0.)

        if angle:
            if not 0 <= angle <= 2 * math.pi:
                raise ValueError(f'Angle must be within [{0.}, {2 * math.pi}, '
                                 f'the given value is {angle}]')
        self.angle = angle if angle else random.random() * 2.0 * math.pi

    @property
    def point(self) -> Point:
        return self._point

    @point.setter
    def point(self, point: Point) -> None:
        self._point = point

    @property
    def angle(self) -> float:
        return self._angle

    @angle.setter
    def angle(self, value: float) -> None:
        self._angle = float(value)

    def __str__(self) -> str:
        x, y = self.point.x, self.point.y
        return f'x = {x:.3f} y = {y:.3f} angle = {self.angle:.3f}'

    def __copy__(self) -> 'RobotState':
        return type(self)(self.point, self.angle, self._noise)

    # При вычислении расстояния до ориентиров есть шум. У шума нулевое математическое ожидание.

    def _distance(self, landmark: Point) -> float:
        x, y = self.point.x, self.point.y
        dist = (x - landmark.x) ** 2 + (y - landmark.y) ** 2
        return math.sqrt(dist)

    def sense(self) -> list[float]:
        return [self._distance(x) + random.gauss(.0, self._noise.sense)
                for x in LANDMARKS]

    # Движение робота:

    def move(self, turn: float, forward: float) -> None:
        if forward < 0.:
            raise ValueError('RobotState cannot move backwards')

        # turn, and add randomness to the turning command
        angle = self._angle + turn + random.gauss(0., self._noise.turn)
        angle %= 2 * math.pi

        # move, and add randomness to the motion command
        gain = forward + random.gauss(0., self._noise.forward)
        x = self.point.x + math.cos(angle) * gain
        y = self.point.y + math.sin(angle) * gain

        self.point = Point(x % WORLD_SIZE, y % WORLD_SIZE)
        self.angle = angle

    # Вычисление шума

    @staticmethod
    def gaussian(mu: float, sigma: float, x: float) -> float:
        var = sigma ** 2
        numerator = math.exp(-((x - mu) ** 2) / (2 * var))
        denominator = math.sqrt(2 * math.pi * var)
        return numerator / (denominator + sys.float_info.epsilon)

    # Следующая функция, которая нам понадобится, чтобы присвоить вес каждой частице в соответствии с текущим измерением. Более подробную информацию смотрите в тексте ниже. Она эффективно использует гауссову шкалу, которая измеряет, насколько далеко прогнозируемые измерения будут отличаться от фактических. Обратите внимание, что для этой функции вам следует позаботиться о шуме измерения, чтобы предотвратить деление на ноль. Такие проверки здесь пропущены, чтобы сохранить код как можно более коротким и компактным.

    def meas_probability(self, measurement: list[float]) -> float:
        prob = 1.
        for ind, landmark in enumerate(LANDMARKS):
            dist = self._distance(landmark)
            prob *= self.gaussian(dist, self._noise.sense, measurement[ind])
        return prob

# Функция ниже демонстрирует использование класса робота

def robot_playground() -> None:
    robot = RobotState(Point(30., 50.), math.pi / 2., Noise(5., .1, 5.))
    print(robot)
    print(robot.sense())

    # clockwise turn and move
    robot.move(-math.pi / 2., 15.)
    print(robot)
    print(robot.sense())

    # clockwise turn again and move
    robot.move(-math.pi / 2., 10.)
    print(robot)
    print(robot.sense())

Теперь мы создаем робота со случайной ориентацией в случайном месте в мире. Наш фильтр частиц будет поддерживать набор из 1000 случайных предположений (частиц) о том, где может находиться робот. Каждое предположение (или частица) представляет собой вектор, содержащий координаты (x,y) и направление движения, которое представляет собой угол относительно оси x. Теперь мы создаем список из 1000 частиц:

In [5]:
# Для расчёта погрешности
def evaluation(robot: RobotState, particles: list[RobotState]) -> float:
    sum_ = 0.
    x, y = robot.point.x, robot.point.y
    for particle in particles:
        dx = (particle.point.x - x + (WORLD_SIZE / 2.)) % WORLD_SIZE - (
            WORLD_SIZE / 2.)
        dy = (particle.point.y - y + (WORLD_SIZE / 2.)) % WORLD_SIZE - (
            WORLD_SIZE / 2.)
        err = math.sqrt(dx ** 2 + dy ** 2)
        sum_ += err

    return sum_ / len(particles)

# Визуализация
def visualization(_robot: RobotState, step: int, _particles: list[RobotState],
                  _particles_resampled: list[RobotState]) -> None:
    plt.figure("Robot in the world", figsize=(15., 15.))
    plt.title('Particle filter, step ' + str(step))

    # draw coordinate grid for plotting
    grid = [0, WORLD_SIZE, 0, WORLD_SIZE]
    plt.axis(grid)
    plt.grid(visible=True, which='major', color='0.75', linestyle='--')
    plt.xticks(range(0, int(WORLD_SIZE), 5))
    plt.yticks(range(0, int(WORLD_SIZE), 5))

    def draw_circle(x_: float, y_: float, face: str, edge: str,
                    alpha: float = 1.) -> None:
        circle = plt.Circle(
            (x_, y_), 1., facecolor=face, edgecolor=edge, alpha=alpha)
        plt.gca().add_patch(circle)

    def draw_arrow(x_: float, y_: float, face: str, edge: str,
                   alpha: float = 1.) -> None:
        arrow = plt.Arrow(x_, y_, 2 * math.cos(particle.angle),
                          2 * math.sin(particle.angle), facecolor=face,
                          edgecolor=edge, alpha=alpha)
        plt.gca().add_patch(arrow)

    # draw particles
    for particle in _particles:
        x, y = particle.point.x, particle.point.y
        draw_circle(x, y, '#ffb266', '#994c00', 0.5)
        draw_arrow(x, y, '#994c00', '#994c00')

    # draw resampled particles
    for particle in _particles_resampled:
        x, y = particle.point.x, particle.point.y
        draw_circle(x, y, '#66ff66', '#009900', 0.5)
        draw_arrow(x, y, '#006600', '#006600')

    # draw landmarks
    for landmark in LANDMARKS:
        draw_circle(landmark.x, landmark.y, '#cc0000', '#330000')

    # robot's location and angle
    draw_circle(_robot.point.x, _robot.point.y, '#6666ff', '#0000cc')
    draw_arrow(_robot.point.x, _robot.point.y, '#000000', '#000000', 0.5)

    # plt.savefig(os.path.join('output', 'figure_' + str(step) + '.png'))
    # plt.close()

    my_path = os.getcwd()
    plt.savefig(my_path + f'/states/figure_{str(step)}.png')
    plt.close()




# Код алгоритма
robot = RobotState()

n = 1000
particles = [RobotState(noise=Noise(0.05, 0.05, 5.)) for _ in range(n)]

# Для  каждой частицы мы симулируем движение робота. Они повернутся на 0.1 и пройдут 5 метров

steps = 50
for step in range(steps):
    robot.move(.1, 5.)
    meas = robot.sense()

    for p in particles:
        p.move(.1, 5.)

    # Чем ближе частица к текущей позиции, тем точнее будет расчёт ориентиров данной позиции. Разница между измерениями и предсказанными измерениями - Вес важности. Он обозначает, насколько важна чситаца.

    weights = [p.meas_probability(meas) for p in particles]

    # После этого мы позволим частицам выживать со случайно йвероятностью, но эта вероятность будет пропорциональна Весу важности. Процедура называется Ресэмплинг

    particles_resampled = []
    index = int(random.random() * n)
    beta = 0.

    for _ in range(n):
        beta += random.random() * 2. * max(weights)
        while beta > weights[index]:
            beta -= weights[index]
            index = (index + 1) % n
        particles_resampled.append(copy(particles[index]))

    # На каждмо шаге мы хотим увидеть погреность расчётов, используя функцию evaluation

    print(f'step {step}, evaluation: {evaluation(robot, particles):.3f}')
    # Визуализация
    visualization(robot, step, particles, particles_resampled)
    particles = particles_resampled





step 0, evaluation: 39.170
step 1, evaluation: 6.527
step 2, evaluation: 5.079
step 3, evaluation: 2.213
step 4, evaluation: 2.666
step 5, evaluation: 3.415
step 6, evaluation: 3.998
step 7, evaluation: 4.077
step 8, evaluation: 2.353
step 9, evaluation: 1.901
step 10, evaluation: 1.774
step 11, evaluation: 1.737
step 12, evaluation: 1.726
step 13, evaluation: 1.826
step 14, evaluation: 1.840
step 15, evaluation: 1.804
step 16, evaluation: 1.811
step 17, evaluation: 1.808
step 18, evaluation: 1.763
step 19, evaluation: 1.705
step 20, evaluation: 1.697
step 21, evaluation: 1.662
step 22, evaluation: 1.638
step 23, evaluation: 1.655
step 24, evaluation: 1.706
step 25, evaluation: 1.736
step 26, evaluation: 1.758
step 27, evaluation: 1.712
step 28, evaluation: 1.689
step 29, evaluation: 1.749
step 30, evaluation: 1.837
step 31, evaluation: 1.924
step 32, evaluation: 1.592
step 33, evaluation: 1.506
step 34, evaluation: 1.509
step 35, evaluation: 1.543
step 36, evaluation: 1.611
step 37, e

As we can see, particles inconsistent with the robot measurements tend to die out and only a correct set of particles survives.