# Import Library

In [33]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
from typing import List
import time # 計算時間差
import concurrent.futures # for opencv paralle_for_

# Define some utilize function

In [34]:
def GetPixelValue(image: np.ndarray, x: float, y: float)->float:
  # boundary check
  yLimit, xLimit = image.shape
  if x<0:
    x=0
  if y<0:
    y=0
  if x>(xLimit-1):
    x = xLimit-2
  if y>(yLimit-1):
    y = yLimit-2
  xx = x-np.floor(x)
  yy = y-np.floor(y)
  x_a1 = np.min([xLimit - 1, int(x) + 1])
  y_a1 = np.min([yLimit - 1, int(y) + 1])

  return (1 - xx) * (1 - yy) * image[y][x]
  + xx * (1 - yy) * image[y][x_a1]
  + (1 - xx) * yy * image[y_a1][x]
  + xx * yy * image[y_a1][x_a1]

# GetPixelValue(img1, 0, 0)
# Q: why not just img1[0][0]?

In [35]:
'''
 single level optical flow
'''
def OpticalFlowSingleLevel(
    img1: np.ndarray,
    img2: np.ndarray,
    kp1: List[cv2.KeyPoint],
    kp2: List[cv2.KeyPoint],
    success: List[bool],
    inverse: bool = False,
    has_initial_guess: bool = False)->None:
    print('OpticalFlowSingleLevel')
    kp2=kp2[:len(kp1)]
    success=success[:len(kp1)]
    tracker = OpticalFlowTracker(img1, img2, kp1, kp2, success, inverse, has_initial_guess)

    with concurrent.futures.ThreadPoolExecutor() as executor:
      executor.map(tracker.calculateOpticalFlow, range(len(kp1)))

In [36]:
import time
milliseconds = int(time.time() * 1000)

print("当前时间的毫秒数：", milliseconds)

当前时间的毫秒数： 1692800792133


In [37]:
'''
 multi level optical flow, scale of pyramid is set to 2 by default
 the image pyramid will be create inside the function
'''
def OpticalFlowMultiLevel(
    img1: np.ndarray,
    img2: np.ndarray,
    kp1: List[cv2.KeyPoint],
    kp2: List[cv2.KeyPoint],
    success: List[bool],
    inverse: bool=False,)->None:
    print('OpticalFlowMultiLevel')
    # parameters
    pyramids = 4
    pyramid_scale = 0.5
    scales = [1.0, 0.5, 0.25, 0.125]
    # create pyramids
    t1 = time.perf_counter()
    pyr1 = []
    pyr2 = []
    for i in range(pyramids):
      if i==0:
        pyr1.append(img1)
        pyr2.append(img2)
      else:
        new_width = int(pyr1[i - 1].shape[1] * pyramid_scale)
        new_height = int(pyr1[i - 1].shape[0] * pyramid_scale)
        img1_pyr = cv2.resize(pyr1[i-1], (new_width, new_height))
        new_width = int(pyr2[i - 1].shape[1] * pyramid_scale)
        new_height = int(pyr2[i - 1].shape[0] * pyramid_scale)
        img2_pyr = cv2.resize(pyr2[i-1], (new_width, new_height))
        pyr1.append(img1_pyr)
        pyr2.append(img2_pyr)
    t2 = time.perf_counter()
    time_used = t2-t1
    print('build pyramid time: ', time_used)
    # coarse-to-fine LK tracking in pyramids
    kp1_pyr = []
    kp2_pyr = []
    for kp in kp1:
      kp_top = cv2.KeyPoint(kp.pt[0]*scales[pyramids-1], kp.pt[1]*scales[pyramids-1], kp.size)
      kp1_pyr.append(kp_top)
      kp2_pyr.append(kp_top)

    for level in range(pyramids-1, -1, -1):
      success = []
      t1 = time.perf_counter()
      OpticalFlowSingleLevel(pyr1[level], pyr2[level], kp1_pyr, kp2_pyr, success, inverse, True)
      t2 = time.perf_counter()
      time_used = t2-t1
      print('track pyr %d cost time %d'%(level, time_used))
      if level > 0:
        for i in range(len(kp1_pyr)):
          kp = kp1_pyr[i]
          cv2.KeyPoint(kp.pt[0]/pyramid_scale, kp.pt[1]/pyramid_scale, kp.size)
          kp1_pyr[i] = kp
        for kp in kp1_pyr:
          kp.pt[0] /= pyramid_scale
          kp[1] /= pyramid_scale
          kp /= pyramid_scale
        for kp in kp2_pyr:
          kp[0] /= pyramid_scale
          kp[1] /= pyramid_scale

    for kp in kp2_pyr:
      kp2.append(kp)

In [38]:
a = 1
b = 2
a /=b
print(a)

0.5


# Define Optical Flow Tracker Class

In [39]:
from typing import List

class OpticalFlowTracker:
  def __init__(self,
               img1: np.ndarray,
               img2: np.ndarray,
               kp1: List[cv2.KeyPoint],
               kp2: List[cv2.KeyPoint],
               success: List[bool],
               inverse: bool=True,
               hasInit: bool=False
               )->None:
    self.img1 = img1
    self.img2 = img2
    self.kp1 = kp1
    self.kp2 = kp2
    self.success = success
    self.inverse = inverse
    self.hasInit = hasInit
    print('hello')

  def calculateOpticalFlow(self, range:List[int]):
    # print('calculate optical flow')
    half_patch_size = 4
    iterations = 10
    start, end = range
    for i in range(start, end):
      kp = self.kp1[i]
      dx = 0
      dy = 0
      if self.hasInit:
        dx = self.kp2[i].pt[0]-kp.pt[0]
        dy = self.kp2[i].pt[1]-kp.pt[1]

      cost = 0
      lastCost = 0
      success = True # indicate if this point succeeded
      H = np.zeros((2, 2)) # Hessian
      b = np.zeros(2)     # bias
      J = np.zeros(2)

      for j in range(iterations):
        if not self.inverse:
          H = np.zeros((2, 2)) # Hessian
          b = np.zeros(2)     # bias
        else:
          # Only reset b
          b = np.zeros(2)

        cost = 0

        # compute cost and Jacobian
        for x in range(-half_patch_size, half_patch_size):
          for y in range(-half_patch_size, half_patch_size):
            error = GetPixelValue(img1, kp.pt[0] + x, kp.pt[1] + y) - GetPixelValue(img2, kp.pt[0] + x + dx, kp.pt[1] + y + dy) # Jacobian
            if not self.inverse:
              J = -1.0 * np.array([
                  0.5 * (GetPixelValue(img2, kp.pt.x + dx + x + 1, kp.pt.y + dy + y) - GetPixelValue(img2, kp.pt.x + dx + x - 1, kp.pt.y + dy + y)),
                  0.5 * (GetPixelValue(img2, kp.pt.x + dx + x, kp.pt.y + dy + y + 1) - GetPixelValue(img2, kp.pt.x + dx + x, kp.pt.y + dy + y - 1))
                  ])
            elif iter==0:
              # in inverse mode, J keeps same for all iterations
              # NOTE this J does not change when dx, dy is updated, so we can store it and only compute error
              J = -1.0 * np.array([
                  0.5 * (GetPixelValue(img1, kp.pt.x + x + 1, kp.pt.y + y) - GetPixelValue(img1, kp.pt.x + x - 1, kp.pt.y + y)),
                  0.5 * (GetPixelValue(img1, kp.pt.x + x, kp.pt.y + y + 1) - GetPixelValue(img1, kp.pt.x + x, kp.pt.y + y - 1))
                  ])
            # compute H, b and set cost;
            b += -error * J;
            cost += error * error;
            if (self.inverse == False or iter == 0):
              # Update H
              H += J * J.transpose()
        # Compute update
        update = np.linalg.solve(H, b)

        if update is None:
          print('update is none. Fail')
          break
        if iter>0 and cost > lastCost:
          break

        # update dx, dy
        dx += update[0];
        dy += update[1];
        lastCost = cost;
        succ = True;

        if update.norm() < 1e-2:
          # converge
          break
    success[i] = succ;
    # set kp2
    self.kp2[i].pt = kp.pt + cv2.Point2f(dx, dy);

# OpticalFlowTracker(img1, img2, keypoint1, keypoint2, True)

hello


<__main__.OpticalFlowTracker at 0x7e4e1c202050>

# Load Image

In [40]:
import os
if os.path.exists('/content/drive/MyDrive/GoogleCoLabWorkSpace/OpticalFlow/LK1.png'):
  image1_path = '/content/drive/MyDrive/GoogleCoLabWorkSpace/OpticalFlow/LK1.png'
  image2_path = '/content/drive/MyDrive/GoogleCoLabWorkSpace/OpticalFlow/LK2.png'
else:
  print('No file exist')

In [41]:
# images, note they are CV_8UC1, not CV_8UC3
img1 = cv2.imread(image1_path, cv2.IMREAD_GRAYSCALE)
img2 = cv2.imread(image2_path, cv2.IMREAD_GRAYSCALE)

In [42]:
# Check Files in Current Directory
current_path = os.getcwd()
file_list = os.listdir(current_path)

# Detect Keypoint

In [43]:
gftt = cv2.GFTTDetector_create()
keypoint1 = gftt.detect(img1, None)
# keypoint2 = gftt.detect(img2, None)
kp2_single = []
success_single = []
OpticalFlowSingleLevel(img1, img2, keypoint1, kp2_single, success_single);
# then test multi-level LK
kp2_multi = []
success_multi = []
t1 = time.perf_counter()
OpticalFlowMultiLevel(img1, img2, keypoint1, kp2_multi, success_multi, True);
t2 = time.perf_counter()
time_used = t2-t1
print('optical flow by gaussian-newton: ', time_used)


OpticalFlowSingleLevel
hello
OpticalFlowMultiLevel
build pyramid time:  0.0002851940007531084


TypeError: ignored

In [None]:

'''
    // use opencv's flow for validation
    vector<Point2f> pt1, pt2;
    for (auto &kp: kp1) pt1.push_back(kp.pt);
    vector<uchar> status;
    vector<float> error;
    t1 = chrono::steady_clock::now();
    cv::calcOpticalFlowPyrLK(img1, img2, pt1, pt2, status, error);
    t2 = chrono::steady_clock::now();
    time_used = chrono::duration_cast<chrono::duration<double>>(t2 - t1);
    cout << "optical flow by opencv: " << time_used.count() << endl;

    // plot the differences of those functions
    Mat img2_single;
    cv::cvtColor(img2, img2_single, CV_GRAY2BGR);
    for (int i = 0; i < kp2_single.size(); i++) {
        if (success_single[i]) {
            cv::circle(img2_single, kp2_single[i].pt, 2, cv::Scalar(0, 250, 0), 2);
            cv::line(img2_single, kp1[i].pt, kp2_single[i].pt, cv::Scalar(0, 250, 0));
        }
    }

    Mat img2_multi;
    cv::cvtColor(img2, img2_multi, CV_GRAY2BGR);
    for (int i = 0; i < kp2_multi.size(); i++) {
        if (success_multi[i]) {
            cv::circle(img2_multi, kp2_multi[i].pt, 2, cv::Scalar(0, 250, 0), 2);
            cv::line(img2_multi, kp1[i].pt, kp2_multi[i].pt, cv::Scalar(0, 250, 0));
        }
    }

    Mat img2_CV;
    cv::cvtColor(img2, img2_CV, CV_GRAY2BGR);
    for (int i = 0; i < pt2.size(); i++) {
        if (status[i]) {
            cv::circle(img2_CV, pt2[i], 2, cv::Scalar(0, 250, 0), 2);
            cv::line(img2_CV, pt1[i], pt2[i], cv::Scalar(0, 250, 0));
        }
    }

    cv::imshow("tracked single level", img2_single);
    cv::imshow("tracked multi level", img2_multi);
    cv::imshow("tracked by opencv", img2_CV);
    cv::waitKey(0);

    return 0;
}'''

array([1., 1.])

In [58]:
kp = cv2.KeyPoint(1,1,1)
kp.size

1.0