### Hein Lab Data Filtering Pipeline ###
**Tianyi Chen, January 2025**

Objective: A program for Abigail Grassick, PhD student at the Hein Lab of Computational Ecology at Cornell University.
This program sifts through thousands of clips of marine video data stored in zip files. It extracts and filters the most accurate video data to train a computer vision model.

The program will:

**Part 1**
1. Asks for the path to a zip file, then opens it. Precondition: the zip file contains only valid video files, all of which are titled (not untitled).
2. Sorts videos by length, using file size as a proxy (largest file size to smallest file size).
3. Assigns an index number to each video by creating a dataframe that maps each video title to its index number.
4. Creates a list with the *titles* of the 20 longest videos.
5. Randomly selects 20 videos from the remaining videos (excluding the 20 longest).
6. Appends the *titles* of the 20 randomly chosen videos to the list.

The returned list will be a list of 40 video *titles*.
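
The Part 1 selection logic can be sketched as a single function. This is only a minimal illustration, not the notebook's own code: `select_titles`, `sizes_by_title`, and the `clip_NN` titles are hypothetical names, and file size stands in for video length as in step 2.

```python
import random

def select_titles(sizes_by_title, n_longest=20, n_random=20, seed=123):
    """Return the n_longest largest titles, followed by n_random titles
    sampled at random from the rest. (Hypothetical helper.)"""
    # Rank titles from largest to smallest file size.
    ranked = sorted(sizes_by_title, key=sizes_by_title.get, reverse=True)
    longest = ranked[:n_longest]
    remaining = ranked[n_longest:]
    # Guard against archives with fewer leftover videos than requested.
    k = min(n_random, len(remaining))
    # A seeded generator makes the random pick reproducible.
    rng = random.Random(seed)
    return longest + rng.sample(remaining, k)

# Made-up archive of 50 clips whose size grows with the clip number.
sizes = {f"clip_{i:02d}": i * 1000 for i in range(50)}
titles = select_titles(sizes)
print(len(titles))   # 40
print(titles[:3])    # ['clip_49', 'clip_48', 'clip_47']
```

Capping the sample size at the number of remaining videos is a design choice of this sketch: it returns fewer than 40 titles for small archives instead of raising an error.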

**Part 2**
1. Asks the user for a location and a name for a new folder.
2. Creates the new folder and puts the selected video files in it.
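
One possible way to carry out the copy step is sketched below. It is an assumption-laden illustration, not the notebook's implementation: `extract_flat` and its parameters are hypothetical names. Each selected member is streamed directly to its final location, so folders nested inside the zip file are not recreated in the output folder.

```python
import os
import shutil
import zipfile

def extract_flat(zip_path, basenames, dest_dir):
    """Copy the named members of zip_path into dest_dir, dropping any
    folders stored inside the archive. (Hypothetical helper.)"""
    os.makedirs(dest_dir, exist_ok=True)
    wanted = set(basenames)
    copied = []
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.infolist():
            name = os.path.basename(member.filename)
            # Skip folder entries and files that were not selected.
            if member.is_dir() or name not in wanted:
                continue
            target = os.path.join(dest_dir, name)
            # Stream the member to disk without loading it fully into memory.
            with archive.open(member) as src, open(target, "wb") as dst:
                shutil.copyfileobj(src, dst)
            copied.append(target)
    return copied
```

A call such as `extract_flat(zipfilepath, uncleaned_track_list, full_path)` would return the list of written paths. If two members share a basename, the later one overwrites the earlier one in this sketch.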

In [5]:
#import packages
import zipfile  #this library helps us access zip files
import os  #os helps us check file paths
import pandas as pd  #pandas will help us extract data
import shutil  #this will help us copy the tracks to a new folder
import argparse


In [6]:
#helper function for file path validation
def validatepath(zipfilepath):
    '''Validates if the user's provided path points to a zip file.
    Precondition: zipfilepath is a string and is the path to the zip file.
    Returns True if the path is valid and raises an error otherwise.'''
    
    #Check if the zipfile is valid: 
    if not os.path.exists(zipfilepath):  
        raise FileNotFoundError("Your file path does not exist.")
    elif not os.path.isfile(zipfilepath):
        raise IsADirectoryError("Unsupported type; your path leads to a folder.")
    elif not zipfilepath.endswith('.zip'):
        raise TypeError("Unsupported type; your path does not lead to a zip file.")
    return True

In [7]:
#helper function to ensure zip file is openable
def validatezip(zipfilepath):
    '''Ensures that the zip file pointed to by a path is valid and openable.
    Precondition: zipfilepath is a string, the path to the zip file.
    Returns True if the zip file can be opened; otherwise prints an error message and returns False.'''
    try:
        #open and immediately close the file; we only test that it is readable
        with zipfile.ZipFile(zipfilepath, mode='r'):
            return True
    except zipfile.BadZipFile:
        print("The zip file is a bad zip file.")
    except FileNotFoundError:
        print("The zip file doesn't exist.")
    except Exception as e:
        print(f"Error: {e}")
    return False

A note about the zip file path:

The bite detection pipeline outputs videos in a nested directory structure of the form "/videotitle/local/workdir/agg75/feeding_detection". To extract the videos from the final "feeding_detection" directory, you can either unzip the file, remove this directory structure, rename, and rezip the file by hand, or you can run the following code block to extract the videos and rezip the file.

In [None]:
!python extract-rename-rezip.py videotitle.zip
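
The contents of `extract-rename-rezip.py` are not shown in this notebook. The block below is only a sketch of one way a flatten-and-rezip step could be written, under the assumption that the goal is a zip file with every video at the top level; `flatten_zip` and its parameters are hypothetical names and do not describe the actual script.

```python
import os
import zipfile

def flatten_zip(src_zip, dst_zip):
    """Write a copy of src_zip in which every file sits at the top level.
    (Hypothetical helper; not the extract-rename-rezip.py script.)"""
    with zipfile.ZipFile(src_zip) as src, \
         zipfile.ZipFile(dst_zip, "w", zipfile.ZIP_DEFLATED) as dst:
        seen = set()
        for member in src.infolist():
            name = os.path.basename(member.filename)
            # Skip folders, macOS metadata, and repeated basenames.
            if member.is_dir() or "__MACOSX" in member.filename or name in seen:
                continue
            seen.add(name)
            # Re-store the file under its bare name, without the folder path.
            dst.writestr(name, src.read(member))
    return dst_zip
```

Note that `src.read` loads each member fully into memory, which may matter for very large video files.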

In [10]:
#Get the zip file path from the user and store it as a string
zipfilepath = input("Input zip file path:").strip() #use strip to take away blank spaces

#Open zip file if no other errors arise
if validatepath(zipfilepath) and validatezip(zipfilepath):
    openfile = zipfile.ZipFile(zipfilepath, mode='r')
else:
    print("Error.")

Input zip file path: /Users/abigailgrassick/Desktop/eco_scaling/bite_validation/processed/ground_truth_GX017103.zip


In [11]:
#sorting the videos by length. (WIP)
#keep only real video entries: exclude __MACOSX files, which are metadata files, and folder entries
#(a new list is built because removing items from a list while looping over it skips entries)
list_of_videonames = [videoname for videoname in openfile.namelist()
                      if '__MACOSX' not in videoname and not videoname.endswith('/')]

list_of_videosizes = []

#create video sizes
for videoname in list_of_videonames:
    #extract the information from the videoname 
    info = openfile.getinfo(videoname)
    #extract the size from the information object
    list_of_videosizes.append(info.file_size)

#combine the names and sizes into a dictionary of columns using {}
data_names_sizes = {'Video Names': list_of_videonames, 'Video Sizes': list_of_videosizes}

#create a dataframe with video names and sizes
df_names_sizes = pd.DataFrame(data_names_sizes)

In [12]:
#sort by size
df_sorted = df_names_sizes.sort_values(by='Video Sizes', ascending=False)

In [13]:
#first twenty tracks
twenty_largest_tracks = df_sorted.head(20)

In [14]:
#turn into list
track_list = list(twenty_largest_tracks['Video Names'])

#aggregate rest of the tracks

remaining_tracks = df_sorted.iloc[20:]
remaining_tracks = remaining_tracks['Video Names'].tolist()

#pick random 20 videos from remaining tracks (or all of them if fewer than 20 remain)
from random import sample, seed
seed(123) #fixed seed so the same videos are picked on every run
samplevideos = sample(remaining_tracks, min(20, len(remaining_tracks)))

#add list of 20 random videos to track list
track_list.extend(samplevideos)

In [15]:
#clean names and create an 'uncleaned' copy (file names that still have the .mp4 extension)

uncleaned_track_list = []

for i in range(len(track_list)):
    #take away the folder path, however deeply the file is nested
    filename = os.path.basename(track_list[i])
    #record every file name, including those that had no folder path
    uncleaned_track_list.append(filename)

    #take away the .mp4 extension to get the clean title
    if filename.endswith('.mp4'):
        filename = filename[:-len('.mp4')]
    track_list[i] = filename

In [17]:
#create a new folder using os
#Specify the path for the new folder with user input
folder_path_input = input("Input the path for a new folder.").strip()
folder_name_input = input("Input the name for the new folder.").strip()

#Combine the folder path and folder name
full_path = os.path.join(folder_path_input, folder_name_input)

try:
    os.makedirs(full_path)
except FileExistsError:
    #the existing folder is still used as the destination by the next cell
    print("Error: The folder already exists.")
except Exception as e:
    print(f"Unexpected error: {e}")

Input the path for a new folder. /Users/abigailgrassick/Desktop
Input the name for the new folder. test_output


Error: The folder already exists.


In [20]:
#add the track list of 40 videos to the new folder. 
for videoname in uncleaned_track_list:
    # Find matching files in the zip file
    matched_files = [f for f in openfile.namelist() if os.path.basename(f) == videoname]
    
    if matched_files:
        for match in matched_files:
            extracted_path = openfile.extract(match, full_path)
            final_destination = os.path.join(full_path, os.path.basename(match))
            
            if extracted_path != final_destination:  # Move the file if it's in a subdirectory
                shutil.move(extracted_path, final_destination)
            
            print(f"copied {match} to {final_destination}")
    else:
        print(f"File not found in zip: {videoname}")