# Семинар 9 - Методы построения оптического потока по последовательности изображений

***

Источник - https://habr.com/ru/post/201406/

$\textbf{Task statement}$: Оптический поток (ОП) – изображение видимого движения, представляющее собой сдвиг каждой точки (пикселя) между двумя изображениями.

По сути, он представляет собой поле скоростей. Суть ОП в том, что для каждой точки изображения $I_{t_0} (\vec{r})$ находится такой вектор сдвига $\delta \vec{r}$, чтобы было соответсвие между исходной точкой и точкой на следущем фрейме $I_{t_1} (\vec{r} + \delta \vec{r})$. В качестве метрики соответвия берут близость интенсивности пикселей, беря во внимание маленькую разницу по времени между кадрами: $\delta{t} = t_{1} - t_{0}$. В более точных методах точку можно привязывать к объекту на основе, например, выделения ключевых точек, а также считать градиенты вокруг точки, лапласианы и проч.

$\textbf{For what}$: Определение собственной скорости, Определение локализации, Улучшение методов трекинга объектов, сегментации, Детектирование событий, Сжатие видеопотока и проч.

![](data/tennis.png)

Разделяют 2 вида оптического потока - плотный (dense) [Farneback method, neural nets], работающий с целым изображением, и выборочный (sparse) [Lucas-Kanade method], работающий с ключевыми точками

In [None]:
# !wget https://www.bogotobogo.com/python/OpenCV_Python/images/mean_shift_tracking/slow_traffic_small.mp4 -O data/slow_traffic_small.mp4

In [2]:
import cv2
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import IPython

%matplotlib inline

## Lucas-Kanade (sparse)

Пусть $I_{1} = I(x, y, t_{1})$ интенсивность в некоторой точке (x, y) на первом изображении (т. е. в момент времени t). На втором изображении эта точка сдвинулась на (dx, dy), при этом прошло время dt, тогда $I_{2} = I(x + dx, y + dx, t_{1} + dt) \approx I_{1} + I_{x}dx + I_{y}dy +  I_{t}dt$. Из постановки задачи следует, что интенсивность пикселя не изменилась, тогда $I_{1} = I_{2}$. Далее определяем $dx, dy$.

Самое простое решение проблемы – алгоритм Лукаса-Канаде. У нас же на изображении объекты размером больше 1 пикселя, значит, скорее всего, в окрестности текущей точки у других точек будут примерно такие же сдвиги. Поэтому мы возьмем окно вокруг этой точки и минимизируем (по МНК) в нем суммарную погрешность с весовыми коэффициентами, распределенными по Гауссу, то есть так, чтобы наибольший вес имели пиксели, ближе всего находящиеся к исследуемому.

**Полезные материалы:**
- цикл видео-лекций от First Principles of Computer Vision, посвященный Optical Flow и алгоритму Lucas-Kanade: https://youtube.com/playlist?list=PL2zRqk16wsdoYzrWStffqBAoUY8XdvatV

### Вопрос 1

В `cv2.calcOpticalFlowPyrLK` есть параметр, отвечающий за ImagePiramyd. Зачем нужна пирамида изображений в случае вычисления оптического потока?

**Ответ:** Для улучшения точности и устойчивости алгоритма, особенно при наличии больших перемещений объектов между кадрами. Это основано на создании нескольких уровней изображений, где каждый следующий уровень представляет собой уменьшенную версию предыдущего. Обработка начинается с самого низкого (самого мелкого) уровня пирамиды, где обнаружение больших перемещений происходит проще из-за уменьшенного размера изображения. Затем результаты используются как начальные приближения для следующего уровня, что позволяет постепенно уточнять расчёты на более детальных уровнях пирамиды. Поэтому пирамида изображений помогает эффективно справляться с различными масштабами движений и повышает общую надёжность определения оптического потока.

### Задание 1

Напишите реализацию Лукаса-Канаде c помощью numpy и cv2. Сравните с реализацией `cv2.calcOpticalFlowPyrLK`.

In [None]:
def get_derivative_x(
    prevImg,
    keypoint,
    winSize,
) -> np.array:
    pass


# arguments like in cv2 lib
def my_calcOpticalFlowPyrLK(
    prevImg,
    nextImg,
    prevPts,
    nextPts, #None is our case
    winSize,
    #maxLevel, if you want to be an expert in CV,
    #uncomment it and apply in LK method :)
) -> np.array:

    '''
    You should return output vector of 2D points
    (with single-precision floating-point coordinates)
    containing the calculated new positions of input features in the second image
    '''
    nextPts = []

    for keypoint in prevPts:

        derivative_x = get_derivative_x(prevImg, keypoint, winSize)
        derivative_y = get_derivative_y(prevImg, keypoint, winSize)
        derivative_t = get_derivative_t(prevImg, keypoint, winSize)

        # find a matrix and solve linear equation system
        pass

        # find result coordinates
        nextPts.append()

    return np.expand_dims(np.stack(nextPts), axis=1)

In [9]:
def get_derivative_x(prevImg,
                     keypoint,
                     winSize):
    sobelx = cv2.Sobel(prevImg, cv2.CV_64F, 1, 0, ksize=5)
    x, y = int(keypoint[0][0]), int(keypoint[0][1])
    half_win = winSize[0] // 2
    # Извлекаем окно вокруг точки интереса
    derivative_x = sobelx[y-half_win:y+half_win+1, x-half_win:x+half_win+1]
    return derivative_x

def get_derivative_y(prevImg,
                     keypoint,
                     winSize):
    sobely = cv2.Sobel(prevImg, cv2.CV_64F, 0, 1, ksize=5)
    x, y = int(keypoint[0][0]), int(keypoint[0][1])
    half_win = winSize[0] // 2
    derivative_y = sobely[y-half_win:y+half_win+1, x-half_win:x+half_win+1]
    return derivative_y

def get_derivative_t(prevImg, nextImg,
                     keypoint, winSize):
    x, y = int(keypoint[0][0]), int(keypoint[0][1])
    half_win = winSize[0] // 2
    prev_patch = prevImg[y-half_win:y+half_win+1, x-half_win:x+half_win+1]
    next_patch = nextImg[y-half_win:y+half_win+1, x-half_win:x+half_win+1]
    derivative_t = next_patch - prev_patch
    return derivative_t

def my_calcOpticalFlowPyrLK(prevImg, nextImg, prevPts, nextPts, winSize):
    nextPts = []
    for keypoint in prevPts:
        derivative_x = get_derivative_x(prevImg, keypoint, winSize)
        derivative_y = get_derivative_y(prevImg, keypoint, winSize)
        derivative_t = get_derivative_t(prevImg, nextImg, keypoint, winSize)

        A = np.vstack((derivative_x.flatten(), derivative_y.flatten())).T
        b = -derivative_t.flatten()

        nu = np.linalg.lstsq(A, b, rcond=None)[0]
        dx, dy = nu

        new_x = keypoint[0][0] + dx
        new_y = keypoint[0][1] + dy
        nextPts.append([new_x, new_y])

    return np.expand_dims(np.array(nextPts), axis=1)

### Релизация OpenCV - cv2.calcOpticalFlowPyrLK

In [7]:
video_path = '/content/data/slow_traffic_small.mp4'

cap = cv2.VideoCapture(video_path)

length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output_LK.mp4', fourcc, fps, (width, height))

# params for ShiTomasi corner detection
feature_params = dict(
    maxCorners = 100,
    qualityLevel = 0.3,
    minDistance = 7,
    blockSize = 7,
)

# Parameters for lucas kanade optical flow
lk_params = dict(
    #window size
    winSize  = (15, 15),
    #image piramid
    maxLevel = 2,
    #after the specified maximum number of iterations criteria.maxCount
    #or when the search window moves by less than criteria.epsilon.
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
)
# Create some random colors
color = np.random.randint(0, 255, (100, 3))
# Take first frame and find corners in it
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
p0 = cv2.goodFeaturesToTrack(old_gray, mask = None, **feature_params)
# Create a mask image for drawing purposes
mask = np.zeros_like(old_frame)
for i in tqdm(range(1, length)):

    ret, frame = cap.read()

    if not ret:
        print('No frames grabbed!')
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # calculate optical flow
    # see params here https://docs.opencv.org/3.4/dc/d6b/group__video__track.html#ga473e4b886d0bcc6b65831eb88ed93323
    p1, st, err = cv2.calcOpticalFlowPyrLK(
        prevImg=old_gray,
        nextImg=frame_gray,
        prevPts=p0,
        nextPts=None,
        **lk_params,
    )

    # Select good points where status is equal 1
    if p1 is not None:
        good_new = p1[st==1]
        good_old = p0[st==1]

    # draw the tracks
    for i, (new, old) in enumerate(zip(good_new, good_old)):
        a, b = new.ravel()
        c, d = old.ravel()
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
        frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)

    img = cv2.add(frame, mask)
    # Now update the previous frame and previous points
    old_gray = frame_gray.copy()
    p0 = good_new.reshape(-1, 1, 2)

    out.write(img)

cap.release()
out.release()

100%|██████████| 913/913 [00:10<00:00, 86.97it/s] 


In [8]:
IPython.display.Video('output_LK.mp4')

### Вопрос 2

Какие проблемы в текущей реализации вы увидели при просмотре результирующего видео? Как их можно устранить?

**Ответ:** Треки начинают формироваться с самого начала видео, но нет критерия остановки отслеживания. К тому же треккинг для объектов, появившихся в середине видео, не происходит.  
Возможно решение - ограничения жизни треков по времени/динамике/достижению конца изображения и/или можно запускать параллельно алгоритм с каким-то временным сдвигом для каждого из них и затем агрегировать треки на итоговом изображении, применяя какой-то фильтр aka NMS  

### Задание 2

Напишите код, устраняющий одну из проблем, покажите результат до/после.

In [25]:
def process_video_segment(start_frame, end_frame, cap, feature_params, lk_params, color, out):
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    # Initialize the first frame of the segment
    ret, old_frame = cap.read()
    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
    p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)
    mask = np.zeros_like(old_frame)

    for i in range(start_frame, end_frame):
        ret, frame = cap.read()
        if not ret:
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        if p1 is not None:
            good_new = p1[st==1]
            good_old = p0[st==1]

            for i, (new, old) in enumerate(zip(good_new, good_old)):
                a, b = new.ravel()
                c, d = old.ravel()
                mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
                frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)

            img = cv2.add(frame, mask)
            old_gray = frame_gray.copy()
            p0 = good_new.reshape(-1, 1, 2)
        else:
            mask = np.zeros_like(old_frame)

        out.write(img)

In [26]:
video_path = '/content/data/slow_traffic_small.mp4'
cap = cv2.VideoCapture(video_path)

fps = cap.get(cv2.CAP_PROP_FPS)
n_frames_per_segment = int(fps * 4)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output_LK_new.mp4', fourcc, fps, (width, height))

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
segments = [(start, min(start + n_frames_per_segment, total_frames)) for start in range(0, total_frames, n_frames_per_segment)]

for start_frame, end_frame in segments:
    process_video_segment(start_frame, end_frame, cap, feature_params, lk_params, color, out)

cap.release()
out.release()

Реализован параллельный запуск алгоримта каждые 4 секунды видео, что позволяет избавиться от проблем

## Farneback (dense)

Метод Farneback носит несколько более глобальный характер, чем метод Лукаса-Канаде. Он опирается на предположение о том, что на всем изображении оптический поток будет достаточно гладким.

In [None]:
cap = cv2.VideoCapture(video_path)

length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output_Farneback.mp4', fourcc, fps, (width, height))

ret, frame1 = cap.read()
prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
hsv = np.zeros_like(frame1)
hsv[..., 1] = 255

for i in tqdm(range(length)):

    ret, frame2 = cap.read()

    if not ret:
        print('No frames grabbed!')
        break

    next = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    #see arguments here https://docs.opencv.org/3.4/dc/d6b/group__video__track.html#ga5d10ebbd59fe09c5f650289ec0ece5af
    flow = cv2.calcOpticalFlowFarneback(prvs, next, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv[..., 0] = ang*180/np.pi/2
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    prvs = next

    out.write(bgr)

cap.release()
out.release()

Посмотрим, что получилось

In [14]:
IPython.display.Video('output_Farneback.mp4')