# Семинар 9 - Методы построения оптического потока по последовательности изображений

***

Источник - https://habr.com/ru/post/201406/

$\textbf{Task statement}$: Оптический поток (ОП) – изображение видимого движения, представляющее собой сдвиг каждой точки (пикселя) между двумя изображениями.

По сути, он представляет собой поле скоростей. Суть ОП в том, что для каждой точки изображения $I_{t_0} (\vec{r})$ находится такой вектор сдвига $\delta \vec{r}$, чтобы было соответсвие между исходной точкой и точкой на следущем фрейме $I_{t_1} (\vec{r} + \delta \vec{r})$. В качестве метрики соответвия берут близость интенсивности пикселей, беря во внимание маленькую разницу по времени между кадрами: $\delta{t} = t_{1} - t_{0}$. В более точных методах точку можно привязывать к объекту на основе, например, выделения ключевых точек, а также считать градиенты вокруг точки, лапласианы и проч.

$\textbf{For what}$: Определение собственной скорости, Определение локализации, Улучшение методов трекинга объектов, сегментации, Детектирование событий, Сжатие видеопотока и проч.

![](data/tennis.png)

Разделяют 2 вида оптического потока - плотный (dense) [Farneback method, neural nets], работающий с целым изображением, и выборочный (sparse) [Lucas-Kanade method], работающий с ключевыми точками

In [None]:
!wget https://www.bogotobogo.com/python/OpenCV_Python/images/mean_shift_tracking/slow_traffic_small.mp4 -O data/slow_traffic_small.mp4

In [1]:
import cv2
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import IPython

%matplotlib inline

## Lucas-Kanade (sparse)

Пусть $I_{1} = I(x, y, t_{1})$ интенсивность в некоторой точке (x, y) на первом изображении (т. е. в момент времени t). На втором изображении эта точка сдвинулась на (dx, dy), при этом прошло время dt, тогда $I_{2} = I(x + dx, y + dx, t_{1} + dt) \approx I_{1} + I_{x}dx + I_{y}dy +  I_{t}dt$. Из постановки задачи следует, что интенсивность пикселя не изменилась, тогда $I_{1} = I_{2}$. Далее определяем $dx, dy$.

Самое простое решение проблемы – алгоритм Лукаса-Канаде. У нас же на изображении объекты размером больше 1 пикселя, значит, скорее всего, в окрестности текущей точки у других точек будут примерно такие же сдвиги. Поэтому мы возьмем окно вокруг этой точки и минимизируем (по МНК) в нем суммарную погрешность с весовыми коэффициентами, распределенными по Гауссу, то есть так, чтобы наибольший вес имели пиксели, ближе всего находящиеся к исследуемому.

**Полезные материалы:**
- цикл видео-лекций от First Principles of Computer Vision, посвященный Optical Flow и алгоритму Lucas-Kanade: https://youtube.com/playlist?list=PL2zRqk16wsdoYzrWStffqBAoUY8XdvatV

### Вопрос 1

В `cv2.calcOpticalFlowPyrLK` есть параметр, отвечающий за ImagePiramyd. Зачем нужна пирамида изображений в случае вычисления оптического потока?

**Ответ:**

Используя несколько уровней разрешения, алгоритм сначала фиксирует крупные движения в более крупном масштабе, что обеспечивает робастность, а затем уточняет отслеживание движений в более мелких масштабах для повышения точности.

### Задание 1

Напишите реализацию Лукаса-Канаде c помощью numpy и cv2. Сравните с реализацией `cv2.calcOpticalFlowPyrLK`.

In [4]:
def get_derivative_x(img, keypoint, winSize):
    """Compute x-derivative at the keypoint location within the window."""
    x, y = int(keypoint[0]), int(keypoint[1])
    sobel_x = cv2.Sobel(img, cv2.CV_64F, 1, 0, ksize=5)
    return sobel_x[y - winSize[1]//2:y + winSize[1]//2 + 1, x - winSize[0]//2:x + winSize[0]//2 + 1]


def get_derivative_y(img, keypoint, winSize):
    """Compute y-derivative at the keypoint location within the window."""
    x, y = int(keypoint[0]), int(keypoint[1])
    sobel_y = cv2.Sobel(img, cv2.CV_64F, 0, 1, ksize=5)
    return sobel_y[y - winSize[1]//2:y + winSize[1]//2 + 1, x - winSize[0]//2:x + winSize[0]//2 + 1]


def get_derivative_t(prevImg, nextImg, keypoint, winSize):
    """Compute temporal derivative at the keypoint location."""
    x, y = int(keypoint[0]), int(keypoint[1])
    patch_prev = prevImg[y - winSize[1]//2:y + winSize[1]//2 + 1, x - winSize[0]//2:x + winSize[0]//2 + 1]
    patch_next = nextImg[y - winSize[1]//2:y + winSize[1]//2 + 1, x - winSize[0]//2:x + winSize[0]//2 + 1]
    return patch_next - patch_prev


def my_calcOpticalFlowPyrLK(prevImg, nextImg, prevPts, winSize):
    nextPts = []
    for keypoint in prevPts:
        keypoint = keypoint[0]
        dx = get_derivative_x(prevImg, keypoint, winSize)
        dy = get_derivative_y(prevImg, keypoint, winSize)
        dt = get_derivative_t(prevImg, nextImg, keypoint, winSize)

        # Construct A and b matrices
        A = np.vstack((dx.flatten(), dy.flatten())).T
        b = -dt.flatten()

        # Solve for p using least squares
        p, _, _, _ = np.linalg.lstsq(A, b, rcond=None)

        # Update keypoint position
        new_x = keypoint[0] + p[0]
        new_y = keypoint[1] + p[1]
        nextPts.append([new_x, new_y])

    return np.expand_dims(np.stack(nextPts), axis=1)

In [5]:
video_path='data/slow_traffic_small.mp4'
cap = cv2.VideoCapture(video_path)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output/my_LK.mp4', fourcc, fps, (width, height))

# params for ShiTomasi corner detection
feature_params = dict(
    maxCorners = 100,
    qualityLevel = 0.3,
    minDistance = 7,
    blockSize = 7,
)
# Create some random colors
color = np.random.randint(0, 255, (100, 3))
# Take first frame and find corners in it
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
p0 = cv2.goodFeaturesToTrack(old_gray, mask = None, **feature_params)

# Create a mask image for drawing purposes
mask = np.zeros_like(old_frame)
for i in tqdm(range(length)):
    ret, frame = cap.read()
    if not ret:
        print('No frames grabbed!')
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # calculate optical flow
    p1 = my_calcOpticalFlowPyrLK(
        prevImg=old_gray,
        nextImg=frame_gray,
        prevPts=p0,
        winSize=(15, 15)
    )

    # draw the tracks
    for i, (new, old) in enumerate(zip(p1, p0)):
        a, b = new.ravel()
        c, d = old.ravel()
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
        frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)

    img = cv2.add(frame, mask)
    # Now update the previous frame and previous points
    old_gray = frame_gray.copy()
    p0 = p1.reshape(-1, 1, 2)

    out.write(img)

cap.release()
out.release()

100%|█████████▉| 913/914 [00:39<00:00, 22.93it/s]


No frames grabbed!


In [6]:
IPython.display.Video('output/my_LK.mp4')

### Релизация OpenCV - cv2.calcOpticalFlowPyrLK

In [7]:
video_path='data/slow_traffic_small.mp4'
cap = cv2.VideoCapture(video_path)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output/LK.mp4', fourcc, fps, (width, height))

# params for ShiTomasi corner detection
feature_params = dict(
    maxCorners = 100,
    qualityLevel = 0.3,
    minDistance = 7,
    blockSize = 7,
)

# Parameters for lucas kanade optical flow
lk_params = dict(
    winSize  = (15, 15),
    maxLevel = 2,
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
)
# Create some random colors
color = np.random.randint(0, 255, (100, 3))
# Take first frame and find corners in it
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
p0 = cv2.goodFeaturesToTrack(old_gray, mask = None, **feature_params)
# Create a mask image for drawing purposes
mask = np.zeros_like(old_frame)
for i in tqdm(range(length)):

    ret, frame = cap.read()

    if not ret:
        print('No frames grabbed!')
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # calculate optical flow
    # see params here https://docs.opencv.org/3.4/dc/d6b/group__video__track.html#ga473e4b886d0bcc6b65831eb88ed93323
    p1, st, _ = cv2.calcOpticalFlowPyrLK(
        prevImg=old_gray,
        nextImg=frame_gray,
        prevPts=p0,
        nextPts=None,
        **lk_params,
    )

    # Select good points where status is equal 1
    if p1 is not None:
        good_new = p1[st==1]
        good_old = p0[st==1]

    # draw the tracks
    for i, (new, old) in enumerate(zip(good_new, good_old)):
        a, b = new.ravel()
        c, d = old.ravel()
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
        frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)

    img = cv2.add(frame, mask)
    # Now update the previous frame and previous points
    old_gray = frame_gray.copy()
    p0 = good_new.reshape(-1, 1, 2)

    out.write(img)

cap.release()
out.release()

100%|█████████▉| 913/914 [00:06<00:00, 150.04it/s]

No frames grabbed!





In [16]:
IPython.display.Video('output/LK.mp4')

### Вопрос 2

Какие проблемы в текущей реализации вы увидели при просмотре результирующего видео? Как их можно устранить?

**Ответ:**  
Есть проблемы, связанные с неточностью определения и отслеживания, однако основная проблема: ключевые точки фиксируются только в начальный момент времени, поэтому новые объекты не отслеживаются. Это можно исправить путем обновления целевых точек, например, когда какие-либо объекты выходят за пределы поля видимости.

### Задание 2

Напишите код, устраняющий одну из проблем, покажите результат до/после.

In [12]:
video_path='data/slow_traffic_small.mp4'
cap = cv2.VideoCapture(video_path)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output/fixed_LK.mp4', fourcc, fps, (width, height))

feature_params = dict(
    maxCorners = 100,
    qualityLevel = 0.3,
    minDistance = 7,
    blockSize = 7,
)
lk_params = dict(
    winSize  = (15, 15),
    maxLevel = 2,
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
)
color = np.random.randint(0, 255, (100, 3))
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
p0 = cv2.goodFeaturesToTrack(old_gray, mask = None, **feature_params)
mask = np.zeros_like(old_frame)

for i in tqdm(range(length)):
    ret, frame = cap.read()
    if not ret:
        print('No frames grabbed!')
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    p1, st, _ = cv2.calcOpticalFlowPyrLK(
        prevImg=old_gray,
        nextImg=frame_gray,
        prevPts=p0,
        nextPts=None,
        **lk_params,
    )

    if p1 is not None:
        good_new = p1[st==1]
        good_old = p0[st==1]

    for i, (new, old) in enumerate(zip(good_new, good_old)):
        a, b = new.ravel()
        c, d = old.ravel()
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
        frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)
    img = cv2.add(frame, mask)

    if p1 is not None and len(good_new) > 0:
        old_gray = frame_gray.copy()
        p0 = good_new.reshape(-1, 1, 2)

    # re-detect features periodically
    if len(p0) < feature_params['maxCorners'] // 2 and \
        any([(p > old_frame.shape).all() for p in p0.ravel()]):
        p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

    out.write(img)

cap.release()
out.release()

100%|█████████▉| 913/914 [00:11<00:00, 79.67it/s]

No frames grabbed!





In [13]:
IPython.display.Video('output/fixed_LK.mp4')

## Farneback (dense)

Метод Farneback носит несколько более глобальный характер, чем метод Лукаса-Канаде. Он опирается на предположение о том, что на всем изображении оптический поток будет достаточно гладким.

In [14]:
cap = cv2.VideoCapture(video_path)

length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output/Farneback.mp4', fourcc, fps, (width, height))

ret, frame1 = cap.read()
prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
hsv = np.zeros_like(frame1)
hsv[..., 1] = 255

for i in tqdm(range(length)):

    ret, frame2 = cap.read()

    if not ret:
        print('No frames grabbed!')
        break

    next = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # see arguments here https://docs.opencv.org/3.4/dc/d6b/group__video__track.html#ga5d10ebbd59fe09c5f650289ec0ece5af
    flow = cv2.calcOpticalFlowFarneback(prvs, next, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv[..., 0] = ang*180/np.pi/2
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    prvs = next

    out.write(bgr)

cap.release()
out.release()

100%|█████████▉| 913/914 [01:36<00:00,  9.45it/s]

No frames grabbed!





Посмотрим, что получилось

In [15]:
IPython.display.Video('output/Farneback.mp4')