In [6]:
import cv2
import numpy as np

# ---- INITIALIZATION ----
# Set up camera with lower resolution for better performance
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

# Parameters for Shi-Tomasi corner detection - reduced for better performance
feature_params = dict(maxCorners=50,      # Reduced number of corners to track
                    qualityLevel=0.3,     # Minimum quality level for corner detection
                    minDistance=10,       # Increased minimum distance between corners
                    blockSize=7)          # Size of window for corner detection

# Parameters for Lucas-Kanade optical flow - simplified
lk_params = dict(winSize=(15, 15),        # Size of search window
               maxLevel=2,                # Number of pyramid levels
               criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# ---- STEP 1: INITIALIZE FIRST FRAME ----
# Read initial frame
ret, old_frame = cap.read()
if not ret:
   print("Failed to grab first frame")
   exit()

# Reduce image size for faster processing
scale = 0.5
old_frame = cv2.resize(old_frame, None, fx=scale, fy=scale)
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

# Detect initial points to track
p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)
if p0 is None:
   print("No points detected in first frame")
   exit()

# Initialize mask for drawing tracks
mask = np.zeros_like(old_frame)

# ---- MAIN PROCESSING LOOP ----
while True:
   # STEP 2: CAPTURE NEW FRAME
   ret, frame = cap.read()
   if not ret:
       print("Failed to grab frame")
       break
   
   # Resize frame for faster processing
   frame = cv2.resize(frame, None, fx=scale, fy=scale)
   frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
   
   # STEP 3: CALCULATE OPTICAL FLOW
   # Calculate Lucas-Kanade optical flow for tracked points
   p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)
   
   if p1 is not None:
       # Filter good points where flow was successfully calculated
       good_new = p1[st == 1]
       good_old = p0[st == 1]
       
       # Reset mask for each frame to avoid trail accumulation
       mask = np.zeros_like(frame)
       
       # STEP 4: VISUALIZATION
       # Draw motion tracks and current points
       for new, old in zip(good_new, good_old):
           a, b = new.ravel()
           c, d = old.ravel()
           # Draw line showing motion path (green)
           mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), 
                         (0, 255, 0), 2)
           # Draw current point position (red)
           frame = cv2.circle(frame, (int(a), int(b)), 3, (0, 0, 255), -1)
       
       # Combine original frame with motion tracks
       lk_output = cv2.add(frame, mask)
       
       # STEP 5: DENSE OPTICAL FLOW (Optional - press 'd' to view)
       if cv2.waitKey(1) & 0xFF == ord('d'):
           # Calculate Farneback dense optical flow
           flow = cv2.calcOpticalFlowFarneback(old_gray, frame_gray, None,
                                             0.5, 3, 15, 3, 5, 1.2, 0)
           
           # Convert flow to color visualization
           hsv = np.zeros_like(frame)
           hsv[..., 1] = 255  # Set saturation to maximum
           mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
           hsv[..., 0] = ang * 180 / np.pi / 2  # Set hue according to flow direction
           hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)  # Set value according to flow magnitude
           dense_output = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
           cv2.imshow("Dense Optical Flow", dense_output)
       
       # Display sparse optical flow result
       cv2.imshow("Lucas-Kanade Optical Flow", lk_output)
       
       # STEP 6: UPDATE FOR NEXT FRAME
       old_gray = frame_gray.copy()
       p0 = good_new.reshape(-1, 1, 2)

   # Exit on 'q' press
   if cv2.waitKey(1) & 0xFF == ord('q'):
       break

# ---- CLEANUP ----
cap.release()
cv2.destroyAllWindows()

cv2.waitKey(1)

-1

In [None]:
# Create a mask for drawing Lucas-Kanade optical flow tracks
mask = np.zeros_like(old_frame)

while True:
   # Capture a new frame
   ret, frame = cap.read()
   
   if not ret:
       break
       
   frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
   
   # ---- Lucas-Kanade Optical Flow (Sparse) ----
   # Calculate optical flow using Lucas-Kanade for sparse feature points
   p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)
   
   # Select good points (those successfully tracked)
   if p1 is not None:
       good_new = p1[st == 1]
       good_old = p0[st == 1]
   
       # Draw the tracks for Lucas-Kanade Optical Flow 
       for i, (new, old) in enumerate(zip(good_new, good_old)):
           a, b = new.ravel()
           c, d = old.ravel()
           mask = cv2.line(mask, (a, b), (c, d), (0, 255, 0), 2)
           frame = cv2.circle(frame, (a, b), 5, (0, 0, 255), -1)
   
       # Overlay the Lucas-Kanade tracks on the original frame
       lk_output = cv2.add(frame, mask)
   
       # ---- Dense Optical Flow ----
       # Calculate dense optical flow using Farneback method
       flow = cv2.calcOpticalFlowFarneback(old_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
   
       # Convert flow to HSV format for visualization
       hsv = np.zeros_like(frame)
       hsv[..., 1] = 255
   
       # Convert flow to polar coordinates to get the magnitude and angle
       mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
       hsv[..., 0] = ang * 180 / np.pi / 2  # Hue represents direction
       hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)  # Value represents magnitude
   
       # Convert HSV to BGR for visualization
       dense_output = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
   
       # Display the results
       cv2.imshow("Lucas-Kanade Optical Flow (Sparse)", lk_output)
       cv2.imshow("Dense Optical Flow", dense_output)
   
       # Update the previous frame and points for the next loop iteration
       old_gray = frame_gray.copy()
   
       if good_new is not None:
           p0 = good_new.reshape(-1, 1, 2)
   
   # Break the loop on 'q' key press
   if cv2.waitKey(1) & 0xFF == ord('q'):
       break

# Release the camera and close all windows
cap.release()
cv2.destroyAllWindows()