Combine spatial and temporal processing to emphasize subtle spatial changes in a video.
1. Take a standard video sequence as input and decompose each frame into different spatial frequency bands using a Laplacian pyramid.
2. Take the sequence of pixel values over time at each pyramid level and apply a temporal band-pass filter to extract the frequency band of interest.
3. Amplify the resulting signal and add it back to the frames.
4. Collapse the pyramid to generate the output video.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import convolve2d
from scipy import ndimage
import skimage as sk
from skimage import io, color
import cv2
import matplotlib.pyplot as plt
import scipy.signal
from IPython.display import HTML
from base64 import b64encode
from google.colab import drive
# Mount Google Drive at /content/drive; the input video paths defined below point into it.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Input clips, stored in the project folder on the mounted Google Drive.
baby_path = "/content/drive/MyDrive/cs180/Final Projects/Video Magnification/baby.mp4"
face_path = "/content/drive/MyDrive/cs180/Final Projects/Video Magnification/face.mp4"
engine_path = "/content/drive/MyDrive/cs180/Final Projects/Video Magnification/engine.mp4"

## Miscellaneous

In [3]:
def rgb_to_yiq(rgb_image):
  '''
  Convert an RGB image to the YIQ color space.

  input: rgb_image: NumPy array that is reshaped to rows of 3 values
         (one RGB triple per row) before the conversion
  output: array with the same shape as rgb_image holding (Y, I, Q) triples
  '''
  rgb_to_yiq_matrix = np.array([
      [0.299, 0.587, 0.114],    # Y row
      [0.596, -0.275, -0.321],  # I row
      [0.212, -0.523, 0.311],   # Q row
  ])
  original_shape = rgb_image.shape
  # One pixel per row so a single matrix product converts every pixel.
  pixels = rgb_image.reshape((-1, 3))
  converted = np.matmul(pixels, rgb_to_yiq_matrix.T)
  return converted.reshape(original_shape)

def yiq_to_rgb(yiq_image):
  '''
  Convert a YIQ image to an 8-bit RGB image.

  input: yiq_image: NumPy array that is reshaped to rows of 3 values
         (one YIQ triple per row) before the conversion
  output: uint8 array with the same shape as yiq_image; values are rounded
          to the nearest integer and clipped to [0, 255]
  '''
  inverse_transform_matrix = np.array([[1, 0.956, 0.621],
                                       [1, -0.272, -0.647],
                                       [1, -1.106, 1.703]])
  reshaped_yiq = yiq_image.reshape((-1, 3))
  rgb = reshaped_yiq @ inverse_transform_matrix.T
  # Round before the cast: astype(np.uint8) on its own truncates toward zero,
  # which turns e.g. 99.6 into 99 and biases every frame slightly darker.
  rgb = np.clip(np.rint(rgb), 0, 255)
  rgb_image = rgb.reshape(yiq_image.shape).astype(np.uint8)
  return rgb_image

def convert_frames_to_yiq(rgb_frames):
  '''
  Apply rgb_to_yiq to every frame.

  input: rgb_frames: iterable of RGB frames
  output: list with one YIQ frame per input frame, in the same order
  '''
  return [rgb_to_yiq(rgb_frame) for rgb_frame in rgb_frames]
def convert_frames_to_rgb(yiq_frames):
  '''
  Apply yiq_to_rgb to every frame.

  input: yiq_frames: iterable of YIQ frames
  output: list with one RGB frame per input frame, in the same order
  '''
  return [yiq_to_rgb(yiq_frame) for yiq_frame in yiq_frames]

In [4]:
def display_video(file_path):
  '''
  Build an inline HTML video player for a video file.

  The file is read in full and embedded in the tag as a base64 data URI.

  input: file_path: path of the video file to embed
  output: IPython.display.HTML object wrapping a <video> element
  '''
  # Context manager so the file handle is closed as soon as the bytes are read.
  with open(file_path, 'rb') as video_file:
    video = video_file.read()
  video_encoded = b64encode(video).decode('ascii')
  # <video> is not a void element; without the closing tag, markup rendered
  # after it would be parsed as the player's fallback content.
  video_tag = f'<video controls alt="output video" src="data:video/mp4;base64,{video_encoded}"></video>'
  return HTML(video_tag)



## Laplacian pyramid

In [10]:
def generate_gaussian_pyramid(image, num_levels, kernel_size, sigma):
  '''
  Build a Gaussian pyramid by repeatedly downsampling with cv2.pyrDown.

  input: image: full-resolution image; it becomes level 0 of the pyramid
         num_levels: number of levels to produce (the input image is always
                     included, so the result never has fewer than one level)
         kernel_size, sigma: accepted but not used; cv2.pyrDown is called
                             with its default arguments
  output: list of images ordered from the input image to the smallest level
  '''
  levels = [image]
  current = image
  for _ in range(num_levels - 1):
    current = cv2.pyrDown(current)
    levels.append(current)
  return levels

def generate_laplacian_pyramid(image, num_levels, kernel_size=80, sigma=1):
  '''
  Build a Laplacian pyramid from the Gaussian pyramid of an image.

  Every layer except the last is a Gaussian level minus the next smaller
  level upsampled (cv2.pyrUp) to the same size. The last layer is the
  smallest Gaussian level itself.

  input: image: image to decompose
         num_levels: number of pyramid levels
         kernel_size, sigma: forwarded to generate_gaussian_pyramid
  output: list of layers ordered from the largest to the smallest
  '''
  gaussian_levels = generate_gaussian_pyramid(image, num_levels, kernel_size, sigma)
  band_pass_layers = []
  # Walk over neighbouring (larger, smaller) pairs of Gaussian levels.
  for larger, smaller in zip(gaussian_levels, gaussian_levels[1:]):
    # cv2 expects the target size as (width, height).
    target_size = (larger.shape[1], larger.shape[0])
    upsampled = cv2.pyrUp(smaller, dstsize=target_size)
    band_pass_layers.append(cv2.subtract(larger, upsampled))
  band_pass_layers.append(gaussian_levels[-1])
  return band_pass_layers

def collapse_laplacian_pyramid(laplacian_pyramid):
  '''
  Rebuild an image from a Laplacian pyramid.

  Starts from the last layer and, for each earlier layer in reverse order,
  upsamples the running result to that layer's size and adds the layer.

  input: laplacian_pyramid: list of layers, last element is the starting image
  output: reconstructed image (the single layer itself for a one-level pyramid)
  '''
  image = laplacian_pyramid[-1]
  remaining_layers = laplacian_pyramid[:-1]
  for layer in remaining_layers[::-1]:
    width, height = layer.shape[1], layer.shape[0]
    upsampled = cv2.pyrUp(image, dstsize=(width, height))
    image = cv2.add(upsampled, layer)
  return image

## Temporal filtering

In [9]:
#=============================================
# extract time series
#=============================================
def extract_time_series(frames, num_levels):
  '''
  Decompose every frame into a Laplacian pyramid and regroup the layers by level.

  input: frames: sequence of 3-dimensional frames
         num_levels: number of pyramid levels per frame
  output: (time_series, laplacian_pyramids)
          time_series: one array per pyramid level with the frame index as
                       a fourth, trailing axis
          laplacian_pyramids: the per-frame pyramids, in frame order
  '''
  pyramids = []
  for frame in frames:
    pyramids.append(generate_laplacian_pyramid(frame, num_levels))

  frame_count = len(frames)
  # The first frame's pyramid defines the layer shapes for every level.
  time_series = []
  for reference_layer in pyramids[0]:
    dims = reference_layer.shape
    time_series.append(np.zeros((dims[0], dims[1], dims[2], frame_count)))

  for frame_index, pyramid in enumerate(pyramids):
    for level_index in range(len(pyramid)):
      time_series[level_index][:, :, :, frame_index] = pyramid[level_index]
  return time_series, pyramids

#=============================================
# band-pass filter time series in the frequency domain
#=============================================


# Band-pass filtering: multiply the FFT of each time series by the filter's frequency response
def filter_timeseries(time_series, lowcut = 0.2, highcut = 1.5, fs = 60, n = 6):
  '''
  Apply a Butterworth band-pass filter along the time axis of each series.

  input: timeseries: a list of NumPy arrays,each has shape (height, width, number of color channels, number of frames);
                     the last axis is treated as time
        lowcut: low cut frequency in Hz
        highcut: high cut frequency in Hz
        fs: sampling frequency in Hz (the frame rate of the video)
        n: filter order

  output: filtered time series, one complex array per input array with the
          same shape; the imaginary part is only numerical noise for real input
  '''
  # The design depends only on the arguments, so build it once for all levels.
  # Second-order sections are used because the (b, a) polynomial form of a
  # high-order band-pass with a narrow, low-frequency band is badly conditioned.
  sos = scipy.signal.butter(n, [lowcut, highcut], btype='band', fs=fs, output='sos')

  filtered_time_series_per_level = []
  for ts in time_series:
    # convert to frequency domain along the time axis
    fft_series = np.fft.fft(ts, axis=-1)
    num_frames = fft_series.shape[-1]

    # whole=True samples the response at num_frames points around the full
    # unit circle, i.e. exactly at the FFT bin frequencies. The default
    # (half circle, 0..Nyquist) would pair every bin with the response at
    # half its frequency and suppress the negative-frequency bins.
    _, h = scipy.signal.sosfreqz(sos, worN=num_frames, whole=True)

    # apply filter's effect (h broadcasts over the trailing time axis)
    filtered_fft = fft_series * h

    # fft -> time series
    filtered_time_series = np.fft.ifft(filtered_fft, axis=-1)
    filtered_time_series_per_level.append(filtered_time_series)

  return filtered_time_series_per_level

## Image reconstruction

In [7]:
def amplify_changes(time_domain_series, amplification_factor = 50):
  '''
  Multiply every series by the amplification factor.

  input: time_domain_series: iterable of per-level series
         amplification_factor: multiplier applied to each series
  output: new list with the scaled series, in the same order
  '''
  amplified = []
  for series in time_domain_series:
    amplified.append(series * amplification_factor)
  return amplified


def integrate_amplified_changes(laplacian_pyramids, amplified_time_series):
  '''
  Add the amplified signal back onto every layer of every frame's pyramid.

  laplacian_pyramids : num_frames, num_pyramid_layers, y,x,c
  amplified_time_series: num_pyramid_layers, y,x,c, num_frames

  output: one pyramid per frame; each layer is the real part of
          (amplified slice + original layer) as float32
  '''
  def merge_layer(frame_index, level_index, original_layer):
    # Slice of the amplified signal for this level at this frame: (y,x,c)
    boost = amplified_time_series[level_index][:, :, :, frame_index]
    assert boost.shape == original_layer.shape
    combined = boost + original_layer
    return np.real(combined).astype(np.float32)

  return [
      [merge_layer(frame_index, level_index, layer)
       for level_index, layer in enumerate(pyramid)]
      for frame_index, pyramid in enumerate(laplacian_pyramids)
  ]

def collapse_laplacian_pyramid(laplacian_pyramid):
  '''
  Collapse a Laplacian pyramid back into a single image.

  The last layer is the starting image; each earlier layer, taken from the
  second-to-last back to the first, is added after the running image has
  been upsampled to that layer's size.

  input: laplacian_pyramid: list of layers
  output: reconstructed image
  '''
  reconstructed = laplacian_pyramid[-1]
  for index in range(len(laplacian_pyramid) - 2, -1, -1):
    level = laplacian_pyramid[index]
    target_size = (level.shape[1], level.shape[0])  # (width, height)
    reconstructed = cv2.add(cv2.pyrUp(reconstructed, dstsize=target_size), level)
  return reconstructed

def reconstruct_frames_from_pyramids(modified_pyramids):
  '''
  Collapse every pyramid into a frame.

  input: modified_pyramids: iterable of Laplacian pyramids, one per frame
  output: list of reconstructed frames, in the same order
  '''
  return [collapse_laplacian_pyramid(pyramid) for pyramid in modified_pyramids]


In [11]:
import gc
def video_magnification(video_path, output_dir, batch_size = 25):
  '''
  Magnify subtle changes in a video and write the result to a new file.

  Frames are processed in batches: each batch is converted to YIQ, split
  into Laplacian pyramids, band-pass filtered over time, amplified, added
  back to the pyramids, collapsed and written out as BGR frames.

  input: video_path: path of the input video
         output_dir: path of the output video file (written with the 'mp4v' codec)
         batch_size: number of frames read and processed together

  raises: IOError if the input video cannot be opened
  '''
  cap = cv2.VideoCapture(video_path)
  if not cap.isOpened():
    cap.release()
    raise IOError(f"Could not open video: {video_path}")

  fps = cap.get(cv2.CAP_PROP_FPS)
  fourcc = cv2.VideoWriter_fourcc(*'mp4v')
  width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  out = cv2.VideoWriter(output_dir, fourcc, fps, (width, height))

  # The temporal filter's cut-offs are in Hz, so it must be told the real
  # frame rate. Fall back to the filter's own default when the container
  # does not report one.
  filter_options = {'fs': fps} if fps > 0 else {}

  # NOTE(review): filtering is done independently per batch, so the temporal
  # frequency resolution is fps / batch_size; a larger batch_size resolves
  # low frequencies better at the cost of memory.
  try:
    while True:
      # Read up to batch_size frames from the video
      input_frame = []
      for _ in range(batch_size):
        ret, frame = cap.read()
        if not ret:
          break
        input_frame.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

      if not input_frame:
        break

      yiq_input_frames = convert_frames_to_yiq(input_frame)
      time_series, laplacian_pyramids = extract_time_series(yiq_input_frames, num_levels = 8)
      filtered_time_series_list = filter_timeseries(time_series, **filter_options)
      amplified_time_series = amplify_changes(filtered_time_series_list)
      modified_pyramids = integrate_amplified_changes(laplacian_pyramids, amplified_time_series)
      reconstructed_frames = reconstruct_frames_from_pyramids(modified_pyramids)
      reconstructed_frames_rgb = convert_frames_to_rgb(reconstructed_frames)

      for frame in reconstructed_frames_rgb:
        out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

      # Drop the per-batch intermediates before the next batch is allocated
      # to keep peak memory down.
      del input_frame, yiq_input_frames, time_series, laplacian_pyramids
      del filtered_time_series_list, amplified_time_series, modified_pyramids
      del reconstructed_frames, reconstructed_frames_rgb
      gc.collect()
  finally:
    # Release both handles even on error; the writer has to be released for
    # the output file to be finalized.
    cap.release()
    out.release()

# Run the magnification pipeline on the engine clip and write the result to /content/engine_color.mp4.
video_magnification(engine_path, '/content/engine_color.mp4')