In [9]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
cd /content/drive/MyDrive/CSE455 Computer Vision Final Project

/content/drive/MyDrive/CSE455 Computer Vision Final Project


In [14]:
ls

 [0m[01;34mCombined_Frames[0m/               'Project Proposal.gdoc'
 Farneback_Method.ipynb         'Project timeline.gsheet'
 Interpolated.mov                [01;34mSampleVideo1_Frames[0m/
 Interpolated.mp4                [01;34mSampleVideo1_Frames_Interpolated[0m/
 InterpolatedSampleVideo1.mov    SampleVideo1.mov
 InterpolatedSampleVideo1.mp4    SampleVideo1.mp4
'Meeting Notes 3 17 2023.gdoc'   [01;34mTutorials[0m/
'Meeting Notes.gdoc'             VideoFrameInterpolationWithOpticalFlow.ipynb


# CSE 455 Computer Vision Final Project: Video Frame Rate Interpolation with Farneback Optical Flow
## Author: Ciel Sun, Chien Van Le
## Date: March 10, 2023
## Overview: 
## This script will perform a video frame interpolation using the technique of optical flow. We will use Lucas Kanade(our own implemenation) and Farneback(cv2 library) optical flow to interpolate the video sequence. The generated video sequence will have a frame rate about twice that of the original/source video sequence(i.e. original frame rate 30fps---> new frame rate 59fps).



In [31]:
import cv2
import numpy as np
import os
from google.colab.patches import cv2_imshow

from scipy import signal

# Farneback Optical Flow Implementation

**Helper Method** 
This function takes a frame as input, extracts the red, green, and blue channels of the frame using array slicing, and computes the grayscale value using the formula. It then rounds the grayscale value to the nearest integer and converts it to a uint8 data type using NumPy's astype() function. Finally, it returns the grayscale frame.

In [124]:
def cvtColor(frame):
    # extract the red, green, and blue channels of the frame
    r = frame[:, :, 0]
    g = frame[:, :, 1]
    b = frame[:, :, 2]

    # compute the grayscale value using the formula
    gray = 0.299 * r + 0.587 * g + 0.114 * b

    # convert the grayscale value to uint8
    gray = np.round(gray).astype(np.uint8)

    return gray

**Helper Method**
This function takes an image as input, along with several optional parameters for computing the derivatives. It first creates Sobel filters for computing the horizontal and vertical derivatives, and then applies the filters to the input image using the scipy.signal.convolve2d() function. It then computes the magnitude and orientation of the derivatives using NumPy's sqrt() and arctan2() functions. Finally, it converts the magnitude to the specified data type using NumPy's astype() function, and returns the derivative in the specified direction (horizontal, vertical, or magnitude). You can adjust the optional parameters to get the desired results.

In [135]:
def sobel(image, ddepth, dx, dy, ksize=3):
    # create Sobel filters for computing the derivatives
    # sobel_x = np.array([[-1, 0, 1],
    #                     [-2, 0, 2],
    #                     [-1, 0, 1]])
    sobel_s1 = np.array([[1], [2], [1]])
    sobel_s2 = np.array([[-1, 0, 1]])
    # sobel_y = np.array([[-1, -2, -1],
    #                     [0, 0, 0],
    #                     [1, 2, 1]])

    # compute the horizontal and vertical derivatives using the Sobel filters
    fx = signal.convolve2d(image, sobel_s1, mode='same')
    fx = signal.convolve2d(fx, sobel_s2, mode='same')
    fy = signal.convolve2d(image, sobel_s2.T, mode='same')
    fy = signal.convolve2d(fy, sobel_s1.T, mode='same')

    # compute the magnitude and orientation of the derivatives
    mag = np.sqrt(fx**2 + fy**2)
    angle = np.arctan2(fy, fx)

    # convert the magnitude to the specified depth
    if ddepth == -1:
        mag = np.abs(mag)
        mag = np.round(mag * 255 / np.max(mag)).astype(np.uint8)
    else:
        mag = mag.astype(ddepth)

    # return the derivative in the specified direction
    if dx == 1 and dy == 0:
        return fx
    elif dx == 0 and dy == 1:
        return fy
    else:
        return mag

In [134]:
sx1 = np.array([[1], [2], [1]])
sx2 = np.array([[-1, 0, 1]])
sx1.T

array([[1, 2, 1]])

**Main Method**:
This method uses the defined cvtColor() function to convert the frames to grayscale, and the defined Sobel() function to compute the spatial gradients. It then computes the temporal gradient between the two frames by subtracting the grayscale frames, and computes the optical flow vectors using a least-squares solution to the optical flow equation. Finally, it returns the computed flow. You can adjust the optional parameters to get the desired results.

In [137]:
def calcOpticalFlowLucasKanade(prev_frame, next_frame, pyr_scale=0.5, levels=3, winsize=15, iterations=3,
                             poly_n=5, poly_sigma=1.2, flags=0):
    # convert frames to grayscale
    prev_gray = cvtColor(prev_frame)
    next_gray = cvtColor(next_frame)
    
    # compute the spatial gradients
    fx = sobel(prev_gray, -1, 1, 0, ksize=3)
    fy = sobel(prev_gray, -1, 0, 1, ksize=3)
    
    # compute the temporal gradient
    ft = next_gray - prev_gray
    
    # compute the optical flow vectors
    flow = np.zeros((prev_gray.shape[0], prev_gray.shape[1], 2))
    for i in range(prev_gray.shape[0]):
        for j in range(prev_gray.shape[1]):
            A = np.array([[np.sum(fx[i,j]**2), np.sum(fx[i,j]*fy[i,j])],
                          [np.sum(fx[i,j]*fy[i,j]), np.sum(fy[i,j]**2)]])
            b = -np.array([np.sum(fx[i,j]*ft[i,j]), np.sum(fy[i,j]*ft[i,j])])
            # d = np.linalg.solve(A + noise, b)  # Adding noise to avoid having correlated columns in A
            
            noise = np.array([[0.0001, 0], [0, 0.0001]])
            v = np.linalg.solve(A + noise, b)  # Adding noise to avoid having correlated columns in A
            flow[i,j] = v
    return flow

References: 

https://theailearner.com/tag/cv2-sobel/

https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.convolve2d.html

https://www.geeksforgeeks.org/python-opencv-cv2-cvtcolor-method/

https://docs.opencv.org/3.4/dc/d6b/group__video__track.html

https://www.geeksforgeeks.org/opencv-the-gunnar-farneback-optical-flow/

Two-Frame Motion Estimation Based on
Polynomial Expansion:

http://www.diva-portal.org/smash/get/diva2:273847/fulltext01.pdf

# Video Interpolatio

## 1. Decompose a video sequence into individual frame images

In [138]:
ls

 [0m[01;34mCombined_Frames[0m/               'Project Proposal.gdoc'
'CSE455 Final Project.gslides'  'Project timeline.gsheet'
 Farneback_Method.ipynb          [01;34mSampleVideo1_Frames[0m/
 Interpolated.mov                [01;34mSampleVideo1_Frames_Interpolated[0m/
 Interpolated.mp4                SampleVideo1.mov
 InterpolatedSampleVideo1.mov    SampleVideo1.mp4
 InterpolatedSampleVideo1.mp4    [01;34mTutorials[0m/
'Meeting Notes 3 17 2023.gdoc'   VideoFrameInterpolationWithOpticalFlow.ipynb
'Meeting Notes.gdoc'


In [139]:
im_path = "SampleVideo1_Frames"
if not os.path.exists(im_path):
  os.mkdir(im_path)
video_name = "SampleVideo1.mp4"
capture = cv2.VideoCapture(video_name)

success, frame1 = capture.read()
count = 1                                                                       # starting with "frame1.jpg"
if success: cv2.imwrite(im_path + f"/frame{count}-0.jpg", frame1)               # i.e. 'frame1-0.jpg', 'frame2-0.jpg'
fps_og = capture.get(cv2.CAP_PROP_FPS)                                          # original frame rate per second 25
# while success:
#   success, im = capture.read()
#   if success:
#     count += 1
#     cv2.imwrite(im_path + f"/frame{count}-0.jpg", im)                           # frames are saved as JPEG files
#     if cv2.waitKey(10) == 27:                                                 # exit if Escape is hit
#       break
totalFrames = len(os.listdir(im_path))
print(f'{totalFrames} frames decomposed from video {video_name}')
print(f'Original frame rate is {fps_og} fps')



175 frames decomposed from video SampleVideo1.mp4
Original frame rate is 25.0 fps


## 2. Generate interpolated frames

In [140]:
def interpolate(prvs, next, method = 'LucasKanade'):
  '''
  Generate and return a mid frame based on Farneback opitcal flow of the previous frame and next frame
  Parameters:
    prvs: previous frame as ndarray with BGR format
    next: next frame as ndarray with BGR format
  Return:
    mid_frame: generated middle frame between previous and next frame
  '''
  y_max = prvs.shape[0] - 1
  x_max = prvs.shape[1] - 1
  prvs_gray = cv2.cvtColor(prvs, cv2.COLOR_BGR2GRAY)
  next_gray = cv2.cvtColor(next, cv2.COLOR_BGR2GRAY)
  # Use gray images to calculate flow with dimension: n x m x 2; 2 channels:[dx/dt, dy/dt]
  if method == 'Farneback':
    flow = cv2.calcOpticalFlowFarneback(prvs_gray, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)          # farneback optical flow from cv2
  else:
    flow = calcOpticalFlowLucasKanade(prvs, next, pyr_scale=0.5, levels=3, winsize=15, iterations=3,
                             poly_n=5, poly_sigma=1.2, flags=0)                                        # our own implementation of Farneback optical flow
  mid_frame = prvs.copy()
  for y, row in enumerate(prvs):
    for x, col in enumerate(row):
      for z, pixVal in enumerate(col):
        new_y = y + int(flow[y, x, 1])
        new_x = x + int(flow[y, x, 0])
        new_y = new_y if new_y < y_max else y_max
        new_x = new_x if new_x < x_max else x_max
        mid_frame[y, x, z] = prvs[new_y, new_x, z]
  return mid_frame

In [141]:
# Test Block
a1 = np.array([[46656, 46656], [46656, 46656]]) + np.array([[0.0001, 0], [0, 0.0001]])
a2 = np.array([0, 0])
asolve = np.linalg.solve(a1, a2)
asolve


array([0., 0.])

In [143]:
# # Test block
# testFrame1 = frame1
# testFrame2 = cv2.imread('/content/drive/MyDrive/CSE455 Computer Vision Final Project/SampleVideo1_Frames/frame2-0.jpg')
# # cv2_imshow(testFrame1)
# # cv2_imshow(testFrame1)
# test_mid = interpolate(testFrame1, testFrame2)
# cv2_imshow(test_mid)

In [None]:
from natsort import natsorted, ns
file_list = natsorted(os.listdir(im_path))

out_path = im_path + "_Interpolated_LucasKanade"                                            # a path storing all interpolated frames
if not os.path.exists(out_path):
  os.mkdir(out_path)

for i in range(len(file_list) - 1):                                             # stop at the second last frame
# for i in range(3):
  prvs = cv2.imread(im_path + "/" + file_list[i])
  next = cv2.imread(im_path + "/" + file_list[i+1])
  mid_frame = interpolate(prvs, next)
  cv2.imwrite(out_path + f"/frame{i+1}-1.jpg", mid_frame)                       # i.e. 'frame1-1.jpg','frame2-1.jpg'...


In [None]:
out_path = im_path + "_Interpolated_LucasKanade"

## 3. Compose the original frames and interpolated frames back into one video sequence

In [None]:
mkdir Combined_Frames_LK

mkdir: cannot create directory ‘Combined_Frames’: File exists


In [None]:
cp SampleVideo1_Frames_Interpolated_LucasKanade/*.jpg Combined_Frames_LK

In [None]:
cp SampleVideo1_Frames/*.jpg Combined_Frames_LK

In [None]:
# Origianl frames and interpolation frames are both copied to directory "Combined_Frames"
interpolate_list = os.listdir(out_path)
combine_path = "Combined_Frames_LK"
combine_list = natsorted(os.listdir("Combined_Frames_LK"))
print(f"Original number of frames: {len(file_list)}")
print(f"Number of frames generated: {len(interpolate_list)}")
print(f"Total number of frames: {len(combine_list)}")

Original number of frames:175
Number of frames generated: 174
Total number of frames: 349


In [None]:
im_arr = []
h, w, c = frame1.shape
size = (w, h)
for fname in combine_list:
  im_path = os.path.join(combine_path, fname)
  im = cv2.imread(im_path)
  im_arr.append(im)

output = cv2.VideoWriter(f"Interpolated_LK_fps{2*fps_og}"+video_name, 
                         cv2.VideoWriter_fourcc(*'MP4V'),
                         2*fps_og, size)
for image in im_arr:
  output.write(image)
output.release()