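This notebook finds GoPro video files in the AWS S3 bucket, concatenates the clips belonging to each drop, and names the combined video after the "DropID" segment of the object key (uploading it back to S3 unless test mode is enabled).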

# Requirements

In [None]:
# # Install ffmpeg if not installed already
# !conda install ffmpeg -c conda-forge -y

In [6]:
import getpass
import logging
import os
import subprocess
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator, List, Optional, Tuple

import boto3
import ffmpeg
import pandas as pd
from botocore.exceptions import ClientError
from tqdm import tqdm

# Root logging setup: INFO level, each record prefixed with its timestamp,
# logger name and level.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by every class and cell below.
logger = logging.getLogger(__name__)

@dataclass
class AWSCredentials:
    access_key_id: str
    secret_access_key: str
    
    @classmethod
    def from_user_input(cls) -> 'AWSCredentials':
        """Securely prompt user for AWS credentials."""
        access_key = getpass.getpass("Enter AWS Access Key ID: ")
        secret_key = getpass.getpass("Enter AWS Secret Access Key: ")
        return cls(access_key, secret_key)

class S3Client:
    """Wrapper around a boto3 S3 client: authentication, listing and downloads."""

    def __init__(self, credentials: Optional["AWSCredentials"] = None):
        """Create and authenticate the underlying boto3 client.

        Args:
            credentials: Explicit credentials. When None, AWS_ACCESS_KEY_ID and
                AWS_SECRET_ACCESS_KEY are read from the environment, falling back
                to an interactive prompt if either is missing.
        """
        self.client = self._initialize_client(credentials)

    def _initialize_client(self, credentials: Optional["AWSCredentials"]) -> "botocore.client.BaseClient":
        """Build an S3 client and confirm the credentials work.

        Credentials come from *credentials*, else environment variables, else
        user input. The keys are checked with a ``list_buckets`` call. While AWS
        rejects them as invalid (InvalidAccessKeyId / SignatureDoesNotMatch),
        the user is re-prompted.

        Raises:
            ClientError: For any other AWS error during the check (for example
                missing permission to list buckets).
        """
        if credentials is None:
            # Try environment variables first
            access_key = os.getenv("AWS_ACCESS_KEY_ID")
            secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")

            if not access_key or not secret_key:
                logger.info("AWS credentials not found in environment variables. Please enter them manually.")
                credentials = AWSCredentials.from_user_input()
            else:
                credentials = AWSCredentials(access_key, secret_key)

        # Loop rather than recurse so repeated typos don't grow the call stack.
        while True:
            try:
                client = boto3.client(
                    "s3",
                    aws_access_key_id=credentials.access_key_id,
                    aws_secret_access_key=credentials.secret_access_key,
                )
                # Test the credentials by making a simple API call
                client.list_buckets()
            except ClientError as e:
                logger.error("Failed to authenticate with AWS")
                if "InvalidAccessKeyId" in str(e) or "SignatureDoesNotMatch" in str(e):
                    logger.error("Invalid credentials provided. Please try again.")
                    credentials = AWSCredentials.from_user_input()
                    continue
                raise
            logger.info("Successfully authenticated with AWS")
            return client

    def list_objects(self, bucket: str, prefix: str = "", suffix: str = "") -> Iterator[dict]:
        """Yield object summaries from *bucket*, optionally filtered.

        Args:
            bucket: Bucket name.
            prefix: Key prefix, or an iterable of prefixes that are listed one
                after another.
            suffix: Required key ending. A tuple of endings is also accepted,
                since the match uses ``str.endswith``. Matching is case-sensitive.

        Yields:
            The ``Contents`` entries of ``list_objects_v2`` pages whose Key matches.

        Raises:
            ClientError: If listing fails (logged, then re-raised).
        """
        paginator = self.client.get_paginator("list_objects_v2")

        for prefix_item in [prefix] if isinstance(prefix, str) else prefix:
            try:
                for page in paginator.paginate(Bucket=bucket, Prefix=prefix_item):
                    # Pages with no matching keys omit "Contents" entirely
                    if "Contents" not in page:
                        continue

                    for obj in page["Contents"]:
                        if obj["Key"].endswith(suffix):
                            yield obj
            except ClientError as e:
                logger.error(f"Error listing objects: {e}")
                raise

    def download_file(self, bucket: str, key: str, filename: Path, version_id: Optional[str] = None) -> None:
        """Download ``bucket/key`` to *filename* and show a tqdm progress bar.

        Args:
            bucket: Bucket name.
            key: Object key.
            filename: Local destination path.
            version_id: Specific object version to fetch. When omitted, the
                latest version is downloaded.

        Raises:
            ClientError: If the HEAD or download request fails (logged, then re-raised).
        """
        # The same version selector must be used for sizing AND downloading;
        # otherwise the bar is sized for one version while another is fetched.
        extra_args = {"VersionId": version_id} if version_id else None
        try:
            head_kwargs = {"Bucket": bucket, "Key": key}
            if extra_args:
                head_kwargs.update(extra_args)

            object_size = self.client.head_object(**head_kwargs)["ContentLength"]

            with tqdm(total=object_size, unit='B', unit_scale=True, desc=str(filename)) as pbar:
                self.client.download_file(
                    Bucket=bucket,
                    Key=key,
                    Filename=str(filename),
                    ExtraArgs=extra_args,
                    # boto3 reports bytes transferred since the previous call
                    Callback=pbar.update,
                )
        except ClientError as e:
            logger.error(f"Error downloading {key}: {e}")
            raise

class VideoProcessor:
    """Concatenate per-drop GoPro clips stored in S3 using ffmpeg.

    Clips are downloaded to ``downloaded_movies/`` and joined into
    ``concatenated_videos/<DropID>.mp4``. The result is optionally uploaded
    back to the bucket, and the local copies are then removed.
    """

    # Key suffixes treated as movies. Keys are matched with str.endswith,
    # which is case-sensitive, hence the explicit upper-case variants.
    MOVIE_EXTENSIONS = {'.wmv', '.mpg', '.mov', '.avi', '.mp4', '.MOV', '.MP4'}

    def __init__(self, s3_client: "S3Client", bucket: str):
        """Create the working directories and locate ffmpeg.

        Args:
            s3_client: Authenticated client used for listing, download, upload and delete.
            bucket: Name of the bucket holding the videos.

        Raises:
            RuntimeError: If no ffmpeg executable can be found.
        """
        self.s3_client = s3_client
        self.bucket = bucket
        self.download_dir = Path("downloaded_movies")
        self.output_dir = Path("concatenated_videos")

        # Create necessary directories
        self.download_dir.mkdir(exist_ok=True)
        self.output_dir.mkdir(exist_ok=True)

        # Find and verify ffmpeg
        self.ffmpeg_path = self._find_ffmpeg()
        if not self.ffmpeg_path:
            raise RuntimeError(
                "ffmpeg not found. Please install ffmpeg:\n"
                "1. Download from https://github.com/BtbN/FFmpeg-Builds/releases\n"
                "2. Extract the zip file\n"
                "3. Add the bin folder to your system PATH or place ffmpeg.exe in your working directory"
            )

    def _find_ffmpeg(self) -> Optional[str]:
        """Return a runnable ffmpeg path, or None if none is found.

        First checks for ``ffmpeg`` on PATH. Then it checks the working
        directory, common Windows install roots, and the active conda
        environment's ``Library/bin``.
        """
        try:
            # Check if ffmpeg is in PATH
            result = subprocess.run(['ffmpeg', '-version'],
                                    capture_output=True,
                                    check=False)
            if result.returncode == 0:
                return 'ffmpeg'
        except OSError:  # includes FileNotFoundError / PermissionError
            pass

        # Check common Windows locations
        possible_paths = [
            Path.cwd() / "ffmpeg.exe",  # Current directory
            Path.cwd() / "bin" / "ffmpeg.exe",  # bin subdirectory
        ]
        # Only use install roots that are actually set. An unset variable
        # would otherwise turn into a relative "ffmpeg/bin/ffmpeg.exe" path.
        for env_var in ('PROGRAMFILES', 'PROGRAMFILES(X86)'):
            root = os.getenv(env_var)
            if root:
                possible_paths.append(Path(root) / "ffmpeg" / "bin" / "ffmpeg.exe")

        # Add conda environment path if running in conda
        conda_prefix = os.getenv('CONDA_PREFIX')
        if conda_prefix:
            possible_paths.append(Path(conda_prefix) / "Library" / "bin" / "ffmpeg.exe")

        for path in possible_paths:
            if path.exists():
                logger.info(f"Found ffmpeg at: {path}")
                return str(path)

        return None

    def verify_video_file(self, file_path: Path) -> bool:
        """Check that a video exists, is non-empty and decodes without errors.

        The whole file is decoded to ffmpeg's null muxer, so this costs roughly
        one full decode per call.

        Returns:
            True if every check passes. Otherwise False, with the reason logged.
        """
        try:
            if not file_path.exists():
                logger.error(f"Video file does not exist: {file_path}")
                return False

            size = file_path.stat().st_size
            if size == 0:
                logger.error(f"Video file is empty: {file_path}")
                return False

            # Try to read video metadata
            cmd = [
                self.ffmpeg_path,
                '-v', 'error',
                '-i', str(file_path),
                '-f', 'null',
                '-'
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                logger.error(f"Invalid video file {file_path}: {result.stderr}")
                return False

            logger.info(f"Verified valid video file: {file_path} (size: {size/1024/1024:.2f} MB)")
            return True

        except Exception as e:
            logger.error(f"Error verifying video file {file_path}: {str(e)}")
            return False

    @staticmethod
    def _concat_list_entry(path: Path) -> str:
        """Return one concat-demuxer ``file`` line for *path*, safely quoted."""
        # Inside ffmpeg's single-quoted strings a literal ' must be written as
        # '\'' (close quote, escaped quote, reopen). Without this, any quote in
        # the absolute path (e.g. in the user's home directory) breaks parsing.
        quoted = str(path.absolute()).replace("'", "'\\''")
        return f"file '{quoted}'\n"

    def concatenate_videos(self, video_paths: List[Path], output_path: Path, verify_videos: bool) -> bool:
        """Join videos end-to-end with ffmpeg's concat demuxer, without re-encoding.

        Args:
            video_paths: Input files, concatenated in the given order.
            output_path: Destination file, overwritten if it exists.
            verify_videos: If True, fully decode each input first and skip any
                that fail.

        Returns:
            True when ffmpeg succeeds, the output decodes cleanly, and the output
            is at least 90% of the combined input size. False otherwise; errors
            are logged, not raised.
        """
        list_file = None  # Define outside try block so it's available in finally
        try:
            if verify_videos:
                # Verify all input files exist and are valid
                logger.info(f"Verifying {len(video_paths)} input videos...")
                valid_videos = []
                for path in video_paths:
                    if self.verify_video_file(path):
                        valid_videos.append(path)
                    else:
                        logger.error(f"Skipping invalid video: {path}")

                if not valid_videos:
                    raise ValueError("No valid videos found to concatenate")

                if len(valid_videos) != len(video_paths):
                    logger.warning(f"Only {len(valid_videos)} out of {len(video_paths)} videos are valid")
            else:
                valid_videos = video_paths
                if not valid_videos:
                    # Fail early with a clear message; ffmpeg would only reject
                    # an empty concat list with a vaguer error.
                    raise ValueError("No videos provided to concatenate")

            total_input_size = sum(path.stat().st_size for path in valid_videos)
            logger.info(f"Total input size: {total_input_size/1024/1024:.2f} MB")

            # Create a temporary file list for ffmpeg
            list_file = self.download_dir / "file_list.txt"
            with open(list_file, 'w', encoding='utf-8') as f:
                f.writelines(self._concat_list_entry(path) for path in valid_videos)

            logger.info(f"Created concat list file at {list_file}")

            cmd = [
                self.ffmpeg_path,
                '-v', 'error',  # Only show errors
                '-f', 'concat',
                '-safe', '0',   # allow absolute paths in the list file
                '-i', str(list_file),
                '-c', 'copy',   # stream copy: no re-encoding
                '-y',           # Overwrite output if exists
                str(output_path)
            ]

            logger.info(f"Running ffmpeg command: {' '.join(cmd)}")

            start_time = time.time()
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                logger.error(f"FFmpeg concatenation failed: {result.stderr}")
                return False

            # Verify the output file
            if not self.verify_video_file(output_path):
                logger.error("Output video verification failed")
                return False

            output_size = output_path.stat().st_size
            if output_size < total_input_size * 0.9:  # Allow for some variation due to metadata
                logger.error(f"Output file suspiciously small: {output_size/1024/1024:.2f} MB vs expected {total_input_size/1024/1024:.2f} MB")
                return False

            duration = time.time() - start_time
            logger.info(f"Concatenation completed in {duration:.1f} seconds")
            logger.info(f"Output file size: {output_size/1024/1024:.2f} MB")
            return True

        except Exception as e:
            logger.error(f"Error during video concatenation: {str(e)}")
            return False
        finally:
            # Clean up the temporary file list
            if list_file and list_file.exists():
                try:
                    list_file.unlink()
                except Exception as e:
                    logger.error(f"Error cleaning up list file: {str(e)}")

    def _process_single_drop(self, drop_data: pd.DataFrame, delete_originals: bool, test_mode: bool, verify_videos: bool) -> None:
        """Download, concatenate and (unless *test_mode*) upload one drop.

        Local downloads and the concatenated output are always deleted
        afterwards, including in test mode and when an error occurs.

        Raises:
            RuntimeError: If no downloaded file passes verification or
                concatenation fails. Other errors propagate after being logged.
        """
        downloaded_files: List[Path] = []
        output_path: Optional[Path] = None

        try:
            # Paths are appended as each download starts, so files fetched
            # before a failing download are still removed in `finally`.
            self._download_videos(drop_data['Key'], downloaded_files)

            # Sort files by name to ensure correct order
            downloaded_files.sort()
            logger.info(f"Processing files in order: {[f.name for f in downloaded_files]}")

            if verify_videos:
                # Verify files immediately after download
                valid_files = []
                for file_path in downloaded_files:
                    if self.verify_video_file(file_path):
                        valid_files.append(file_path)
                    else:
                        logger.error(f"Downloaded file is corrupted: {file_path}")

                if not valid_files:
                    raise RuntimeError("No valid video files available for processing")
            else:
                valid_files = downloaded_files

            # Inputs were already verified above when requested. Passing False
            # avoids fully decoding every clip a second time during concatenation.
            output_path = self._concatenate_drop_videos(valid_files, drop_data['DropID'].iloc[0], False)

            if not test_mode:
                self._upload_and_cleanup(output_path, drop_data, delete_originals)
        except Exception as e:
            logger.error(f"Error processing drop: {str(e)}")
            raise
        finally:
            # Ensure cleanup happens even if there's an error
            self._cleanup_files(downloaded_files, output_path)

    def _verify_ffmpeg(self) -> None:
        """Check that the resolved ffmpeg executable can be launched.

        Raises:
            RuntimeError: If ffmpeg cannot be started or exits with an error.
        """
        try:
            # Use the executable found by _find_ffmpeg, not just bare PATH lookup
            subprocess.run([self.ffmpeg_path or 'ffmpeg', '-version'], capture_output=True, check=True)
            logger.info("ffmpeg installation verified successfully")
        except (subprocess.SubprocessError, OSError) as e:
            # OSError covers FileNotFoundError when the binary is missing
            logger.error("ffmpeg is not installed or not accessible in system PATH")
            raise RuntimeError("ffmpeg is required but not found. Please install ffmpeg first.") from e
        except Exception as e:
            logger.error(f"Error verifying ffmpeg installation: {str(e)}")
            raise

    def process_gopro_videos(
        self,
        filtered_df: pd.DataFrame,
        delete_originals: bool = False,
        test_mode: bool = False,
        gopro_prefix: str = "GX",
        verify_videos: bool = False
    ) -> None:
        """Process every DropID in *filtered_df*, one drop at a time.

        Drops with any ``fileName`` not starting with *gopro_prefix* are skipped.
        A failure in one drop is logged and does not stop the remaining drops.

        Args:
            filtered_df: Rows with ``Key``, ``SurveyID``, ``DropID`` and ``fileName`` columns.
            delete_originals: Delete the source objects after a successful upload.
            test_mode: Concatenate locally but skip the upload.
            gopro_prefix: Required filename prefix for every clip in a drop.
            verify_videos: Fully decode each downloaded clip before concatenating.
        """
        for drop_id in filtered_df['DropID'].unique():
            drop_data = filtered_df[filtered_df['DropID'] == drop_id]

            if not all(str(name).startswith(gopro_prefix) for name in drop_data['fileName']):
                logger.warning(f"Skipping DropID {drop_id}: Not all videos start with {gopro_prefix}")
                continue

            try:
                self._process_single_drop(drop_data, delete_originals, test_mode, verify_videos)
            except Exception as e:
                logger.error(f"Error processing DropID {drop_id}: {str(e)}")
                continue

    def get_movies_df(self, prefix: str = "") -> pd.DataFrame:
        """Get a DataFrame of movie files in the S3 bucket with their sizes.

        Args:
            prefix: Optional prefix to filter S3 objects.

        Returns:
            DataFrame with columns 'Key' and 'Size' (in bytes). The columns are
            present even when no movies match.
        """
        # Get all objects matching the prefix and movie extensions
        objects = self.s3_client.list_objects(
            self.bucket,
            prefix=prefix,
            suffix=tuple(self.MOVIE_EXTENSIONS)
        )

        movie_data = [{'Key': obj['Key'], 'Size': obj['Size']} for obj in objects]

        # Explicit columns keep downstream df['Key'] lookups working on an
        # empty listing (a bare DataFrame([]) would have no columns at all).
        return pd.DataFrame(movie_data, columns=['Key', 'Size'])

    def _download_videos(self, keys: pd.Series, downloaded_files: Optional[List[Path]] = None) -> List[Path]:
        """Download each key into ``download_dir`` and return the local paths.

        Args:
            keys: S3 object keys to fetch. Each file is saved under its base name.
            downloaded_files: Optional list to append each local path to, as soon
                as its download starts. This lets a caller clean up earlier
                files if a later download raises.

        Returns:
            The list of local paths (the same object as *downloaded_files* when given).
        """
        if downloaded_files is None:
            downloaded_files = []
        for key in keys:
            local_path = self.download_dir / Path(key).name
            # Register before downloading so cleanup sees it even on failure
            downloaded_files.append(local_path)
            self.s3_client.download_file(self.bucket, key, local_path)
        return downloaded_files

    def _concatenate_drop_videos(self, video_paths: List[Path], drop_id: str, verify_videos: bool) -> Path:
        """Concatenate a drop's clips into ``output_dir/<drop_id>.mp4``.

        Raises:
            RuntimeError: If concatenation reports failure.
        """
        output_path = self.output_dir / f"{drop_id}.mp4"
        if not self.concatenate_videos(video_paths, output_path, verify_videos):
            raise RuntimeError("Video concatenation failed")
        return output_path

    def _upload_and_cleanup(self, output_path: Path, drop_data: pd.DataFrame, delete_originals: bool) -> None:
        """Upload the concatenated video and optionally delete the source objects.

        The upload key is ``<SurveyID>/<DropID>/<DropID>.mp4``.

        Raises:
            Exception: Any upload/delete error is logged and re-raised.
        """
        new_key = f"{drop_data['SurveyID'].iloc[0]}/{drop_data['DropID'].iloc[0]}/{drop_data['DropID'].iloc[0]}.mp4"

        try:
            # Get file size for progress bar
            file_size = output_path.stat().st_size

            # The upload must run inside the with-block so the bar is still
            # open. boto3 calls Callback with the bytes sent since the previous
            # call, so pbar.update can be used directly.
            with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {output_path.name}") as pbar:
                self.s3_client.client.upload_file(
                    str(output_path),
                    self.bucket,
                    new_key,
                    Callback=pbar.update
                )
            logger.info(f"Successfully uploaded concatenated video to {new_key}")

            # Delete original files if requested
            if delete_originals:
                for key in drop_data['Key']:
                    if key == new_key:
                        # Never delete the object that was just uploaded
                        continue
                    self.s3_client.client.delete_object(Bucket=self.bucket, Key=key)
                    logger.info(f"Deleted original file {key}")

        except Exception as e:
            logger.error(f"Error during upload of {new_key}: {str(e)}")
            raise

    @staticmethod
    def _cleanup_files(downloaded_files: List[Path], output_path: Optional[Path]) -> None:
        """Best-effort removal of local inputs and the concatenated output.

        Failures are logged, not raised. This runs in a ``finally`` block,
        where a new exception would hide the original error.
        """
        paths = list(downloaded_files)
        if output_path:
            paths.append(output_path)

        for file_path in paths:
            if not file_path.exists():
                continue
            try:
                file_path.unlink()
            except OSError as e:
                logger.error(f"Error removing {file_path}: {e}")

# Connect to S3

In [None]:
# Authenticate once. With no explicit credentials, keys come from the
# environment variables or an interactive prompt.
s3_client = S3Client(credentials=None)

# List the GoPro movies in the bucket

In [7]:
# Build the processor for the survey bucket, then list every movie file in it
# (an empty prefix means the whole bucket).
processor = VideoProcessor(s3_client=s3_client, bucket="marine-buv")
movies_df = processor.get_movies_df(prefix="")

In [10]:
def get_filtered_movies_df(movies_df: pd.DataFrame, gopro_ext: str = "BNP") -> pd.DataFrame:
    """
    Select the GoPro drops that still need concatenating.

    Each S3 key is split on "/" into SurveyID / DropID / fileName. A drop is
    kept only if both of these hold:

    (a) No file in its folder is already named after the DropID, i.e.
        ``<DropID>.mp4`` with the extension compared case-insensitively.
        That is the key the concatenated output is uploaded under. All files
        are checked, not only those starting with *gopro_ext*.
    (b) It has more than one distinct file starting with *gopro_ext*.

    Args:
        movies_df: DataFrame with a Key column containing S3 object keys.
        gopro_ext: Filename prefix used to identify GoPro files.

    Returns:
        The GoPro rows of the kept drops. Columns are the original ones plus
        SurveyID, DropID and fileName. Keys with fewer than three path segments
        are ignored instead of raising.
    """
    if movies_df.empty:
        return pd.DataFrame(columns=[*movies_df.columns, 'SurveyID', 'DropID', 'fileName'])

    # Split once; .str.get yields NaN for keys with too few segments.
    segments = movies_df['Key'].str.split('/')
    df = movies_df.assign(
        SurveyID=segments.str.get(0),
        DropID=segments.str.get(1),
        fileName=segments.str.get(2),
    )

    # Anchored, case-insensitive ".mp4" strip, so "X.MP4" and "X.mp4" both
    # compare as "X". NaN file names stay NaN and never equal a DropID.
    stems = df['fileName'].str.replace(r'\.mp4$', '', regex=True, case=False)
    already_done = df.loc[stems == df['DropID'], 'DropID'].unique()

    # na=False: rows without a file name are simply not GoPro files
    is_gopro = df['fileName'].str.startswith(gopro_ext, na=False)
    candidates = df[is_gopro & ~df['DropID'].isin(already_done)]

    # Keep only drops with more than one distinct GoPro file to join
    file_counts = candidates.groupby('DropID')['fileName'].nunique()
    multi_file_drops = file_counts[file_counts > 1].index

    return candidates[candidates['DropID'].isin(multi_file_drops)]

In [None]:
gopro_ext = "BNP"
filtered_df = get_filtered_movies_df(movies_df, gopro_ext=gopro_ext)

# Temporarily lift pandas' row/column truncation so every candidate is shown.
display_opts = ('display.max_rows', None, 'display.max_columns', None)
with pd.option_context(*display_opts):
    print(filtered_df) 

In [None]:
# Concatenate and upload every qualifying drop. The originals are kept in S3,
# and clips are not pre-decoded for verification.
processor.process_gopro_videos(
    filtered_df,
    gopro_prefix=gopro_ext,
    verify_videos=False,
    test_mode=False,
    delete_originals=False,
)