In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import os
os.makedirs("../out", exist_ok=True)

from tqdm import tqdm

# Q2: Affine Motion Subtraction

Make sure to comment your code and use proper names for your variables.

## Q2.1: Lucas-Kanade Tracking with Affine Motion

In [102]:
# We recommend using this function, but you can explore other methods as well (e.g., ndimage.shift).
from scipy.interpolate import RectBivariateSpline as RBS

# The function below could be useful as well :) 
from numpy.linalg import lstsq

def LucasKanadeAffine(It, It1, threshold, num_iters):
    """
    :param[np.array(H, W)] It   : Image frame at time-step t
    :param[np.array(H, W)] It1  : Image frame at time-step t+1
    :param[float] threshold     : If the length of dp < threshold, terminate the optimization
    :param[int] num_iters       : Number of iterations for running the optimization

    :return[np.array(2, 3)] M   : Affine warp matrix
    """
    # Initial M
    M = np.eye(3)
    #M = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
    p = np.zeros(6)
    dp_thresh = 1
    i = 0
    
    It_spline = RBS(np.arange(It.shape[0]), np.arange(It.shape[1]), It)
    It1_spline = RBS(np.arange(It1.shape[0]), np.arange(It1.shape[1]), It1)
    
    # ----------------------------------------------------------------------------------------------
    # TODO: Add your LK implementation here: 
    while (i <= num_iters) and (dp_thresh >= threshold):
        x_ = np.linspace(0, It.shape[0], It.shape[0])
        y_ = np.linspace(0, It.shape[1], It.shape[1])
        x_mesh, y_mesh = np.meshgrid(x_, y_)
        
        x_warped = (1 + p[0]) * x_mesh + p[1] * y_mesh + p[2]
        y_warped = p[3] * x_mesh + (1 + p[4]) * y_mesh + p[5]
        
        #Determining indices that meet the conditions         
        idx_valid1 = (x_warped >= 0)
        idx_valid2 = (x_warped <= It.shape[1])
        idx_valid_x = (idx_valid1 == idx_valid2)
        
        idx_valid3 = (y_warped >= 0)
        idx_valid4 = (y_warped <= It.shape[0])
        idx_valid_y = (idx_valid3 == idx_valid4)
        
        idx_valid = (idx_valid_x == idx_valid_y)
        
        x_warped_valid = x_warped[idx_valid].flatten()
        y_warped_valid = y_warped[idx_valid].flatten()
        
        It1_inter = It1_spline.ev(y_warped_valid, x_warped_valid).flatten()
        
        It1_grad_x = It1_spline.ev(y_warped_valid, x_warped_valid, dx = 1, dy = 0).flatten()
        It1_grad_y = It1_spline.ev(y_warped_valid, x_warped_valid, dx = 0, dy = 1).flatten()
        It1_grad = np.vstack((It1_grad_y, It1_grad_x)).T
        print(It1_grad.shape)
        
        A = np.zeros((It1_grad_x.shape[0], 6))
        #From paper: dW/dp = [x 0 y 0 1 0;
        #                    0 x 0 y 0 1]
        
        dW = np.array([[x_mesh.flatten(), 0, y_mesh.flatten(), 0, 1, 0 ], [0, x_mesh.flatten(), 0, y_mesh.flatten(), 0, 1]])
        
        A = It1_grad @ dW
        
        b = It[idx_valid].flatten() - It1_inter
        
        delta_p, dp_thresh,_, _ = lstsq(A, b, rcond = None)
        
        p += delta_p
        i += 1
        
    M = np.array([[1 + p[0], p[1], p[2]], [p[3], 1 + p[4], p[5]], [0, 0, 1]])
    # ----------------------------------------------------------------------------------------------
    return M

# Q2.2: Dominant Motion Subtraction

In [103]:
# These functions could be useful for your implementation. 
from scipy.ndimage import binary_erosion, binary_dilation, affine_transform
import cv2

def SubtractDominantMotion(It, It1, num_iters, threshold, tolerance):
    """
    :param[np.array(H, W)] It   : Image frame at time-step t
    :param[np.array(H, W)] It1  : Image frame at time-step t+1
    :param[float] threshold     : For LucasKanadeAffine --> If the length of dp < threshold, 
                                  terminate the optimization
    :param[int] num_iters       : For LucasKanadeAffine --> Number of iterations for running the 
                                  optimization
    :param[float] tolerance     : Binary threshold of intensity difference when computing the mask.
   
    :return[np.array(H, W)] mask: Binary mask indicating moving pixels. 
    """
    mask = np.ones(It1.shape, dtype=bool)

    # ----------------------------------------------------------------------------------------------
    # TODO: Add your code here:
    M = LucasKanadeAffine(It, It1, threshold, num_iters)
    It_warp = cv2.warpAffine(It, M)
    
    error = np.absolute(It1 - It_warp)
    mask = error > tolerance #Locations where movement occurs
    mask = binary_erosion(mask)
    mask = binary_dilation(mask)
    
    # ----------------------------------------------------------------------------------------------
    return mask 

## Q2.3: Track Sequence

In [104]:
def TrackSequenceAffineMotion(seq, num_iters, threshold, tolerance):
    """
    :param[np.array(H, W, N)] seq : sequence of frames
    :param[int] num_iters         : Number of iterations for running the optimization
    :param[float] threshold       : If the length of dp < threshold, terminate the optimization
    :param[float] tolerance       : Binary threshold of intensity difference when computing the mask.

    :return[np.array(H, W)] mask: Binary mask indicating moving pixels.
    """
    H, W, N = seq.shape
    print(H, W)

    masks = np.empty((H, W, N))
    It = seq[:,:,0]

    # ----------------------------------------------------------------------------------------------
    # TODO: Add your code here:
    for i in tqdm(range(1, seq.shape[2])):
        It1 = seq[:, :, i]
        
        mask = SubtractDominantMotion(It, It1, num_iters, threshold, tolerance)
        masks[:, :, i] = mask

    # ----------------------------------------------------------------------------------------------
    
    masks = np.stack(masks, axis=2)
    return masks

### Q2.3: Track Ant Sequence

Feel free to play with these snippets of code; run ablations, visualize a gif with the whole sequence, etc.

Just make sure the bounding boxes for the car are clearly visible, and report those of the frames we requested. 

In [105]:
seq = np.load("../data/antseq.npy")

# NOTE: feel free to play with these parameters
num_iters = 1e4
threshold = 1e-2
tolerance = 0.2

masks = TrackSequenceAffineMotion(seq, num_iters, threshold, tolerance)
np.save(f'../out/antseqmasks.npy', masks)

256 256


  0%|          | 0/124 [00:00<?, ?it/s]

Valid index shape: (256, 256)
Image template: [0.66470588 0.66666667 0.67941176 ... 0.67156863 0.68431373 0.68627451]


  dW = np.array([[x_mesh.flatten(), 0, y_mesh.flatten(), 0, 1, 0 ], [0, x_mesh.flatten(), 0, y_mesh.flatten(), 0, 1]])
  0%|          | 0/124 [02:12<?, ?it/s]
Exception ignored in sys.unraisablehook: <built-in function unraisablehook>
Traceback (most recent call last):
  File "C:\Users\prana\anaconda3\lib\site-packages\ipykernel\iostream.py", line 512, in write
    if not isinstance(string, str):
KeyboardInterrupt


MemoryError: Unable to allocate 512. KiB for an array with shape (65536,) and data type float64

In [None]:
# TODO: visualize
frames_to_save = []

for idx in frames_to_save:
    pass
    # frame = 
    # mask = 
   
    # plt.figure()
    # plt.imshow(frame, cmap="gray", alpha=0.5)
    # plt.imshow(np.ma.masked_where(np.invert(mask), mask))
    # plt.axis('off')
    # plt.savefig(f"../out/sol_2.3_antseq_{idx+1}.png")

### Q2.3: Test Aerial Sequence

Feel free to play with these snippets of code; run ablations, visualize a gif with the whole sequence, etc.

Just make sure the bounding boxes for the car are clearly visible, and report those of the frames we requested. 

In [13]:
seq = np.load("../data/aerialseq.npy")

# NOTE: feel free to play with these parameters
num_iters = 1000
threshold = 0.01
tolerance = 0.2

masks = TrackSequenceAffineMotion(seq, num_iters, threshold, tolerance)
np.save(f'../out/aerialseqmasks.npy', masks)

In [15]:
# TODO: visualize
frames_to_save = []

for idx in frames_to_save:
    pass
    # frame = 
    # mask = 
   
    # plt.figure()
    # plt.imshow(frame, cmap="gray", alpha=0.5)
    # plt.imshow(np.ma.masked_where(np.invert(mask), mask))
    # plt.axis('off')
    # plt.savefig(f"../out/sol_2.3_aerialseq_{idx+1}.png")

In [11]:
#Logic test
x = np.arange(4)
print(x)

valid = x > 0
print(valid)
print(x[valid])

x_stack = np.stack(np.reshape(x, (2,2)), axis = 1)

print(x_stack)

[0 1 2 3]
[False  True  True  True]
[1 2 3]
[[0 2]
 [1 3]]
