# Семинар 9 - Методы построения оптического потока по последовательности изображений

**Этот семинар содержит оцениваемое домашнее задание**

***

Источник - https://habr.com/ru/post/201406/

$\textbf{Task statement}$: Оптический поток (ОП) – изображение видимого движения, представляющее собой сдвиг каждой точки (пикселя) между двумя изображениями.

По сути, он представляет собой поле скоростей. Суть ОП в том, что для каждой точки изображения $I_{t_0} (\vec{r})$ находится такой вектор сдвига $\delta \vec{r}$, чтобы было соответсвие между исходной точкой и точкой на следущем фрейме $I_{t_1} (\vec{r} + \delta \vec{r})$. В качестве метрики соответвия берут близость интенсивности пикселей, беря во внимание маленькую разницу по времени между кадрами: $\delta{t} = t_{1} - t_{0}$. В более точных методах точку можно привязывать к объекту на основе, например, выделения ключевых точек, а также считать градиенты вокруг точки, лапласианы и проч.

$\textbf{For what}$: Определение собственной скорости, Определение локализации, Улучшение методов трекинга объектов, сегментации, Детектирование событий, Сжатие видеопотока и проч.

![](data/tennis.png)

Разделяют 2 вида оптического потока - плотный (dense) [Farneback method, neural nets], работающий с целым изображением, и выборочный (sparse) [Lucas-Kanade method], работающий с ключевыми точками

In [None]:
# !wget https://www.bogotobogo.com/python/OpenCV_Python/images/mean_shift_tracking/slow_traffic_small.mp4 -O data/slow_traffic_small.mp4

In [None]:
import cv2
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import IPython

%matplotlib inline

## Lucas-Kanade (sparse)

Пусть $I_{1} = I(x, y, t_{1})$ интенсивность в некоторой точке (x, y) на первом изображении (т. е. в момент времени t). На втором изображении эта точка сдвинулась на (dx, dy), при этом прошло время dt, тогда $I_{2} = I(x + dx, y + dx, t_{1} + dt) \approx I_{1} + I_{x}dx + I_{y}dy +  I_{t}dt$. Из постановки задачи следует, что интенсивность пикселя не изменилась, тогда $I_{1} = I_{2}$. Далее определяем $dx, dy$.

Самое простое решение проблемы – алгоритм Лукаса-Канаде. У нас же на изображении объекты размером больше 1 пикселя, значит, скорее всего, в окрестности текущей точки у других точек будут примерно такие же сдвиги. Поэтому мы возьмем окно вокруг этой точки и минимизируем (по МНК) в нем суммарную погрешность с весовыми коэффициентами, распределенными по Гауссу, то есть так, чтобы наибольший вес имели пиксели, ближе всего находящиеся к исследуемому.

**Полезные материалы:** 
- цикл видео-лекций от First Principles of Computer Vision, посвященный Optical Flow и алгоритму Lucas-Kanade: https://youtube.com/playlist?list=PL2zRqk16wsdoYzrWStffqBAoUY8XdvatV

### Вопрос 1

Перечислите три основных предположения, на которых базируется метод Lucas-Kanade. Почему каждое из них важно для корректной работы алгоритма?

**Ответ:**


### Вопрос 2

Объясните, зачем нужен пирамидальный подход в алгоритме Lucas-Kanade. Какую проблему он решает и как именно?

**Ответ:**

### Вопрос 3

С какими проблемами может столкнуться алгоритм Lucas-Kanade при отслеживании точек на видео? Назовите минимум три ограничения.

**Ответ:**


### Задание 1

Напишите реализацию Лукаса-Канаде c помощью numpy и cv2. Сравните с реализацией `cv2.calcOpticalFlowPyrLK`.

In [None]:
def build_image_pyramid(image, num_levels, scale_factor=0.5):
    """
    Создаёт пирамиду изображений с уменьшающимся разрешением.

    Аргументы:
        image: Исходное изображение (одноканальное, grayscale)
        num_levels: Количество уровней пирамиды
        scale_factor: Коэффициент масштабирования между соседними уровнями

    Возвращает:
        Список изображений [image_level_0, image_level_1, ..., image_level_n-1], где:
        - image_level_0 - исходное изображение
        - Каждый следующий уровень уменьшен относительно предыдущего в scale_factor раз

    Примечание:
        - Используйте cv2.resize с интерполяцией cv2.INTER_LINEAR для уменьшения размера
        - Первым элементом списка должна быть копия исходного изображения
    """
    pass


def compute_image_gradients(image):
    """
    Вычисляет пространственные градиенты изображения.

    Аргументы:
        image: Входное изображение (одноканальное, grayscale)

    Возвращает:
        Кортеж (Ix, Iy), где Ix и Iy - градиенты по x и y соответственно

    Примечание:
        - Используйте фильтр Собеля (cv2.Sobel) с ksize=3
        - Используйте тип данных cv2.CV_64F для более точных вычислений
    """
    pass


def compute_lk_optical_flow_point(Ix, Iy, It, window_size=5):
    """
    Вычисляет оптический поток по методу Lucas-Kanade для одного окна.

    Аргументы:
        Ix: Градиент изображения по x
        Iy: Градиент изображения по y
        It: Временной градиент (разница между кадрами)
        window_size: Размер окна для вычисления (нечетное число)

    Возвращает:
        Кортеж (u, v) компонентов вектора потока или (None, None) если решение ненадежное

    Примечание:
        - Создайте окно для градиентов, выбрав центральный пиксель и окно размером window_size x window_size
        - Вычислите сумму произведений градиентов для формирования матрицы A:
          A = [[sum(Ix*Ix), sum(Ix*Iy)], [sum(Ix*Iy), sum(Iy*Iy)]]
        - Проверьте обусловленность матрицы A через собственные значения
        - Если минимальное собственное значение меньше порога (например, 1e-4), верните (None, None)
        - Сформируйте вектор b: [-sum(Ix*It), -sum(Iy*It)]
        - Решите систему уравнений A * [u, v] = b
        - Обработайте возможное исключение np.linalg.LinAlgError
    """
    pass


def compute_lk_optical_flow_for_patch(prev_patch, curr_patch, window_size=5):
    """
    Вычисляет оптический поток для патча изображения.

    Аргументы:
        prev_patch: Патч из предыдущего кадра
        curr_patch: Соответствующий патч из текущего кадра
        window_size: Размер окна для LK

    Возвращает:
        Кортеж (u, v) компонентов вектора потока для центра патча

    Примечание:
        - Вычислите пространственные градиенты prev_patch с помощью compute_image_gradients
        - Вычислите временной градиент как разность патчей: It = curr_patch - prev_patch
        - Используйте функцию compute_lk_optical_flow_point для вычисления вектора потока
    """
    pass


def track_point_with_pyramid_lk(prev_pyramid, curr_pyramid, point, window_size=15, max_iterations=10, epsilon=0.01):
    """
    Отслеживает точку между кадрами с использованием пирамидального LK.

    Аргументы:
        prev_pyramid: Пирамида предыдущего кадра (список изображений)
        curr_pyramid: Пирамида текущего кадра (список изображений)
        point: Координаты отслеживаемой точки (x, y) на исходном изображении
        window_size: Размер окна для LK
        max_iterations: Максимальное количество итераций для уточнения каждого уровня
        epsilon: Порог для остановки итераций

    Возвращает:
        Кортеж (new_x, new_y) - новые координаты точки на текущем кадре
        или None если отслеживание неуспешно

    Примечание:
        - Начните обработку с верхнего уровня пирамиды (самое маленькое изображение)
        - Масштабируйте исходную точку для соответствия размеру изображения верхнего уровня
        - Для каждого уровня пирамиды (от верхнего к нижнему):
            1. Масштабируйте общее смещение в 2 раза при переходе на уровень ниже
            2. Пересчитайте позицию точки с учетом масштаба текущего уровня
            3. Примените итеративное уточнение позиции с помощью LK:
                a. Проверьте, что точка и окно вокруг неё находятся в границах изображения
                b. Извлеките патчи из предыдущего и текущего кадров
                c. Вычислите смещение с помощью compute_lk_optical_flow_for_patch
                d. Обновите позицию точки
                e. Остановите итерации, если смещение меньше epsilon
            4. Обновите общее смещение
        - Вычислите финальную позицию точки на исходном изображении
    """
    pass


def lucas_kanade_optical_flow(prev_frame, curr_frame, points,
                             window_size=15, num_pyramid_levels=3,
                             max_iterations=10, epsilon=0.01):
    """
    Вычисляет разреженный оптический поток методом Лукаса-Канаде.

    Аргументы:
        prev_frame: Предыдущий кадр (может быть цветным)
        curr_frame: Текущий кадр (может быть цветным)
        points: Список точек для отслеживания в формате [[x1, y1], [x2, y2], ...]
        window_size: Размер окна для LK
        num_pyramid_levels: Количество уровней в пирамиде изображений
        max_iterations: Максимальное количество итераций на каждом уровне
        epsilon: Порог сходимости для итераций

    Возвращает:
        Кортеж (new_points, status), где:
        - new_points: Массив новых позиций точек в формате [[x1, y1], [x2, y2], ...]
        - status: Массив статусов отслеживания (1 - успешно, 0 - неуспешно)

    Примечание:
        - Преобразуйте входные кадры в полутоновые, если они цветные
        - Нормализуйте изображения к диапазону [0, 1]
        - Создайте пирамиды изображений для обоих кадров
        - Для каждой точки из списка:
            1. Отследите её с помощью track_point_with_pyramid_lk
            2. Сохраните результат в new_points и отметьте статус в status
        - Если отслеживание точки не удалось, установите статус 0 и сохраните исходную точку
    """
    pass


def demo_optical_flow(video_path='data/slow_traffic_small.mp4', output_path='output_my_LK.mp4'):
    """
    Демонстрация работы алгоритма на видео.

    Args:
        video_path: Путь к входному видео
        output_path: Путь для сохранения результата
    """
    # Открываем видео
    cap = cv2.VideoCapture(video_path)

    # Получаем параметры видео
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Настраиваем запись выходного видео
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Параметры для обнаружения углов Shi-Tomasi
    feature_params = dict(
        maxCorners=100,
        qualityLevel=0.3,
        minDistance=7,
        blockSize=7
    )

    # Берем первый кадр и находим в нем углы
    ret, old_frame = cap.read()
    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
    p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)
    p0 = p0.reshape(-1, 2)  # Преобразуем в формат [[x1, y1], [x2, y2], ...]

    # Сохраняем изначальные точки для отслеживания через все видео
    initial_points = p0.copy()

    # Создаем маску для рисования
    mask = np.zeros_like(old_frame)

    # Создаем случайные цвета для визуализации
    color = np.random.randint(0, 255, (len(p0), 3))

    from tqdm import tqdm
    for i in tqdm(range(length - 1)):  # -1 потому что первый кадр мы уже прочитали
        ret, frame = cap.read()

        if not ret:
            print('No frames grabbed!')
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Вычисляем оптический поток с помощью нашей реализации
        p1, st = lucas_kanade_optical_flow(
            old_gray,
            frame_gray,
            p0,
            window_size=15,
            num_pyramid_levels=3
        )

        # Выбираем хорошие точки
        good_new = p1[st == 1]
        good_old = p0[st == 1]

        # Рисуем треки
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            a, b = new
            c, d = old
            mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i % len(color)].tolist(), 2)
            frame = cv2.circle(frame, (int(a), int(b)), 5, color[i % len(color)].tolist(), -1)

        # Объединяем кадр и маску
        img = cv2.add(frame, mask)

        # Записываем результат
        out.write(img)

        # Обновляем предыдущий кадр
        old_gray = frame_gray.copy()

        # Обновляем точки, но только те, которые успешно отслежены
        p0[st == 1] = good_new

    # Освобождаем ресурсы
    cap.release()
    out.release()

    print(f"Результат сохранен в {output_path}")
    return output_path

In [None]:
result_path = demo_optical_flow(video_path='data/slow_traffic_small.mp4', output_path='output_my_LK.mp4')

### Релизация OpenCV - cv2.calcOpticalFlowPyrLK

In [None]:
def demo_optical_flow_opencv(video_path='data/slow_traffic_small.mp4', output_path='output_my_LK.mp4'):
    """
    Демонстрация работы алгоритма на видео с использованием cv2.calcOpticalFlowPyrLK.

    Args:
        video_path: Путь к входному видео
        output_path: Путь для сохранения результата
    """
    # Открываем видео
    cap = cv2.VideoCapture(video_path)

    # Получаем параметры видео
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Настраиваем запись выходного видео
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Параметры для обнаружения углов Shi-Tomasi
    feature_params = dict(
        maxCorners=100,
        qualityLevel=0.3,
        minDistance=7,
        blockSize=7
    )

    # Параметры для Lucas-Kanade оптического потока
    lk_params = dict(
        winSize=(15, 15),
        maxLevel=3,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    )

    # Берем первый кадр и находим в нем углы
    ret, old_frame = cap.read()
    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
    p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

    # Создаем маску для рисования
    mask = np.zeros_like(old_frame)

    # Создаем случайные цвета для визуализации
    color = np.random.randint(0, 255, (100, 3))

    from tqdm import tqdm
    for i in tqdm(range(length - 1)):  # -1 потому что первый кадр мы уже прочитали
        ret, frame = cap.read()

        if not ret:
            print('No frames grabbed!')
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Вычисляем оптический поток с помощью встроенной функции cv2.calcOpticalFlowPyrLK
        p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        # Выбираем хорошие точки
        if p1 is not None:
            good_new = p1[st == 1]
            good_old = p0[st == 1]

        # Рисуем треки
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            a, b = new.ravel()
            c, d = old.ravel()
            mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i % len(color)].tolist(), 2)
            frame = cv2.circle(frame, (int(a), int(b)), 5, color[i % len(color)].tolist(), -1)

        # Объединяем кадр и маску
        img = cv2.add(frame, mask)

        # Записываем результат
        out.write(img)

        # Обновляем предыдущий кадр
        old_gray = frame_gray.copy()

        # Обновляем точки, но только те, которые успешно отслежены
        p0 = good_new.reshape(-1, 1, 2)

    # Освобождаем ресурсы
    cap.release()
    out.release()

    print(f"Результат сохранен в {output_path}")
    return output_path

In [None]:
result_path = demo_optical_flow_opencv(video_path='data/slow_traffic_small.mp4', output_path='output_opencv_LK.mp4')

### Задание 2

В базовой реализации у кода есть одна важная проблема - ключевые точки инициализируются единожды. В реальных задачах необходимо отслеживать точки, которые исчезают из кадра и появляются в других местах. Реализуйте механизм, который будет отслеживать точки, которые пропадают из кадра и добавлять новые точки в те места, где они появляются. Для этого вам нужно будет реализовать механизм поиска новых точек на изображении.

In [None]:
# your code

### Вопрос 4

В чем основное отличие разреженного (sparse) оптического потока Lucas-Kanade от плотного (dense) оптического потока (например, метода Farneback)?

**Ответ:**


## Farneback (dense)

Метод Farneback носит несколько более глобальный характер, чем метод Лукаса-Канаде. Он опирается на предположение о том, что на всем изображении оптический поток будет достаточно гладким.

# Вопрос 5

Перечислите основные шаги алгоритма Farneback для расчета оптического потока.

**Ответ:**


### Вопрос 6

Каким образом в методе Farneback обрабатываются большие смещения объектов между кадрами?

**Ответ:**


In [None]:
def demo_optical_flow_farneback_opencv(video_path='data/slow_traffic_small.mp4', output_path='output_Farneback.mp4'):
    """
    Демонстрация работы алгоритма плотного оптического потока Farneback на видео.

    Args:
        video_path: Путь к входному видео
        output_path: Путь для сохранения результата
    """
    # Открываем видео
    cap = cv2.VideoCapture(video_path)

    # Получаем параметры видео
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Настраиваем запись выходного видео
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Берем первый кадр и преобразуем его в оттенки серого
    ret, frame1 = cap.read()
    if not ret:
        print('Не удалось прочитать видео')
        return None

    prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

    # Создаем HSV-изображение для визуализации потока
    hsv = np.zeros_like(frame1)
    hsv[..., 1] = 255  # Насыщенность устанавливаем на максимум

    from tqdm import tqdm
    for i in tqdm(range(length - 1)):  # -1 потому что первый кадр мы уже прочитали
        ret, frame2 = cap.read()

        if not ret:
            print('No frames grabbed!')
            break

        next_frame = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

        # Вычисляем оптический поток методом Farneback
        # Параметры:
        # - 0.5: коэффициент масштабирования для пирамиды изображений
        # - 3: кол-во уровней пирамиды
        # - 15: размер окна для усреднения
        # - 3: число итераций на каждом уровне пирамиды
        # - 5: размер окна для полиномиальной аппроксимации
        # - 1.2: стандартное отклонение для сглаживания
        flow = cv2.calcOpticalFlowFarneback(
            prvs, next_frame, None,
            0.5, 3, 15, 3, 5, 1.2, 0
        )

        # Преобразуем векторы потока из декартовых координат в полярные
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

        # Кодируем направление потока как оттенок (hue)
        hsv[..., 0] = ang * 180 / np.pi / 2

        # Кодируем величину потока как яркость (value)
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

        # Преобразуем HSV в BGR для отображения
        bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

        # Записываем результат
        out.write(bgr)

        # Обновляем предыдущий кадр
        prvs = next_frame

    # Освобождаем ресурсы
    cap.release()
    out.release()

    print(f"Результат сохранен в {output_path}")
    return output_path

In [None]:
result_path = demo_optical_flow_farneback_opencv(video_path='data/slow_traffic_small.mp4', output_path='output_opencv_farneback.mp4')

### Вопрос 7

Как влияет предварительная обработка изображений (фильтрация шума, выравнивание гистограмм) на качество оптического потока, получаемого методом Farneback? Предложите оптимальный пайплайн предобработки.

**Ответ:**
