In [1]:
import cv2
import os
import shutil
from tqdm import tqdm
from collections.abc import Generator
import pathlib
import pandas as pd

## How to use:
- Extract the video's frames into a folder named "frames" inside the directory you want to work in.
- Label the transition frames by putting them in folders named "holding_transitions" and "not_holding_transitions" in that same directory.
- Each transition frame is a "first", i.e. the first frame of a run of consecutive frames with contact (holding) or without contact (not holding).
- Set base_dir to the directory containing these three folders.

In [10]:
def move_files_in_range(source_dir: str, destination_dir: str, start: int, end: int, suffix: str) -> None:
    """
    Copy every frame file whose number lies in a half-open numeric range.

    Despite the name, files are copied with ``shutil.copy``; the originals stay in
    ``source_dir``. File names are built as ``"<number>_<suffix>"``.

    Args:
        source_dir (str): Directory that holds the frame files.
        destination_dir (str): Directory that receives the copies.
        start (int): One bound of the range.
        end (int): The other bound of the range.
        suffix (str): Everything after the underscore, including the extension
            (e.g. ``1238c.jpg``).

    Note:
        The bounds may be given in either order: the smaller one is inclusive and
        the larger one is exclusive.
    """
    # Normalise the bounds so the range is always ascending.
    low, high = sorted((start, end))

    for frame_number in range(low, high):
        frame_name = f"{frame_number}_{suffix}"
        shutil.copy(
            os.path.join(source_dir, frame_name),
            os.path.join(destination_dir, frame_name),
        )


def create_pairs(hold: list[str], not_hold: list[str]) -> Generator[tuple[str, str], None, None]:
    """
    Generate consecutive (segment_start, segment_end) filename pairs from transition frames.

    The pairs alternate between not-holding and holding segments, beginning with a
    not-holding segment that starts at the dummy frame ``'0_anything.jpg'``. It is
    designed for frames from a video where an object is repeatedly picked up and put down.

    Args:
        hold (list[str]): Filenames marking the first frame of each holding run, sorted by frame order.
        not_hold (list[str]): Filenames marking the first frame of each not-holding run, sorted by frame order.

    Yields:
        tuple[str, str]: ``(start_frame, end_frame)`` of the next segment.

    Note:
        If ``hold`` has more entries than ``not_hold`` (the clip ends while the object
        is still held), the not-holding segment leading up to the first unmatched hold
        transition is yielded as well, so that transition is not silently dropped.
        Unmatched entries at the end of ``not_hold`` are ignored.

    Example:
        Given lists:
        hold = ['446_1238c.jpg', '2072_1238c.jpg']
        not_hold = ['897_1238c.jpg', '2785_1238c.jpg']
        This function will yield:
        ('0_anything.jpg', '446_1238c.jpg'), ('446_1238c.jpg', '897_1238c.jpg'),
        ('897_1238c.jpg', '2072_1238c.jpg'), ('2072_1238c.jpg', '2785_1238c.jpg')
    """
    # Dummy filename (frame 0) to pair with the first holding transition.
    previous = '0_anything.jpg'

    for start, end in zip(hold, not_hold):
        # Not-holding segment: previous transition -> start of contact.
        yield previous, start

        # Holding segment: start of contact -> start of non-contact.
        yield start, end

        previous = end

    # zip() stops at the shorter list; emit the not-holding segment that precedes
    # an unmatched final hold transition instead of losing it.
    if len(hold) > len(not_hold):
        yield previous, hold[len(not_hold)]


def listdir_no_hidden(path: str) -> list[str]:
    """List the entries of `path` like os.listdir, skipping hidden entries (names starting with '.', e.g. .DS_Store).

    Only the final path component of each entry is returned, never a full path.
    """
    visible = []
    for entry in os.listdir(path):
        if entry.startswith('.'):
            continue
        # Keep only the last component in case the entry contains a separator.
        visible.append(entry.rsplit("/", 1)[-1])
    return visible


def move_frames_from_transitions(whole_dir: str, hold_template_dir: str, not_hold_template_dir: str, base_dir: str) -> None:
    """
    Sort the frames of a clip into "holding" and "not_holding" folders using labeled transition frames.

    This function is designed for frames from a video where an object is repeatedly
    picked up and put down. The template directories contain the first frame of each
    holding / not-holding run; every frame between two consecutive transitions is
    placed in the matching output folder under ``base_dir``. Frames before the first
    transition are treated as not holding, and frames after the last transition are
    placed according to the state that transition started.

    Args:
        whole_dir (str): The directory containing all frames to be sorted.
        hold_template_dir (str): Directory containing labeled frames indicating the start of contact.
        not_hold_template_dir (str): Directory containing labeled frames indicating the start of non-contact.
        base_dir (str): The base directory in which "holding" and "not_holding" are created.

    Raises:
        ValueError: If ``hold_template_dir`` contains no (non-hidden) template frames,
            because the filename suffix cannot be determined.

    Note:
        Filenames must have the form "<number>_<suffix>", e.g. "1238_sh1.jpg". The suffix
        (everything after the first underscore, including the extension) is taken from
        the first hold template and used to build the names of the frames to transfer.
        The transfer is delegated to ``move_files_in_range``.
    """
    def frame_number(name: str) -> int:
        # Numeric prefix before the first underscore.
        return int(name.split("_")[0])

    # Template files from both directories, in frame order.
    hold_template_files = sorted(listdir_no_hidden(hold_template_dir), key=frame_number)
    not_hold_template_files = sorted(listdir_no_hidden(not_hold_template_dir), key=frame_number)

    if not hold_template_files:
        raise ValueError(f"No transition frames found in {hold_template_dir}")

    # Everything after the FIRST underscore, so suffixes that themselves contain
    # underscores (e.g. "sv1_frames.jpg") are kept intact.
    suffix = hold_template_files[0].split("_", 1)[1]

    # Output directories for holding and not-holding frames.
    output_holding = os.path.join(base_dir, "holding")
    output_not_holding = os.path.join(base_dir, "not_holding")
    os.makedirs(output_holding, exist_ok=True)
    os.makedirs(output_not_holding, exist_ok=True)

    # Assume the clip starts in the not-holding state.
    holding = False
    # Frame number of the most recent transition processed (0 if there were none).
    last_transition = 0

    with tqdm(total=len(hold_template_files) + len(not_hold_template_files) + 1) as progress_bar:
        for start, end in create_pairs(hold_template_files, not_hold_template_files):
            current_output_dir = output_holding if holding else output_not_holding
            last_transition = frame_number(end)

            # Transfer the frames of this segment to the directory for its state.
            move_files_in_range(whole_dir, current_output_dir, frame_number(start), last_transition, suffix)

            # Segments alternate between the two states.
            holding = not holding
            progress_bar.update(1)

        # Handle the frames after the last transition point. The last frame number
        # must come from the full frame directory, not from the template folders.
        frame_numbers = [
            frame_number(name)
            for name in listdir_no_hidden(whole_dir)
            if name.endswith("_" + suffix) and name.split("_")[0].isdigit()
        ]
        # Guard against an inverted range: move_files_in_range would swap the bounds.
        if frame_numbers and max(frame_numbers) >= last_transition:
            tail_output_dir = output_holding if holding else output_not_holding
            # +1 because the upper bound of the range is exclusive.
            move_files_in_range(whole_dir, tail_output_dir, last_transition, max(frame_numbers) + 1, suffix)
        progress_bar.update(1)
        

def transition_dataframe(directory: str, just_num: bool = True, save: bool = False) -> pd.DataFrame:
    """
    Build a DataFrame of changepoints, one column per transition subdirectory.

    Each column lists the non-hidden files found in the subdirectory of the same
    name, ordered by frame number.

    Args:
        directory (str): The path to the main directory containing "holding_transitions"
                        and "not_holding_transitions" subdirectories.
        just_num (bool): If True, keep only the numeric frame index of each transition
                        instead of the whole filename.
        save (bool): If True, also write the DataFrame to "changepoints.csv" in `directory`.

    Returns:
        pd.DataFrame: Subdirectory names as columns, transitions as rows. A missing
        subdirectory produces no column. If the subdirectories contain different
        numbers of files, the shorter column is padded with NaN (numeric columns
        then become float).
    """
    def frame_order(name: str) -> tuple:
        # Sort by numeric prefix; names without one go last, alphabetically.
        prefix = name.split("_")[0]
        if prefix.isdigit():
            return (0, int(prefix), name)
        return (1, 0, name)

    data = {}

    for subdirectory in ("holding_transitions", "not_holding_transitions"):
        subdirectory_path = os.path.join(directory, subdirectory)

        # Skip subdirectories that are absent.
        if not os.path.isdir(subdirectory_path):
            continue

        # Avoid hidden files starting with ".".
        files = [f for f in os.listdir(subdirectory_path) if not f.startswith('.')]
        if just_num:
            files = sorted(int(f.split("_")[0]) for f in files)
        else:
            # Plain string sorting would put "2072_..." before "446_...".
            files.sort(key=frame_order)

        # Wrapping in a Series lets columns of unequal length coexist.
        data[subdirectory] = pd.Series(files, dtype=None if files else object)

    df = pd.DataFrame(data)

    if save:
        df.to_csv(os.path.join(directory, "changepoints.csv"))
    return df

In [10]:
# Sort the frames of a single clip; base_dir must contain "frames",
# "holding_transitions" and "not_holding_transitions".
base_dir = "/Users/NoahRipstein/Downloads/shafee group stuff/sr labels/sh1"
move_frames_from_transitions(
    f"{base_dir}/frames",
    f"{base_dir}/holding_transitions",
    f"{base_dir}/not_holding_transitions",
    base_dir,
)

/Users/NoahRipstein/Downloads/shafee group stuff/sr labels


100%|██████████| 45/45 [00:05<00:00,  7.82it/s]


In [11]:
# Sort the frames and export the changepoints for every labeled clip.
base = "/Users/NoahRipstein/Downloads/shafee group stuff"
dirs = (
    base + "/sr labels/sh1",
    base + "/sr labels/sh2",
    base + "/sr labels/sh3",
    base + "/sr labels/sh4",
    base + "/sv labels/sv1_frames",
    base + "/sv labels/sv2_frames",
    base + "/sv labels/sv3_frames",
    base + "/sv labels/sv4_frames",
)
# Loop variable deliberately not called `dir`, which would shadow the builtin.
for label_dir in dirs:
    move_frames_from_transitions(
        label_dir + "/frames",
        label_dir + "/holding_transitions",
        label_dir + "/not_holding_transitions",
        label_dir,
    )
    # just_num=True, save=True: writes changepoints.csv into label_dir.
    transition_dataframe(label_dir, True, True)

100%|██████████| 45/45 [00:05<00:00,  8.10it/s]
100%|██████████| 11/11 [00:01<00:00,  8.13it/s]
100%|██████████| 33/33 [00:04<00:00,  8.16it/s]
100%|██████████| 11/11 [00:01<00:00,  7.48it/s]
100%|██████████| 39/39 [00:06<00:00,  6.14it/s]
100%|██████████| 11/11 [00:01<00:00,  6.66it/s]
100%|██████████| 15/15 [00:02<00:00,  6.40it/s]
100%|██████████| 19/19 [00:03<00:00,  6.23it/s]


# EXTRACTING FRAMES
# EVERYTHING ABOVE HERE CAN BE DONE BY streamlit_preprocess.py frontend

In [ ]:
def vert_flip(input_video_path: str, output_video_path: str = None) -> None:
    """
    Save a vertically flipped (upside-down) copy of a video.

    Args:
        input_video_path (str): Path of the video to read.
        output_video_path (str): Path of the video to write. Defaults to the input
            path with "_flip" inserted before the file extension.

    Returns:
        None. If the input video cannot be opened, an error message is printed and
        nothing is written.
    """
    if output_video_path is None:
        name, extension = os.path.splitext(input_video_path)
        output_video_path = name + "_flip" + extension

    # Open the input video file
    cap = cv2.VideoCapture(input_video_path)

    # Check if the video file was opened successfully
    if not cap.isOpened():
        print("Error: Could not open input video file.")
        cap.release()
        return

    # Get the video's width, height, and frames per second (fps)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep the fractional frame rate (e.g. 29.97); truncating it to an int would
    # change the playback speed/duration of the output.
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Define the codec and create VideoWriter object to save the output video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    try:
        # The reported frame count is only used for the progress bar; fall back to
        # an open-ended bar when the container does not report a usable count.
        with tqdm(total=total_frames if total_frames > 0 else None, desc="Flipping Video") as pbar:
            while True:
                ret, frame = cap.read()  # Read a frame from the input video

                if not ret:
                    break  # No more frames

                # flipCode 0 flips around the x-axis, i.e. upside down
                out.write(cv2.flip(frame, 0))

                pbar.update(1)
    finally:
        # Release the video objects even if reading or writing failed
        cap.release()
        out.release()

    print(f"Video flipped and saved as {output_video_path}")
    

def extract_frames(video_path: str, output_folder: str, suffix: str = None) -> None:
    """
    Extract frames from a video and save them as image files.

    Each frame is written to `output_folder` as '<frame_number>_<suffix>.jpg', with
    frame numbers starting at 0. Afterwards, empty "holding_transitions" and
    "not_holding_transitions" folders are created next to `output_folder` (in its
    parent directory).

    Args:
        video_path (str): The path to the input video file.
        output_folder (str): The path to the folder where extracted frames will be saved.
        suffix (str): Suffix (excluding file extension) placed after the frame number.
            Defaults to the name of the directory that contains `output_folder`.

    Returns:
        None. If the video cannot be opened, an error message is printed and no
        frames are written.

    Raises:
        KeyError: If `suffix` is not given and `output_folder` has no named parent
            directory to derive it from.
    """
    folder = pathlib.Path(output_folder)

    if suffix is None:
        # Name of the directory containing the output folder; a trailing slash on
        # output_folder does not matter.
        suffix = folder.parent.name
        if not suffix:
            raise KeyError("Path does not contain a second last element.")
        print(f"suffix: {suffix}")

    # Ensure that the output folder exists; create it if it doesn't.
    os.makedirs(output_folder, exist_ok=True)

    # Open the video file for reading.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open input video file.")
        cap.release()
        return

    # Exact counter used for the file names.
    frame_count = 0

    # Reported frame count (sometimes inaccurate); only used for the progress bar.
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    try:
        with tqdm(total=total_frames if total_frames > 0 else None, desc="Extracting Frames") as pbar:
            # Loop through the video frames until there are no more frames to read.
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Save the current frame as an image file.
                frame_path = os.path.join(output_folder, f'{frame_count}_{suffix}.jpg')
                cv2.imwrite(frame_path, frame)

                frame_count += 1
                pbar.update(1)
    finally:
        # Release the video file even if reading or writing failed.
        cap.release()

    # Create the label folders next to the frames folder.
    holding_dir = os.path.join(folder.parent, "holding_transitions")
    not_holding_dir = os.path.join(folder.parent, "not_holding_transitions")
    print(output_folder)
    print(holding_dir)
    os.makedirs(holding_dir, exist_ok=True)
    os.makedirs(not_holding_dir, exist_ok=True)
    
# Change the video path and output_dir below to process a different clip.
output_dir = "media/split Shafee vids/sh3/frames/"
extract_frames(
    video_path="media/split Shafee vids/sh3_zoom_flip.mp4",
    output_folder=output_dir,
)
# Report how many entries the output folder now holds.
print(f"{len(os.listdir(output_dir))} frames extracted")