In [1]:
# import av
import cv2
import numpy as np
import h5py

In [2]:
SOURCE_FILE = "output_video4.mp4"
OPTICAL_FLOW_OUTPUT_FILE_NPZ = "optical_flow.npz"
OPTICAL_FLOW_OUTPUT_FILE_H5 = "optical_flow.h5"

In [3]:
def draw_optical_flows(img, flow, step=16):
    h, w = img.shape[:2]
    y, x = (
        np.mgrid[step // 2 : h : step, step // 2 : w : step]
        .reshape(2, -1)
        .astype(int)
    )
    fx, fy = flow[y, x].T

    lines = np.vstack([x, y, x + fx, y + fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines + 0.5)

    # Draw motion vectors as lines
    for (x1, y1), (x2, y2) in lines:
        cv2.arrowedLine(img, (x1, y1), (x2, y2), (0, 255, 0), 1, tipLength=0.3)

    return img


def extract_optical_flows(video_path):
    cap = cv2.VideoCapture(video_path)

    ret, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Compute Farneback optical flow
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )

        # Draw motion vectors on the frame
        img_with_vectors = draw_optical_flows(frame, flow)

        # Show the image
        cv2.imshow("Motion Vectors", img_with_vectors)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

        prev_gray = gray

    cap.release()
    cv2.destroyAllWindows()

In [4]:
def save_optical_flow(video_path, output_file):
    cap = cv2.VideoCapture(video_path)
    ret, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    flows = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Compute Farneback optical flow
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )

        # Store flow vectors
        flows.append(flow)

        prev_gray = gray

    cap.release()

    # Convert to numpy array and save
    np.savez(output_file, flows=np.array(flows))

    print(f"Optical flow saved to {output_file}")

In [5]:
def save_optical_flow_hdf5(video_path, output_file):
    cap = cv2.VideoCapture(video_path)
    ret, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    flows = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Compute Farneback optical flow
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )

        # Store flow vectors
        flows.append(flow)

        prev_gray = gray

    cap.release()

    # Save to HDF5
    with h5py.File(output_file, "w") as f:
        f.create_dataset("flows", data=np.array(flows))

    print(f"Optical flow saved to {output_file}")

In [5]:
def load_and_visualize_optical_flow(flow_file):
    data = np.load(flow_file)
    flows = data["flows"]

    for flow in flows:
        # Generate a blank image for visualization
        h, w = flow.shape[:2]
        blank_image = np.zeros((h, w, 3), np.uint8)

        # Draw motion vectors
        step = 16
        y, x = (
            np.mgrid[step // 2 : h : step, step // 2 : w : step]
            .reshape(2, -1)
            .astype(int)
        )
        fx, fy = flow[y, x].T

        lines = np.vstack([x, y, x + fx, y + fy]).T.reshape(-1, 2, 2)
        lines = np.int32(lines + 0.5)

        for (x1, y1), (x2, y2) in lines:
            cv2.arrowedLine(
                blank_image, (x1, y1), (x2, y2), (0, 255, 0), 1, tipLength=0.3
            )

        # Show the image
        cv2.imshow("Optical Flow", blank_image)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cv2.destroyAllWindows()

In [None]:
def load_and_visualize_optical_flow_hdf5(flow_file):
    with h5py.File(flow_file, "r") as f:
        flows = f["flows"][:]

    for flow in flows:
        h, w = flow.shape[:2]
        blank_image = np.zeros((h, w, 3), np.uint8)

        step = 16
        y, x = (
            np.mgrid[step // 2 : h : step, step // 2 : w : step]
            .reshape(2, -1)
            .astype(int)
        )
        fx, fy = flow[y, x].T

        lines = np.vstack([x, y, x + fx, y + fy]).T.reshape(-1, 2, 2)
        lines = np.int32(lines + 0.5)

        for (x1, y1), (x2, y2) in lines:
            cv2.arrowedLine(
                blank_image, (x1, y1), (x2, y2), (0, 255, 0), 1, tipLength=0.3
            )

        cv2.imshow("Optical Flow", blank_image)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cv2.destroyAllWindows()

In [6]:
# Display optical flow
extract_optical_flows(SOURCE_FILE)

In [7]:
# save optical flow to a file
save_optical_flow(SOURCE_FILE, OPTICAL_FLOW_OUTPUT_FILE_NPZ)

Optical flow saved to optical_flow.npz


In [None]:
# save optical flow to a file (HDF5)
save_optical_flow_hdf5(SOURCE_FILE, OPTICAL_FLOW_OUTPUT_FILE_H5)

In [8]:
# visualize optical flow from file
load_and_visualize_optical_flow(OPTICAL_FLOW_OUTPUT_FILE_NPZ)

In [None]:
# visualize optical flow from file (HDF5)
load_and_visualize_optical_flow_hdf5(OPTICAL_FLOW_OUTPUT_FILE_H5)

In [6]:
np.load("optical_flow.npz")


NpzFile 'optical_flow.npz' with keys: flows