In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Optical Flow
Optical flow is the pattern of apparent motion of image objects between two consecutive frames, caused by movement of the object or the camera. It is a 2D vector field in which each vector is the displacement of a point from the first frame to the second.
<img src="../resources/optic_flow.jpg">

<b><u>Applications</u></b>
- Structure from Motion / Motion Estimation
- Video Compression
- Video Stabilisation

<b><u>Assumptions</u></b>
- The pixel intensities of an object do not change between consecutive frames.
- Neighbouring pixels have similar motion.

Consider a pixel with intensity I(x, y, t) in the first frame that moves by (dx, dy) in the next frame, taken after time dt. Since it is the same point and its intensity does not change,
$$I(x, y, t) = I(x + dx, y + dy, t + dt)$$
$$u = \frac{dx}{dt}, v = \frac{dy}{dt}$$
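Applying a first-order Taylor expansion to the right-hand side and dividing by dt gives the optical flow equation,
$$\frac{\partial f}{\partial x}u + \frac{\partial f}{\partial y}v + \frac{\partial f}{\partial t} = 0$$
where f denotes the image intensity I.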
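
For reference, the Taylor step can be written out as follows. This is a sketch that keeps only first-order terms, on the assumption that the displacement is small enough for higher-order terms to be negligible.
$$I(x + dx, y + dy, t + dt) \approx I(x, y, t) + \frac{\partial I}{\partial x}dx + \frac{\partial I}{\partial y}dy + \frac{\partial I}{\partial t}dt$$
Substituting this into the brightness constancy condition cancels I(x, y, t) on both sides:
$$\frac{\partial I}{\partial x}dx + \frac{\partial I}{\partial y}dy + \frac{\partial I}{\partial t}dt = 0$$
Dividing by dt and using u = dx/dt, v = dy/dt gives the constraint above, with f standing for the intensity I.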

This is a single equation in two unknowns (u, v), so it cannot be solved for one pixel on its own. The Lucas-Kanade method supplies the extra constraints.

Assuming all neighbouring pixels have similar motion, take a 3x3 patch around the pixel under consideration. Applying the same equation to all 9 pixels gives an over-determined system, whose least-squares solution is
$$\begin{bmatrix}u \\ v\end{bmatrix} = \begin{bmatrix}\sum_{i} f_{x_{i}}^{2} & \sum_{i} f_{x_{i}}f_{y_{i}} \\ \sum_{i} f_{x_{i}}f_{y_{i}} & \sum_{i} f_{y_{i}}^{2}\end{bmatrix}^{-1} \begin{bmatrix}-\sum_{i} f_{x_{i}}f_{t_{i}} \\ -\sum_{i} f_{y_{i}}f_{t_{i}}\end{bmatrix}$$
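
The least-squares solve can be sketched in NumPy. This is a minimal illustration, not OpenCV's implementation: `lucas_kanade_patch` is a hypothetical helper, and the gradients are synthetic values constructed so that every pixel in the patch satisfies the optical flow equation for a known (u, v).

```python
import numpy as np

def lucas_kanade_patch(fx, fy, ft):
    """Solve the 2x2 normal equations for (u, v) over one patch."""
    fx, fy, ft = (np.asarray(a, dtype=float).ravel() for a in (fx, fy, ft))
    # Sums of gradient products over all pixels in the patch
    A = np.array([[np.sum(fx * fx), np.sum(fx * fy)],
                  [np.sum(fx * fy), np.sum(fy * fy)]])
    b = -np.array([np.sum(fx * ft), np.sum(fy * ft)])
    return np.linalg.solve(A, b)

# Synthetic 3x3 patch: choose spatial gradients, then build f_t so that
# f_x*u + f_y*v + f_t = 0 holds at every pixel for the chosen motion.
rng = np.random.default_rng(0)
fx = rng.normal(size=(3, 3))
fy = rng.normal(size=(3, 3))
true_uv = np.array([0.5, -0.25])
ft = -(fx * true_uv[0] + fy * true_uv[1])

u, v = lucas_kanade_patch(fx, fy, ft)
print(u, v)  # recovers the chosen motion up to floating-point error
```

The 2x2 matrix is only invertible when the patch has gradient variation in both directions, which is why the points to track are usually chosen at corners.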

This works only for small motions. For large motions we use an image pyramid. At coarser pyramid levels small motions vanish and large motions shrink to small ones, so the Lucas-Kanade method can be applied there and the resulting flow scaled back to the original resolution.
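
The effect of the pyramid on motion size can be illustrated with a small NumPy sketch. It makes two simplifying assumptions: 2x2 block averaging stands in for the Gaussian pyramid that OpenCV builds, and motion is measured as the shift of an intensity centroid. `downsample` and `centroid_x` are hypothetical helpers written for this example.

```python
import numpy as np

def downsample(img):
    """Halve the resolution by averaging 2x2 blocks (crude stand-in for a pyramid level)."""
    h, w = img.shape
    img = img[:h - h % 2, :w - w % 2]
    return img.reshape(h // 2, 2, w // 2, 2).mean(axis=(1, 3))

def centroid_x(img):
    """Intensity-weighted mean column index."""
    cols = np.arange(img.shape[1])
    return (img.sum(axis=0) * cols).sum() / img.sum()

# A bright square that moves 8 pixels to the right between two frames
frame1 = np.zeros((64, 64))
frame1[28:36, 16:24] = 1.0
frame2 = np.roll(frame1, 8, axis=1)

shifts = []
a, b = frame1, frame2
for level in range(3):
    shifts.append(centroid_x(b) - centroid_x(a))
    a, b = downsample(a), downsample(b)

print(shifts)  # the shift is 8, 4 and 2 pixels at levels 0, 1 and 2
```

Each level halves the apparent displacement, so a motion too large for the small-motion assumption at full resolution becomes tractable a few levels up.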

We'll use two OpenCV functions:
- cv2.calcOpticalFlowPyrLK(prevImg, nextImg, prevPts, nextPts[, status[, err[, winSize[, maxLevel[, criteria[, flags[, minEigThreshold]]]]]]]) --> nextPts, status, err
- cv2.goodFeaturesToTrack(image, maxCorners, qualityLevel, minDistance[, corners[, mask[, blockSize[, useHarrisDetector[, k]]]]]) --> corners

In [2]:
import io
import base64
from IPython.display import HTML

cap = cv2.VideoCapture("../resources/football.avi")

# for corner detection
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
# for optical flow
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

color = np.random.randint(0, 255, (100, 3))
prev_frame = cap.read()[1]
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

writer = cv2.VideoWriter(
    '../resources/flow.mp4',
    cv2.VideoWriter_fourcc('H', '2', '6', '4'),
    10,  # fps
    prev_frame.shape[:2][::-1],
    True  # colored
)
writer.write(prev_frame)

features = cv2.goodFeaturesToTrack(prev_gray, **feature_params)
mask = np.zeros_like(prev_frame)
batch = 0

while True:
    frame = cap.read()[1]
    if frame is None: break
    
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, features, None, **lk_params)
    
    good_new = pts[status == 1]
    good_old = features[status == 1]
    
    for idx, (new, old) in enumerate(zip(good_new, good_old)):
        # drawing functions expect integer pixel coordinates
        x1, y1 = map(int, new.ravel())
        x2, y2 = map(int, old.ravel())
        mask = cv2.line(mask, (x1, y1), (x2, y2), color[idx].tolist(), 2)
        frame = cv2.circle(frame, (x1, y1), 5, color[idx].tolist(), -1)
    
    out = cv2.add(frame, mask)
    writer.write(out)
    prev_gray = gray.copy()
    features = good_new.reshape(-1, 1, 2)
    batch += 1
    if batch % 30 == 0:  # clear previous flow and create new to avoid mess in output
        mask = np.zeros_like(prev_frame)
    
cap.release()
writer.release()

video = io.open('../resources/flow.mp4', 'rb').read()
encoded = base64.b64encode(video)
HTML(
    '''
    <video alt="test" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
    </video>
    '''.format(encoded.decode('ascii'))
)

# Dense Optical Flow
The Lucas-Kanade method, as used above, computes flow only for a sparse set of points (the detected corners / features). Farneback's algorithm computes optical flow for every pixel in the frame.

We get a 2-channel array of optical flow vectors (u, v). We compute their magnitude and direction and color-code the result for better visualization: direction maps to the Hue channel and magnitude to the Value channel of an HSV image.
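
The color coding can be sketched without OpenCV. In this illustration `flow_to_hue_value` is a hypothetical helper: `np.arctan2` and `np.hypot` stand in for `cv2.cartToPolar`, and a manual min-max rescale stands in for `cv2.normalize` with `cv2.NORM_MINMAX`. The hue is the angle in degrees divided by 2, because OpenCV stores 8-bit hue in the range 0 to 179.

```python
import numpy as np

def flow_to_hue_value(flow):
    """Map an (H, W, 2) flow field to OpenCV-style hue [0, 180) and value [0, 255]."""
    u, v = flow[..., 0], flow[..., 1]
    mag = np.hypot(u, v)
    ang = np.arctan2(v, u) % (2 * np.pi)   # direction in [0, 2*pi)
    hue = ang * 180 / np.pi / 2            # degrees, halved to fit 8-bit hue
    lo, hi = mag.min(), mag.max()
    # Min-max rescale; a constant-magnitude field maps to all zeros
    value = np.zeros_like(mag) if hi == lo else (mag - lo) / (hi - lo) * 255
    return hue, value

# Four flow vectors pointing along +x, +y, -x and -y (the last one twice as long)
flow = np.array([[[1.0, 0.0], [0.0, 1.0]],
                 [[-1.0, 0.0], [0.0, -2.0]]])
hue, value = flow_to_hue_value(flow)
print(hue)    # directions 0, 90, 180 and 270 degrees become hue 0, 45, 90, 135
print(value)  # only the longest vector reaches 255 after min-max scaling
```

Because the scaling is relative to each frame, the brightness shows which pixels move fastest within that frame rather than an absolute speed.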

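Farneback's method approximates the neighbourhood of each pixel by a quadratic polynomial. Consider a neighbourhood that undergoes a pure translation d between the two frames, where x is the 2D position vector, A<sub>1</sub> is a symmetric matrix, B<sub>1</sub> is a vector and C<sub>1</sub> is a scalar.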
$$f_{1}(x) = x^{T}A_{1}x + B_{1}^{T}x + C_{1}$$
$$f_{2}(x) = f_{1}(x - d) = x^{T}A_{1}x + (B_{1} - 2A_{1}d)^{T}x + d^{T}A_{1}d - B_{1}^{T}d + C_{1}$$
$$f_{2}(x) = x^{T}A_{2}x + B_{2}^{T}x + C_{2}$$
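Assuming intensity constancy, equating the coefficients of the two expressions for f<sub>2</sub> gives
$$A_{2} = A_{1}, B_{2} = B_{1} - 2A_{1}d, C_{2} = d^{T}A_{1}d - B_{1}^{T}d + C_{1}$$
Assuming A<sub>1</sub> to be non-singular, the second relation can be solved for d: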
$$d = -\frac{1}{2}A_{1}^{-1}(B_{2} - B_{1})$$
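
The relation can be checked numerically. The sketch below uses arbitrary illustrative values for A<sub>1</sub>, B<sub>1</sub>, C<sub>1</sub> and d (they do not come from any image), builds the coefficients of the shifted polynomial, and recovers d from B<sub>1</sub> and B<sub>2</sub>.

```python
import numpy as np

# Arbitrary illustrative coefficients; A1 is symmetric and non-singular
A1 = np.array([[2.0, 0.5],
               [0.5, 1.0]])
B1 = np.array([1.0, -3.0])
C1 = 0.7
d_true = np.array([1.5, -0.5])

# Coefficients of f2(x) = f1(x - d), from the expansion above
A2 = A1
B2 = B1 - 2 * A1 @ d_true
C2 = d_true @ A1 @ d_true - B1 @ d_true + C1

# Recover the displacement from the linear coefficients only
d_est = -0.5 * np.linalg.solve(A1, B2 - B1)

def f1(x):
    return x @ A1 @ x + B1 @ x + C1

def f2(x):
    return x @ A2 @ x + B2 @ x + C2

x = np.array([0.3, -1.2])
print(d_est)                   # matches d_true up to floating-point error
print(f2(x), f1(x - d_true))   # the two polynomials agree at x
```

Using `np.linalg.solve` avoids forming the inverse of A<sub>1</sub> explicitly, which is the usual choice for a single linear system.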

Suppose we already have an estimate l of the displacement. We extract regions of interest around the neighbourhoods at P<sub>1</sub>(x, y) and P<sub>2</sub>(x + dx, y + dy). The total displacement d can then be calculated from
$$B_{2} = B_{1} - 2A_{1}l$$
where B<sub>1</sub>, A<sub>1</sub> and l are known, and
$$d = -\frac{1}{2}A_{1}^{-1}(B_{2} - B_{1})$$

Thus, an iterative solution is developed in which every successive iteration produces a better estimate of the displacement vector. Iteration can be stopped once the change in displacement falls below a threshold (threshold_displacement).

Method Used - cv2.calcOpticalFlowFarneback(prev, next, flow, pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags) -> flow

In [3]:
cap = cv2.VideoCapture("../resources/football.avi")

prev_frame = cap.read()[1]
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

hsv = np.zeros_like(prev_frame)
hsv[...,1] = 255

flow_params = dict(pyr_scale=0.5, levels=3, winsize=15, iterations=3, poly_n=5, poly_sigma=1.2, flags=0)

writer = cv2.VideoWriter(
    "../resources/dense_flow.mp4",
    cv2.VideoWriter_fourcc('H', '2', '6', '4'),
    10,
    prev_frame.shape[:2][::-1],
    True
)
writer.write(prev_frame)

writer_hsv = cv2.VideoWriter(
    "../resources/hsv_flow.mp4",
    cv2.VideoWriter_fourcc('H', '2', '6', '4'),
    10,
    prev_frame.shape[:2][::-1],
    True
)

mask = np.zeros_like(prev_frame)
batch = 0

while True:
    frame = cap.read()[1]
    if frame is None: break
    
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, **flow_params)
    prev_gray = gray.copy()
    
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv[..., 0] = ang * 180 / np.pi / 2
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    out = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    writer_hsv.write(out)
    
    h, w = frame.shape[:2]
    y, x = np.mgrid[8:h:16, 8:w:16].reshape(2, -1).astype(int)
    fx, fy = flow[y, x].T
    
    lines = np.vstack([x, y, x + fx, y + fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines + 0.5)
    
    cv2.polylines(mask, lines, 0, (0, 255, 0))
    
    out = cv2.add(frame, mask)
    writer.write(out)
    
    batch += 1
    if batch % 30 == 0:  # clear previous flow and create new to avoid mess in output
        mask = np.zeros_like(prev_frame)
    
cap.release()
writer.release()
writer_hsv.release()

video1 = io.open('../resources/dense_flow.mp4', 'rb').read()
encoded1 = base64.b64encode(video1)
video2 = io.open('../resources/hsv_flow.mp4', 'rb').read()
encoded2 = base64.b64encode(video2)

data1 = '''
    <video alt="test" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
    </video>
    '''.format(encoded1.decode('ascii'))
data2 = '''
    <video alt="test" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
    </video>
    '''.format(encoded2.decode('ascii'))

HTML(data1 + data2)