# Classical Optical Flow Estimation

## Lucas-Kanade Optical Flow (IJCAI 1981)

<img src="https://drive.google.com/uc?id=1IwXipsiw_wO5L7b247O9pdD3CMeOx0_9" height = 150 width=390>

**Citation**
```
@inproceedings{lucas1981iterative,
  title={An iterative image registration technique with an application to stereo vision},
  author={Lucas, Bruce D and Kanade, Takeo},
  booktitle={IJCAI'81: 7th international joint conference on Artificial intelligence},
  volume={2},
  pages={674--679},
  year={1981}
}
```

In [1]:
from typing import *
import numpy as np
from scipy import signal
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import torch
import math

In [2]:
def flow_to_arrow(img: np.ndarray, flow: np.ndarray, scale=20, color=(255, 0, 0), step=25):
    _, h, w = flow.shape
    img_with_flow = img.copy()
    for y in range(0, h, step):
        for x in range(0,w,step):
            dx, dy = flow[:, y, x]
            if abs(dx) > 0. or abs(dy) > 0.:
                cv2.arrowedLine(
                    img_with_flow, (x,y), (int(x + dx*scale), int(y + dy*scale)), color, 2)

    return img_with_flow

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## The LK Algorithm

<img src="https://drive.google.com/uc?id=1SAzDo6bzjWFdwfTF0VV7UyCqmZysFtb1" height = 300 width=600>

### Good features


<img src="https://drive.google.com/uc?id=1M9ITDLhJ3wJ2PebZCbKriRWuSpzOrHIx" height = 25 width=550>

In [4]:
def optical_flow(I1g: np.ndarray, I2g: np.ndarray, window_size: int, tau:float =0.1, alpha:float = 0.1) \
    -> Tuple[np.ndarray, np.ndarray]:

    kernel_x = np.array([[-1., 1.], [-1., 1.]])
    kernel_y = np.array([[-1., -1.], [1., 1.]])
    kernel_t = np.array([[1., 1.], [1., 1.]])
    w = window_size//2
    I1g = I1g / 255.
    I2g = I2g / 255.

    mode = 'same'
    fx = signal.convolve2d(I1g, kernel_x, mode=mode)
    fy = signal.convolve2d(I1g, kernel_y, mode=mode)
    ft = signal.convolve2d(I2g, kernel_t, mode=mode) \
         + signal.convolve2d(I1g, -kernel_t, mode=mode)
    u = np.zeros(I1g.shape)
    v = np.zeros(I1g.shape)
    valid_pred = np.ones_like(u)

    for i in range(w, I1g.shape[0]-w):
        for j in range(w, I1g.shape[1]-w):
            Ix = fx[i-w:i+w+1, j-w:j+w+1].flatten()
            Iy = fy[i-w:i+w+1, j-w:j+w+1].flatten()
            It = ft[i-w:i+w+1, j-w:j+w+1].flatten()
            b = It
            A = np.concatenate([Ix[:,None], Iy[:,None]], axis=1)
            ATA = A.T @ A
            trace = np.trace(ATA)
            det = np.linalg.det(ATA)
            R = det - alpha * trace**2
            if R < tau:
              valid_pred[i,j]=0
              continue
            d = np.linalg.inv(ATA) @ A.T @ b
            u[i,j]=d[0]
            v[i,j]=d[1]
    return np.concatenate([u[None, ...],v[None, ...]], axis=0), valid_pred

In [5]:
# configurate
start_idx, end_idx = 13, 17
window_size = 59 # try different window sizes.
tau = window_size*math.sqrt(2)*10
img1_filefmt: str = "/content/drive/MyDrive/AIExpert_MotionTracking_Opticalflow/dataset/KittiFlow/training/image_2/%06d_10.png"
img2_filefmt: str = "/content/drive/MyDrive/AIExpert_MotionTracking_Opticalflow/dataset/KittiFlow/training/image_2/%06d_11.png"
gt_filefmt: str = "/content/drive/MyDrive/AIExpert_MotionTracking_Opticalflow/dataset/KittiFlow/training/flow_occ/%06d_10.png"

## Data

### KITTI Flow 2015

Real dynamic scenes of 200 image pairs in a driving scenario

<img src="https://drive.google.com/uc?id=1kPICEzjM8gmYOxU4wDv2qlKDQFtMEEsq" height = 150 width=430>
<img src="https://drive.google.com/uc?id=1pGLDKXWy4xtEAmEQouivjqrPo0QTplhs" height = 150 width=430>

Dataset website: https://www.cvlibs.net/datasets/kitti/eval_scene_flow.php?benchmark=flow

In [6]:
# load images
img1, img2 = [], []
img1_vis, img2_vis = [], []
gts, valids = [], []
for fidx in range(start_idx, end_idx+1):
    im1 = np.asarray(Image.open(img1_filefmt % fidx))
    im2 =  np.asarray(Image.open(img2_filefmt % fidx))
    img1_vis.append(im1)
    img2_vis.append(im2)

    im1 = cv2.cvtColor(im1, cv2.COLOR_RGB2GRAY)
    im2 = cv2.cvtColor(im2, cv2.COLOR_RGB2GRAY)
    img1.append(im1)
    img2.append(im2)

    gt = cv2.imread(gt_filefmt % fidx, cv2.IMREAD_ANYDEPTH|cv2.IMREAD_COLOR)
    gt = gt[...,::-1].astype(np.float32)
    gt, valid = gt[...,:2], gt[...,2]
    gt = (gt - 2**15) / 64.0
    gts.append(gt.transpose(2,0,1))
    valids.append(valid)

assert len(img1) == len(img2)

In [7]:
# run the Lucas-Kanade algorithm
results = []
flow_imgs = []
valid_preds = []
i=0
while i < len(img1):
    print(f"Start {i}")
    pred_flow, valid_pred = optical_flow(img1[i], img2[i], window_size=window_size, tau=tau)
    print(f"Finished {i}")
    results.append(pred_flow)
    valid_preds.append(valid_pred)

    flow_img = flow_to_arrow(img1_vis[i], pred_flow)
    flow_imgs.append(flow_img)

    flow_img = flow_to_arrow(img2_vis[i], pred_flow)
    flow_imgs.append(flow_img)

    i += 1

Start 0
Finished 0
Start 1
Finished 1
Start 2
Finished 2
Start 3
Finished 3
Start 4
Finished 4


## Evaluation

In [8]:
# evaluate
result_flows = np.stack(results)
ground_truths = np.stack(gts)
valid_masks = np.stack(valids)
valid_preds = np.stack(valid_preds)
valid_masks = valid_masks >= 0.5
valid_preds = valid_preds >= 0.5
epe = np.sqrt(np.sum((result_flows - ground_truths)**2,axis=1,keepdims=False))
epe = epe[np.logical_and(valid_masks,valid_preds)]
print(f"End point error: {np.mean(epe):.2f}, Image size: {results[0].shape[1]} x {results[0].shape[2]}")

End point error: 9.64, Image size: 375 x 1242


## Results

In [9]:
# visualize
import imageio

imageio.mimwrite('drive/MyDrive/output.gif', flow_imgs)
for i, img in enumerate(flow_imgs):
    img =Image.fromarray(img)
    img.save(f"drive/MyDrive/output_{i//2}_{i%2}.png")

In [10]:
i=0
while i < len(img1):
    print(f"Start {i}")
    pt0 = cv2.goodFeaturesToTrack(img1[i], maxCorners=1000, qualityLevel=0.3, minDistance=7, blockSize=7)
    pt1, st, err = cv2.calcOpticalFlowPyrLK(img1[i], img2[i], pt0, None)
    good_prev = pt0[st==1]
    good_next = pt1[st==1]
    flow = np.zeros((2,img1[0].shape[0], img1[0].shape[1]))
    for j, (new, old) in enumerate(zip(good_next, good_prev)):
        a, b = new.ravel()
        c, d = old.ravel()
        u, v = (a-c), (b-d)
        c, d = int(c), int(d)
        flow[0,d,c] = u
        flow[1,d,c] = v
    vis = flow_to_arrow(img1_vis[i],flow,step=1, scale=1)
    vis2 = flow_to_arrow(img2_vis[i],flow,step=1, scale=1)
    vis, vis2 = Image.fromarray(vis), Image.fromarray(vis2)
    vis.save(f'drive/MyDrive/opencv_{i}_0.png')
    vis2.save(f'drive/MyDrive/opencv_{i}_1.png')
    i+=1

Start 0
Start 1
Start 2
Start 3
Start 4
