In [1]:
import numpy as np
import cv2
import matplotlib.pyplot as plt


In [9]:
def computeBilinerWeights(q):
    """Compute the four bilinear interpolation weights for a sub-pixel point.

    Only the fractional part of ``q`` is used. With ``(x, y) = floor(q)`` and
    ``(a, b) = q - floor(q)`` the returned list holds the weights of

    - entry 0: pixel ``(x,     y)``     -> ``(1 - a) * (1 - b)``
    - entry 1: pixel ``(x + 1, y)``     -> ``a * (1 - b)``
    - entry 2: pixel ``(x,     y + 1)`` -> ``(1 - a) * b``
    - entry 3: pixel ``(x + 1, y + 1)`` -> ``a * b``

    Parameters
    ----------
    q : sequence of two numbers
        Point ``(x, y)``; may be a full image coordinate or an offset in [0, 1).

    Returns
    -------
    list
        Four non-negative weights that sum to 1.
    """
    # Reduce to the offset inside the pixel cell. Without this, passing a real
    # image coordinate (e.g. 10.3) would produce negative / >1 "weights".
    a = q[0] - np.floor(q[0])
    b = q[1] - np.floor(q[1])

    return [(1 - a) * (1 - b), a * (1 - b), (1 - a) * b, a * b]

def computeGaussianWeights(winsize, sigma):
    """Build the Gaussian weighting window used by the tracker.

    Pixel offsets are measured from the window centre
    ``((width - 1) / 2, (height - 1) / 2)`` and divided by the window extent
    along the respective axis before the Gaussian is evaluated, so the centre
    weight is 1 for odd window sizes.

    Parameters
    ----------
    winsize : sequence of two ints
        ``(width, height)`` of the window.
    sigma : float
        Standard deviation, expressed in units of the normalised offsets.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(height, width)`` with the weights.
    """
    width, height = winsize[0], winsize[1]
    centerX = (width - 1) / 2
    centerY = (height - 1) / 2

    # Normalised distance of every column / row from the centre.
    colOffset = (centerX - np.arange(width, dtype=float)) / width
    rowOffset = (centerY - np.arange(height, dtype=float)) / height

    # Broadcasting a row vector against a column vector yields the full grid.
    squaredDist = colOffset[np.newaxis, :] ** 2 + rowOffset[:, np.newaxis] ** 2

    return np.exp(-squaredDist / (2 * sigma ** 2))

def invertMatrix2x2(A):
    """Invert a 2 x 2 matrix with the closed-form adjugate formula.

    Parameters
    ----------
    A : numpy.ndarray
        2 x 2 matrix, indexed as ``A[row, col]``.

    Returns
    -------
    numpy.ndarray
        ``adj(A) / det(A)``. No singularity check is performed, so a zero
        determinant propagates as inf/nan entries.
    """
    a, b = A[0, 0], A[0, 1]
    c, d = A[1, 0], A[1, 1]

    determinant = a * d - b * c
    adjugate = np.array([[d, -b], [-c, a]])

    return (1 / determinant) * adjugate



In [3]:
class OpticalFlowLK:
    """Sparse, single-scale iterative Lucas-Kanade optical flow tracker.

    For every input point a window of ``winsize`` pixels is sampled (with
    bilinear interpolation) from the previous image and re-located in the next
    image by repeatedly solving the Gaussian-weighted normal equations
    ``At*W*A * deltaU = -At*W*(bnext - bprev)``.
    """

    def __init__(self, winsize, epsilon, iterations):
        """Store the tracker parameters.

        winsize    -- (width, height) of the tracking window in pixels (ints).
        epsilon    -- the iteration stops once the update step is shorter than this.
        iterations -- maximum number of update steps per point.
        """
        self.winsize = winsize
        self.epsilon = epsilon
        self.iterations = iterations

    def _windowFits(self, corner, imgShape):
        """Return True if the window with integer top-left ``corner`` (x, y),
        including the extra pixel needed for bilinear interpolation, lies
        inside an image of shape ``imgShape`` (rows, cols)."""
        x0, y0 = corner
        return (x0 >= 0 and y0 >= 0
                and x0 + self.winsize[0] < imgShape[1] - 1
                and y0 + self.winsize[1] < imgShape[0] - 1)

    @staticmethod
    def _samplePatch(img, corner, bw, winsize):
        """Bilinearly sample a ``winsize`` window whose integer top-left pixel
        is ``corner`` (x, y); returns a flat vector in row-major (y, x) order."""
        x0, y0 = corner
        w, h = winsize[0], winsize[1]
        # Images are indexed [row, col] = [y, x]; the four shifted slices are
        # the (x, y), (x+1, y), (x, y+1), (x+1, y+1) neighbours of every pixel.
        patch = (bw[0] * img[y0:y0 + h, x0:x0 + w]
                 + bw[1] * img[y0:y0 + h, x0 + 1:x0 + w + 1]
                 + bw[2] * img[y0 + 1:y0 + h + 1, x0:x0 + w]
                 + bw[3] * img[y0 + 1:y0 + h + 1, x0 + 1:x0 + w + 1])
        return patch.reshape(-1)

    def compute(self, prevImg, nextImg, prevPts):
        """Track ``prevPts`` from ``prevImg`` into ``nextImg``.

        Parameters
        ----------
        prevImg, nextImg : numpy.ndarray
            Single-channel images of identical size.
        prevPts : numpy.ndarray
            Array of shape (N, 2) holding (x, y) positions in ``prevImg``.
            It is not modified.

        Returns
        -------
        nextPts : numpy.ndarray
            Estimated positions in ``nextImg`` (floating point, shape (N, 2)).
            Points rejected before the solver starts keep their input position.
        status : numpy.ndarray
            Integer flags, 1 where tracking succeeded, 0 where the window left
            the image or the linear system was degenerate.
        """
        assert prevImg.size != 0 and nextImg.size != 0, "check prevImg and nextImg"
        assert prevImg.shape[0] == nextImg.shape[0], "size mismatch, rows."
        assert prevImg.shape[1] == nextImg.shape[1], "size mismatch, cols."
        assert prevImg.ndim == 2 and nextImg.ndim == 2, "expected single-channel images"

        prevPts = np.asarray(prevPts)
        N = prevPts.shape[0]
        status = np.ones(N, dtype=int)
        nextPts = np.copy(prevPts)
        if not np.issubdtype(nextPts.dtype, np.floating):
            # Integer points could not store the sub-pixel result.
            nextPts = nextPts.astype(np.float64)

        # Work in floating point: differences of uint8 intensities would wrap,
        # and an 8-bit derivative image would clip every negative gradient to 0.
        prevF = np.ascontiguousarray(prevImg, dtype=np.float64)
        nextF = np.ascontiguousarray(nextImg, dtype=np.float64)

        ## Spatial derivatives of prev with the normalized Scharr filter
        ## (kernel weights 3/10/3 on both sides -> scale 1/32).
        prevDerivx = cv2.Scharr(prevF, dx=1, dy=0, ddepth=cv2.CV_64F, scale=1/32)
        prevDerivy = cv2.Scharr(prevF, dx=0, dy=1, ddepth=cv2.CV_64F, scale=1/32)

        halfWin = np.array([(self.winsize[0] - 1) * 0.5, (self.winsize[1] - 1) * 0.5])
        # Diagonal of W, flattened in the same row-major order as the patches.
        weights = np.asarray(computeGaussianWeights(self.winsize, 0.3)).reshape(-1)

        for ptidx in range(N):
            # Top-left corner of the window. A fresh float array is created so
            # that neither prevPts nor u0 is altered by the in-place updates below.
            u0 = prevPts[ptidx].astype(np.float64) - halfWin

            if not np.all(np.isfinite(u0)):
                status[ptidx] = 0
                continue

            iu0 = (int(np.floor(u0[0])), int(np.floor(u0[1])))

            if not self._windowFits(iu0, prevF.shape):
                status[ptidx] = 0
                continue

            # Only the sub-pixel offset inside the pixel cell defines the weights.
            bw = computeBilinerWeights(u0 - np.floor(u0))

            ## Step 2: template intensities and gradient matrix
            ##   bprev      (w*h)     vector
            ##   A          (w*h) x 2 matrix
            bprev = self._samplePatch(prevF, iu0, bw, self.winsize)
            A = np.stack([self._samplePatch(prevDerivx, iu0, bw, self.winsize),
                          self._samplePatch(prevDerivy, iu0, bw, self.winsize)], axis=1)

            # At * W with W kept as a vector: (2, n) * (n,) -> (2, n)
            AtW = A.T * weights
            AtWA = AtW @ A

            # A (near-)singular system means the window has no usable texture;
            # inverting it would only produce inf/nan positions.
            det = AtWA[0, 0] * AtWA[1, 1] - AtWA[0, 1] * AtWA[1, 0]
            if not np.isfinite(det) or abs(det) <= np.finfo(np.float64).eps:
                status[ptidx] = 0
                continue

            invAtWA = invertMatrix2x2(AtWA)

            ## Estimate the target point with the previous point
            u = u0.copy()

            ## Iterative solver
            for _ in range(self.iterations):
                iu = (int(np.floor(u[0])), int(np.floor(u[1])))

                if not self._windowFits(iu, nextF.shape):
                    status[ptidx] = 0
                    break

                bw = computeBilinerWeights(u - np.floor(u))
                bnext = self._samplePatch(nextF, iu, bw, self.winsize)

                ## At * W * (bnext - bprev), a 2-vector
                AtWbnbp = AtW @ (bnext - bprev)

                ## Solve At * W * A * deltaU = - At * W * (bnext - bprev)
                deltaU = -(invAtWA @ AtWbnbp)
                if not np.all(np.isfinite(deltaU)):
                    status[ptidx] = 0
                    break

                u += deltaU

                ## Early termination (Step 4)
                if np.linalg.norm(deltaU) < self.epsilon:
                    break

            # Convert the window corner back to the window centre.
            nextPts[ptidx] = u + halfWin

        return nextPts, status



In [10]:
def test():
    """Run the self-checks for the three helper functions and print the results.

    Each check prints the norm of the deviation from a reference value,
    followed by a SUCCESS/FAIL line; nothing is returned or raised.
    """

    def printVerdict(passed):
        if passed:
            print("Test: SUCCESS!")
        else:
            print("Test: FAIL")

    ## Bilinear weights
    point = np.array([0.125, 0.82656])
    expectedWeights = np.array([0.15176, 0.02168, 0.72324, 0.10332])
    bilinearError = np.linalg.norm(computeBilinerWeights(point) - expectedWeights)

    print("computeBilinearWeights error: " + str(bilinearError))
    printVerdict(bilinearError < 1e-6)

    ## Gaussian kernel (the reference is symmetric, so it is built from three rows)
    outerRow = [0.1690133273455962, 0.3291930023422986, 0.4111123050281957, 0.3291930023422986, 0.1690133273455962]
    innerRow = [0.3291930023422986, 0.6411803997536073, 0.8007374099875735, 0.6411803997536073, 0.3291930023422986]
    centerRow = [0.4111123050281957, 0.8007374099875735, 1, 0.8007374099875735, 0.4111123050281957]
    kernel = np.array([outerRow, innerRow, centerRow, innerRow, outerRow])

    res = computeGaussianWeights((5, 5), 0.3)
    gaussianError = np.linalg.norm(res - kernel)
    gaussianOk = gaussianError < 1e-6

    print("computeGaussianWeights error: " + str(gaussianError))
    if not gaussianOk:
        # Show both kernels to make the mismatch easy to spot.
        print("Reference " + str(kernel))
        print("Your result " + str(res))
    printVerdict(gaussianOk)

    ## Matrix inversion
    A = np.array([[12, 4], [4, 8]])
    Ainv = np.array([[0.1, -0.05], [-0.05, 0.15]])
    inversionError = np.linalg.norm(invertMatrix2x2(A) - Ainv)

    print("invertMatrix2x2 error: " + str(inversionError))
    printVerdict(inversionError < 1e-10)

    print("=" * 30)


# Run the self-checks; each prints its error norm and a SUCCESS/FAIL line.
test()

computeBilinearWeights error: 2.797157557069881e-17
Test: SUCCESS!
computeGaussianWeights error: 6.177799171302894e-08
Test: SUCCESS!
invertMatrix2x2 error: 2.7755575615628914e-17
Test: SUCCESS!


In [None]:
def main(videoPath='/Users/awritrojitbanerjee/FAU/Sem2/CV/Ex/submission_ex5_actually_ex4/slow_traffic_small.mp4'):
    """Run the Lucas-Kanade demo on a video and visualise the tracked motion.

    For every frame, corners are detected and refined to sub-pixel accuracy.
    The corners of the previous frame are then tracked into the current frame
    and each accepted motion vector is drawn (magnified 15x) on the previous
    frame, which is shown in the window "out".

    Parameters
    ----------
    videoPath : str
        Video file to process. Defaults to the path used originally.
    """
    cap = cv2.VideoCapture(videoPath)

    # Check if the video opened successfully
    if not cap.isOpened():
        print("Error opening video stream or file")
        cap.release()
        return

    # Loop-invariant settings
    winsize = [25, 25]
    scale = 0.7
    termcrit = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 0.03)
    subPixWinSize = (10, 10)
    color = (0, 255, 0)
    of = OpticalFlowLK(winsize, 0.03, 20)

    last_grey = None
    last_frame = None
    last_keypoints = np.empty((0, 2), dtype=np.float32)

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream (or read failure): frame is None, so stop
                # instead of passing it to cv2.resize.
                break

            frame = cv2.resize(frame, None, fx=scale, fy=scale)
            grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            keypoints = cv2.goodFeaturesToTrack(grey, maxCorners=200, qualityLevel = 0.01, minDistance=10, mask=None, blockSize=3, useHarrisDetector=False, k=0.04)
            if keypoints is None:
                # No corners found in this frame (e.g. a uniform image).
                keypoints = np.empty((0, 2), dtype=np.float32)
            else:
                cv2.cornerSubPix(grey, keypoints, subPixWinSize, (-1, -1), termcrit)
                # (N, 1, 2) -> (N, 2)
                keypoints = np.asarray(keypoints).reshape(-1, 2)

            if len(last_keypoints) != 0:
                flowPoints = 0
                points, status = of.compute(last_grey, grey, np.copy(last_keypoints))

                for i in range(len(points)):

                    if not status[i]:
                        continue

                    diff = points[i] - last_keypoints[i]
                    distance = np.linalg.norm(diff)

                    # Skip implausibly large jumps and (almost) static points.
                    if distance > 15 or distance < 0.2:
                        continue

                    otherP = last_keypoints[i] + diff * 15
                    flowPoints += 1

                    # Hand plain Python ints to the drawing functions.
                    start = tuple(int(v) for v in last_keypoints[i])
                    end = tuple(int(v) for v in otherP)
                    cv2.circle(last_frame, center=start, radius=1, color=color)
                    cv2.line(last_frame, start, end, color)

                cv2.imshow("out", last_frame)
                cv2.waitKey(1)

                print("[Keypoints] moving/total: {} / {}".format(flowPoints, len(points)))

            last_keypoints = np.copy(keypoints)
            last_grey = np.copy(grey)
            last_frame = np.copy(frame)
    finally:
        # Always free the capture device and close the preview window.
        cap.release()
        cv2.destroyAllWindows()


# Start the optical-flow demo; requires the video file referenced in main().
main()
