## DVPA Assignment 1 : Video Prediction using Optical Flow
### Mohit Kumar 
### SR No. : 04-01-03-10-51-21-1-19825
### MTech Artificial Intelligence

In [None]:
import cv2
import numpy as np
import scipy.ndimage
import matplotlib.pyplot as plt
import glob
import regex as re
import scipy.ndimage
import scipy.signal
import os
from skimage.metrics import structural_similarity as ssim
from matplotlib.pyplot import figure, draw, pause, gca
from pathlib import Path

In [None]:
# This function is used for visualizing the estimated optical flow 
def plot_opt_flow(u, v, Inew, scale: int = 3, quivstep: int = 5, fn: Path = None):
    """
    makes quiver
    """

    ax = figure().gca()
    ax.imshow(Inew, cmap="gray", origin="lower")
    # plt.scatter(POI[:,0,1],POI[:,0,0])
    for i in range(0, u.shape[0], quivstep):
        for j in range(0, v.shape[1], quivstep):
            ax.arrow(
                j,
                i,
                v[i, j] * scale,
                u[i, j] * scale,
                color="red",
                head_width=0.5,
                head_length=1,
            )

        # plt.arrow(POI[:,0,0],POI[:,0,1],0,-5)
    if fn:
        ax.set_title(fn)

    draw()
    pause(0.01)

In [None]:
# This function computes the mean square error between two images
def mse(imageA, imageB):

    err = np.sum((np.array(imageA, dtype=np.float32) - np.array(imageB, dtype=np.float32)) ** 2)
    err /= float(imageA.shape[0] * imageA.shape[1])

    return err

# This function compares two images in terms of mse and ssim and plots them
def compare_images(imageA, imageB):

    m = mse(imageA, imageB)
    s = ssim(imageA, imageB)

    fig = plt.figure()
    ax = fig.add_subplot(1, 2, 1)
    plt.imshow(imageA, cmap = plt.cm.gray)
    plt.axis("off")
    plt.title(f'MSE: {m:.2f}')
    ax = fig.add_subplot(1, 2, 2)
    plt.imshow(imageB, cmap = plt.cm.gray)
    plt.axis("off")
    plt.title(f"SSIM: {s:.2f}")
    plt.show()

## Discrete Horn Schunck Optical Flow Algorithm

[Code taken from : View source](https://github.com/vineeths96/Video-Interpolation-using-Optical-Flow)


In [None]:
def compute_gradients(firstImage, secondImage):
    """
    Compute gradients in x, y and t direction between images
    :param firstImage: First image
    :param secondImage: Second image
    :return: Gradients
    """

    firstImage = firstImage / 255
    secondImage = secondImage / 255

    # Kernels for finding gradients Ix, Iy, It
    kernel_x = np.array([[-1, 1]])
    kernel_y = np.array([[-1], [1]])
    kernel_t = np.array([[1]])

    Ix = scipy.ndimage.convolve(input=firstImage, weights=kernel_x, mode="nearest")
    Iy = scipy.ndimage.convolve(input=firstImage, weights=kernel_y, mode="nearest")
    It = scipy.ndimage.convolve(input=secondImage, weights=kernel_t, mode="nearest") + scipy.ndimage.convolve(
        input=firstImage, weights=-kernel_t, mode="nearest"
    )

    I = [Ix, Iy, It]

    return I


def horn_schunck(firstImage, secondImage, lambada):
    """
    Horn Schunk Optical flow estimation between firstImage and secondImage
    :param firstImage: First image
    :param secondImage: Second Image
    :param lambada: Regularization parameter
    :return: Optical flow, Gradients
    """

    u = np.zeros([firstImage.shape[0], firstImage.shape[1]])
    v = np.zeros([firstImage.shape[0], firstImage.shape[1]])

    [Ix, Iy, It] = compute_gradients(firstImage, secondImage)

    # Optical flow averaging kernel
    kernel = np.array([[0, 1 / 4, 0], [1 / 4, 0, 1 / 4], [0, 1 / 4, 0]], dtype=np.float32)

    for _ in range(500):
        u_avg = scipy.ndimage.convolve(input=u, weights=kernel, mode="nearest")
        v_avg = scipy.ndimage.convolve(input=v, weights=kernel, mode="nearest")

        grad = (Ix * u_avg + Iy * v_avg + It) / (lambada ** 2 + Ix ** 2 + Iy ** 2)

        u = u_avg - lambada * Ix * grad
        v = v_avg - lambada * Iy * grad

    flow = [u, v]
    I = [Ix, Iy, It]

    return flow, I


## Lucas Kanade Optical Flow Algorithm
[Code taken from : View source](https://github.com/vineeths96/Video-Interpolation-using-Optical-Flow)

In [None]:
def lucas_kanade(firstImage, secondImage, N, tau=1e-3):
    """
    Lucas Kanade Optical flow estimation between firstImage and secondImage
    :param firstImage: First image
    :param secondImage: Second Image
    :param N: Block size N x N
    :param tau: Threshold parameter
    :return: Optical flow, Gradients
    """

    firstImage = firstImage / 255
    secondImage = secondImage / 255
    image_shape = firstImage.shape
    half_window_size = N // 2

    # Kernels for finding gradients Ix, Iy, It
    kernel_x = np.array([[-1, 1]])
    kernel_y = np.array([[-1], [1]])
    kernel_t = np.array([[1]])

    Ix = scipy.ndimage.convolve(input=firstImage, weights=kernel_x, mode="nearest")
    Iy = scipy.ndimage.convolve(input=firstImage, weights=kernel_y, mode="nearest")
    It = scipy.ndimage.convolve(input=secondImage, weights=kernel_t, mode="nearest") + scipy.ndimage.convolve(
        input=firstImage, weights=-kernel_t, mode="nearest"
    )

    u = np.zeros(image_shape)
    v = np.zeros(image_shape)

    # Find Lucas Kanade OF for a block N x N with least squares solution
    for row_ind in range(half_window_size, image_shape[0] - half_window_size):
        for col_ind in range(half_window_size, image_shape[1] - half_window_size):
            Ix_windowed = Ix[
                row_ind - half_window_size : row_ind + half_window_size + 1,
                col_ind - half_window_size : col_ind + half_window_size + 1,
            ].flatten()
            Iy_windowed = Iy[
                row_ind - half_window_size : row_ind + half_window_size + 1,
                col_ind - half_window_size : col_ind + half_window_size + 1,
            ].flatten()
            It_windowed = It[
                row_ind - half_window_size : row_ind + half_window_size + 1,
                col_ind - half_window_size : col_ind + half_window_size + 1,
            ].flatten()

            A = np.asarray([Ix_windowed, Iy_windowed]).reshape(-1, 2)
            b = np.asarray(It_windowed)

            A_transpose_A = np.transpose(A) @ A

            A_transpose_A_eig_vals, _ = np.linalg.eig(A_transpose_A)
            A_transpose_A_min_eig_val = np.min(A_transpose_A_eig_vals)

            # Noise thresholding
            if A_transpose_A_min_eig_val < tau:
                continue

            A_transpose_A_PINV = np.linalg.pinv(A_transpose_A)
            w = A_transpose_A_PINV @ np.transpose(A) @ b

            u[row_ind, col_ind], v[row_ind, col_ind] = w

    flow = [u, v]
    I = [Ix, Iy, It]

    return flow, I


In [None]:
def predict_frame(secondImage, forward_flow, If, image_ind, dataset, param, algo):
    """
    Future frame prediction
    :param secondImage: Second image (Frame N+1)
    :param forward_flow: Optical flow from Frame N to Frame N+1
    :param If: Forward gradients [Ix, Iy, It]
    :param image_ind: Current image index
    :param dataset: Dataset name
    :param algo: lucas_kanade/ horn_schunck
    """

    uf, vf = forward_flow
    x, y = np.float32(np.meshgrid(np.arange(secondImage.shape[0]), np.arange(secondImage.shape[1])))
    x1, y1 = np.float32(x + uf), np.float32(y + vf)
    warped_image = cv2.remap(secondImage, x1, y1, interpolation=cv2.INTER_CUBIC)
  #  plot_opt_flow(uf, vf, secondImage)
    

    if not os.path.exists(f"./results/{algo}/{param}/predicted_frames/{dataset}/"):
        os.makedirs(f"./results/{algo}/{param}/predicted_frames/{dataset}/")
    cv2.imwrite(f"./results/{algo}/{param}/predicted_frames/{dataset}/forward_prediction_{image_ind}.png",
    warped_image,)

    return cv2.imread(f"./results/{algo}/{param}/predicted_frames/{dataset}/forward_prediction_{image_ind}.png", flags=cv2.IMREAD_GRAYSCALE)


In [None]:
def corridor_frame_prediction(param, algo):
    """
    Corridor dataset: prediction of Frame N+2 using Frame N and Frame N+1
    :param : parameter
    :algo : lucas_kanade/ horn_schunck
    :return: None
    """
    commands = {
    'lucas_kanade': lucas_kanade,
    'horn_schunck': horn_schunck}

    func = commands[algo]

    images = glob.glob("./input/corridor/*.pgm")
    images.sort(key=lambda f: int(re.sub("\D", "", f)))

    for ind in range(0, len(images) - 1, 2):
        firstImage = cv2.imread(images[ind], flags=cv2.IMREAD_GRAYSCALE)
        secondImage = cv2.imread(images[ind + 1], flags=cv2.IMREAD_GRAYSCALE)
        thirdImage = cv2.imread(images[ind + 2], flags=cv2.IMREAD_GRAYSCALE)

        firstImage = np.array(firstImage, dtype=np.float32)
        secondImage = np.array(secondImage, dtype=np.float32)
        
        forward_flow, If = func(firstImage, secondImage, param)

        compare_images(predict_frame(secondImage, forward_flow, If, ind+2, "corridor", param, algo), thirdImage)

In [None]:
def sphere_frame_prediction(param, algo):
    """
    Sphere dataset: prediction of Frame N+2 using Frame N and Frame N+1
    :param : parameter
    :algo : lucas_kanade/ horn_schunck
    :return: None
    """

    commands = {
    'lucas_kanade': lucas_kanade,
    'horn_schunck': horn_schunck}

    func = commands[algo]

    images = glob.glob("./input/sphere/*.ppm")
    images.sort(key=lambda f: int(re.sub("\D", "", f)))

    for ind in range(0, len(images) - 2, 2):
        firstImage = cv2.imread(images[ind], flags=cv2.IMREAD_GRAYSCALE)
        secondImage = cv2.imread(images[ind + 1], flags=cv2.IMREAD_GRAYSCALE)
        thirdImage = cv2.imread(images[ind + 2], flags=cv2.IMREAD_GRAYSCALE)

        firstImage = np.array(firstImage, dtype=np.float32)
        secondImage = np.array(secondImage, dtype=np.float32)

        forward_flow, If = func(firstImage, secondImage, param)

        compare_images(predict_frame(secondImage, forward_flow, If, ind+2, "sphere", param, algo), thirdImage)

In [None]:
print('Horn Schunck Algorithm--------------------------------')

for wt in [0.01, 0.25, 0.5, 1]:
    print(f'lambda = {wt}')
    corridor_frame_prediction(wt, 'horn_schunck')
    sphere_frame_prediction(wt, 'horn_schunck')

In [None]:
print('Lucas Kanade Algorithm-----------------------------------')
for win_size in [5, 8, 11, 14]:
    print(f'window size = {win_size}')
    corridor_frame_prediction(win_size, 'lucas_kanade')
    sphere_frame_prediction(win_size, 'lucas_kanade')