In [1]:
import cv2
import matplotlib.pyplot as plt
import numpy as np

## **1. Optical Flow**

Optical flow is the pattern of apparent motion of image objects between two consecutive frames caused by the movement of object or camera. It is 2D vector field where each vector is a displacement vector showing the movement of points from first frame to second.

Optical flow works on several assumptions:
- The pixel intensities of an object do not change between consecutive frames.
- Neighbouring pixels have similar motion.

In [32]:
from collections import defaultdict, deque
from functools import partial

corner_params = dict(
    maxCorners=10,  # Number of corners to return
    qualityLevel=0.3,  # Minimal accepted quality of image corners
    minDistance=7,  # Minimum possible Euclidean distance between the returned corners
    blockSize=7,  # Size of an average block for computing a derivative covariation matrix over each pixel neighborhood
)
lk_params = dict(
    winSize=(15, 15),  # Size of the search window at each pyramid level
    maxLevel=2,  # 0-based maximal pyramid level number
    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),  # Termination criteria
)

cap = cv2.VideoCapture(0)
ret, prev_frame = cap.read()

prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
prev_pts = cv2.goodFeaturesToTrack(prev_gray, **corner_params)  # type: ignore

tracks = defaultdict(partial(deque, maxlen=100))

while True:
    ret, curr_frame = cap.read()
    if not ret:
        break

    curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
    curr_pts, status, err = cv2.calcOpticalFlowPyrLK(
        prev_gray, curr_gray, prev_pts, None, **lk_params
    )
    curr_good = curr_pts[status == 1]
    prev_good = prev_pts[status == 1]

    for k, (curr, prev) in enumerate(zip(curr_good, prev_good)):
        x0, y0 = prev.ravel().astype(np.int32)
        x1, y1 = curr.ravel().astype(np.int32)
        tracks[k].append((x1, y1))
        pts = np.asarray(tracks[k], dtype=np.int32).reshape(-1, 1, 2)
        cv2.polylines(curr_frame, [pts], isClosed=False, color=(0, 255, 0), thickness=2)
        cv2.circle(curr_frame, (x1, y1), 5, (255, 0, 0), -1)

    prev_gray = curr_gray.copy()
    prev_pts = curr_good.reshape(-1, 1, 2)

    cv2.imshow("Optical Flow", curr_frame)
    if cv2.waitKey(1) == ord("q"):
        break


cap.release()
cv2.destroyAllWindows()