# Cut Detection with Lucas-Kanade Optical Flow

A script for detecting cuts using OpenCV's implementation of Lucas-Kanade optical flow. Tracking a set of points from one frame to the next can tell us whether there was a cut between the two frames: within a shot the points can usually be followed with low error, whereas across a cut we expect the tracking to fail or to produce large errors, because the two shots show completely different content (different objects, different scenery, etc.).

Adapted from this OpenCV tutorial: https://docs.opencv.org/3.4/d4/dee/tutorial_optical_flow.html

See https://docs.opencv.org/3.4/dc/d6b/group__video__track.html#ga473e4b886d0bcc6b65831eb88ed93323 for more information about the calcOpticalFlowPyrLK OpenCV implementation of Lucas-Kanade optical flow.

This script assumes that a `frames` folder sits in the same directory as the script and contains one subfolder per film clip (e.g. `amadeus_frames`), each holding that clip's frames (e.g. `amadeus_00001.jpg`).

In [None]:
import cv2
import numpy as np

import os
import pickle

In [None]:
def load_rgb_img(img_path):
    """
    Reads the image stored at img_path and returns it in RGB channel order.

    Parameters:
    - img_path: the file path to an image.

    Returns the image as an array in RGB color space.

    Raises IOError if OpenCV could not read the image (cv2.imread returns None
    instead of raising when the file is missing or cannot be decoded).
    """
    img_raw = cv2.imread(img_path)

    # Fail here, with the offending path, rather than with an opaque error inside cvtColor.
    if img_raw is None:
        raise IOError("Could not read image (missing or unreadable file): {0}".format(img_path))

    # OpenCV imread reads in images in BGR color space, so must convert from BGR to RGB
    return cv2.cvtColor(img_raw, cv2.COLOR_BGR2RGB)

In [None]:
def detect_cuts_using_lucas_kanade_optical_flow(frames_dir_path, clip_name, err_threshold, num_votes_threshold):
    """
    Detects cuts between consecutive frames of a clip, using OpenCV's implementation of Lucas Kanade optical flow.

    This function is adapted from this OpenCV tutorial: https://docs.opencv.org/3.4/d4/dee/tutorial_optical_flow.html
    See https://docs.opencv.org/3.4/dc/d6b/group__video__track.html#ga473e4b886d0bcc6b65831eb88ed93323 for more information about the calcOpticalFlowPyrLK OpenCV implementation of Lucas-Kanade optical flow.

    Frames are read from <frames_dir_path>/<clip_name>_frames/<clip_name>_NNNNN.jpg, starting at
    frame 1 and continuing until the next numbered frame file does not exist.

    Side effect: for every processed frame, a visualization of the tracked points is written to
    lk_optical_flow_visualization_err_<err_threshold>_votes_<num_votes_threshold>/<clip_name>_frames/
    (relative to the current working directory), creating the directories if needed.

    Parameters:
    - frames_dir_path: the filepath to a folder with folders of film clip frames.
    - clip_name: the name of the film clip we wish to perform cut detection for.
    - err_threshold: a window should have err vector value above this err_threshold to contribute to a vote for this frame being part of a cut.
    - num_votes_threshold: proportion of the windows that should have err vector value above err_threshold for a cut to be detected between old_frame and frame.

    Returns a dictionary that stores cut detection results.
    Keys are frame numbers, and values are whether or not there is a cut between the previous frame and this frame, or between this frame and the next (0 if no cut; 1 if there is a cut).
    Every frame from 2 onwards gets an entry; frame 1 only gets an entry (of 1) when a cut is detected between frames 1 and 2.
    """
    # TODO: tweak?
    # Parameters for ShiTomasi corner detection
    feature_params = dict(maxCorners=100,
                          qualityLevel=0.3,
                          minDistance=7,
                          blockSize=7)

    # TODO: tweak?
    # Parameters for Lucas-Kanade optical flow
    lk_params = dict(winSize=(15, 15),
                     maxLevel=2,
                     criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    clip_frames_dir = os.path.join(frames_dir_path, clip_name + "_frames")

    def frame_path_for(num):
        # Single place that knows the frame file naming scheme (e.g. amadeus_00001.jpg).
        return os.path.join(clip_frames_dir, "{0}_{1:05d}.jpg".format(clip_name, num))

    # Where the per-frame optical flow visualizations for this threshold pair and clip are saved.
    visualization_dir_path = os.path.join(
        "lk_optical_flow_visualization_err_" + str(err_threshold) + "_votes_" + str(num_votes_threshold),
        clip_name + "_frames")

    # Take first frame and find corners in it
    old_frame = load_rgb_img(frame_path_for(1))

    # Must convert image to gray in order to calculate Lucas-Kanade optical flow using OpenCV implementation.
    # load_rgb_img returns RGB, so the RGB (not BGR) conversion code is the one that applies the right channel weights.
    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_RGB2GRAY)

    # Decide what points to track (None if no corners were found)
    p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

    # Dictionary that will store cut detection results using Lucas Kanade optical flow
    cut_label_dict = {}

    # For drawing optical flow onto the current frame
    color = np.random.randint(0, 255, (feature_params["maxCorners"], 3))   # one random color per trackable point
    mask = np.zeros_like(old_frame)   # accumulates displacement lines drawn for previous frames of the current shot

    # While there is a next frame...
    frame_num = 1
    while True:
        # The frame counter is advanced in exactly one place, so every `continue` below moves on to the next frame.
        frame_num += 1
        frame_path = frame_path_for(frame_num)
        if not os.path.exists(frame_path):
            break

        frame = load_rgb_img(frame_path)
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

        if p0 is None or len(p0) == 0:
            # No points to track (e.g. the first frame had no detectable corners), so optical flow
            # cannot be computed for this pair. Record no evidence of a cut and try to start
            # tracking from the current frame instead of crashing in calcOpticalFlowPyrLK.
            cut_label_dict[frame_num] = 0
            p0 = cv2.goodFeaturesToTrack(frame_gray, mask=None, **feature_params)
            old_gray = frame_gray.copy()
            continue

        # Calculate optical flow using Lucas-Kanade
        # p1: output vector of 2D points (with single-precision floating-point coordinates) containing the calculated new positions of input features in the second image.
        # status: output status vector; each element of the vector is set to 1 if the flow for the corresponding features has been found, otherwise, it is set to 0.
        # err: output vector of errors; each element of the vector is set to an error for the corresponding feature; if the flow wasn't found then the error is not defined (use the status parameter to find such cases).
        p1, status, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        # Check if flow was detected for any features (checking p1 for None first, before indexing it).
        flow_found = p1 is not None and bool(np.any(status == 1))

        if not flow_found:
            # No flow was detected for any of the features.
            # Assume that means that there was a cut between previous frame and current frame.
            cut_detected = True
        else:
            # A window should have err vector value above err_threshold to contribute to a vote for a cut.
            # NOTE(review): this counts err for all features, including those with status 0, whose
            # err is documented as undefined. Kept as in the original; confirm whether lost
            # features should be excluded or counted as votes.
            num_votes = int(np.count_nonzero(err > err_threshold))

            # > num_votes_threshold proportion of the windows should have err above err_threshold for a cut to be detected btwn old_frame and frame.
            cut_detected = num_votes / len(err) > num_votes_threshold

        if cut_detected:
            # Both frames adjacent to the cut are labelled.
            cut_label_dict[frame_num - 1] = 1
            cut_label_dict[frame_num] = 1
        else:
            cut_label_dict[frame_num] = 0

        if cut_detected:
            # Clear drawing mask of prior optical flow displacement lines for previous windows (points to track)
            mask = np.zeros_like(old_frame)

            # If current frame marks the beginning of a new shot, update the points to track based on new shot.
            good_new = cv2.goodFeaturesToTrack(frame_gray, mask=None, **feature_params)
            if good_new is None:
                # Can't do anything with this frame, so move onto next frame
                # (the previous frame and points are deliberately left unchanged).
                continue
            good_old = []
        else:
            # Select good points based on the points for which the flow has been found in current frame
            good_new = p1[status == 1]
            good_old = p0[status == 1]

        # Draw optical flow tracks on top of current frame.
        # Right after a cut there are no old points, so only the circles are drawn.
        draw_tracks = len(good_old) > 0
        for i, new in enumerate(good_new):
            a, b = new.ravel()
            if draw_tracks:
                c, d = good_old[i].ravel()
                # Draw displacement as line from old optical flow point location to current optical flow point location
                mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
            # Draw circles for current optical flow point locations
            frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)

        optical_flow_img = cv2.add(frame, mask)

        # Make sure the visualization directory (and its parent) exists; a no-op once created.
        os.makedirs(visualization_dir_path, exist_ok=True)

        # Save visualization of optical flow on current frame (one output file per frame number).
        # OpenCV imwrite expects BGR image, so convert from RGB to BGR color space before using OpenCV's imwrite function to save it
        cv2.imwrite(os.path.join(visualization_dir_path, "{0}_{1:05d}.jpg".format(clip_name, frame_num)),
                    cv2.cvtColor(optical_flow_img, cv2.COLOR_RGB2BGR))

        # To set up for the next iteration of the loop, update the previous frame and previous points.
        old_gray = frame_gray.copy()
        p0 = good_new.reshape(-1, 1, 2)

    # Return dictionary of cut detection results, where keys are frame numbers.
    return cut_label_dict

**Run the block below to run cut detection on all the clip frames in the `frames` folder, using Lucas-Kanade optical flow.**

In [None]:
# Experiment with thresholds to find most optimal ones for cut detection using Lucas-Kanade.
# For every (err_threshold, num_votes_threshold) pair, cut detection is run on every clip in the
# frames folder and the resulting frame -> cut label dictionary is pickled to
# cut_label_dicts_lk_optical_flow/cut_label_dicts_lk_optical_flow_err_<err>_votes_<votes>/<clip>_frame_to_cut_dict.pkl

# A window should have err vector value above this err_threshold to contribute to a vote for a cut.
# Error measure: L1 distance between patches around the original and a moved point, divided by number of pixels in a window.
err_thresholds = [10, 20, 25, 30, 40, 50]
# > 50%, 60%, 70%, 80%, 90% of the windows should have err above err_threshold for a cut to be detected btwn old_frame and frame.
num_votes_thresholds = [0.5, 0.6, 0.7, 0.8, 0.9]

# If a directory for cut label dictionaries detected using Lucas Kanade optical flow does not currently exist, make that directory
cut_dict_outer_dir_path = "cut_label_dicts_lk_optical_flow"
os.makedirs(cut_dict_outer_dir_path, exist_ok=True)

# Get the clip frame folders once; the listing does not depend on the thresholds.
# Only directories named <clip_name>_frames are clips; this also skips stray files such as .DS_Store.
frames_dir_path = "frames"
frames_suffix = "_frames"
clips_list = sorted(
    entry for entry in os.listdir(frames_dir_path)
    if entry.endswith(frames_suffix) and os.path.isdir(os.path.join(frames_dir_path, entry))
)

for err_threshold in err_thresholds:
    for num_votes_threshold in num_votes_thresholds:
        # If a directory for cut label dictionaries detected using Lucas Kanade optical flow with this err_threshold and this num_votes_threshold pair does not currently exist, make that directory
        cut_dict_dir_path = cut_dict_outer_dir_path + "/cut_label_dicts_lk_optical_flow_err_" + str(err_threshold) + "_votes_" + str(num_votes_threshold)
        os.makedirs(cut_dict_dir_path, exist_ok=True)

        # For each clip's frames folder...
        for clip_folder in clips_list:
            # Strip only the trailing "_frames", so a clip name that itself contains "_frames" stays intact.
            clip_name = clip_folder[:-len(frames_suffix)]

            # Detect cuts in the frames for current clip, using Lucas Kanade optical flow
            cut_label_dict = detect_cuts_using_lucas_kanade_optical_flow(frames_dir_path, clip_name, err_threshold, num_votes_threshold)

            # Save (serialize) pickle of the Lucas Kanade optical flow cut detection result dictionary
            pickled_dict_filepath = cut_dict_dir_path + "/" + clip_name + "_frame_to_cut_dict" + ".pkl"
            with open(pickled_dict_filepath, "wb") as f:
                pickle.dump(cut_label_dict, f)

            # Load (deserialize) the file we just wrote and compare, to make sure the pickled file saved correctly
            with open(pickled_dict_filepath, "rb") as f:
                reloaded_cut_label_dict = pickle.load(f)
            if reloaded_cut_label_dict != cut_label_dict:
                raise IOError("Pickled cut label dictionary did not round-trip: " + pickled_dict_filepath)
