Add /usr/local/bin to PATH so that Jupyter Notebook can find ffmpeg.

In [1]:
import os
os.environ['PATH'] = '/usr/local/bin:' + os.environ['PATH']

Check that ffmpeg is now found on PATH.

In [2]:
import shutil
shutil.which("ffmpeg")

'/usr/local/bin/ffmpeg'
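Re-running the first cell prepends /usr/local/bin to PATH again each time. A minimal sketch of an idempotent variant follows; the helper name `prepend_to_path` is hypothetical and not part of the original notebook, and the directory is assumed to be the same one used above.

```python
import os
import shutil


def prepend_to_path(directory: str, path_value: str) -> str:
    """Return path_value with directory in front, unless it is already listed."""
    entries = [p for p in path_value.split(os.pathsep) if p]
    if directory in entries:
        return path_value
    return os.pathsep.join([directory] + entries)


# Assumption: ffmpeg lives in /usr/local/bin, as in the cell above.
os.environ["PATH"] = prepend_to_path("/usr/local/bin", os.environ.get("PATH", ""))
print(shutil.which("ffmpeg"))  # prints None if ffmpeg is still not found
```

Because the function only works on strings, it can be checked without touching the real environment.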

Check the installed ffmpeg version.

In [3]:
!ffmpeg -version

ffmpeg version 6.0 Copyright (c) 2000-2023 the FFmpeg developers
built with Apple clang version 14.0.3 (clang-1403.0.22.14.1)
configuration: --prefix=/usr/local/Cellar/ffmpeg/6.0_1 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags= --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex --enable-libsoxr --enable-libzmq --enable-libzimg --disable-l
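If only the version number is needed, it can be read from the first line of that output. The sketch below assumes the first line has the form "ffmpeg version X ..." as shown above; `parse_ffmpeg_version` is a hypothetical helper name.

```python
import re
from typing import Optional


def parse_ffmpeg_version(version_output: str) -> Optional[str]:
    """Return the version token from the first line of `ffmpeg -version` output."""
    lines = version_output.splitlines()
    first_line = lines[0] if lines else ""
    match = re.match(r"ffmpeg version (\S+)", first_line)
    return match.group(1) if match else None


sample = "ffmpeg version 6.0 Copyright (c) 2000-2023 the FFmpeg developers"
print(parse_ffmpeg_version(sample))  # 6.0
```

In the notebook, the input string could come from `subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True).stdout`.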

#### Install ipywidgets to play videos from a dropdown

In [4]:
!pip install ipywidgets



#### Install pandas and tabulate

In [5]:
!pip install pandas
!pip install tabulate



FFmpeg installation for Coursera. If ffmpeg is not already available, the cell downloads a static build of FFmpeg, extracts it, and adds the extracted directory to the environment variable “PATH”.

Credit: code from the lab "Exercise 18. Encoding audio with ffmpeg"

https://www.coursera.org/learn/uol-cm3065-intelligent-signal-processing/ungradedLab/Dp6la/9-108-exercise-18-encoding-audio-with-ffmpeg/lab?path=%2Fnotebooks%2FExercises%2FExercise%252018.%2520Encoding%2520audio%2520with%2520ffmpeg.ipynb

In [6]:
# Download the latest FFmpeg static build, but only if ffmpeg is not already on PATH.
exist = !which ffmpeg
if not exist:
  !curl https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz -o ffmpeg.tar.xz \
     && tar -xf ffmpeg.tar.xz && rm ffmpeg.tar.xz
  ffmdir = !find . -iname ffmpeg-*-static
  path = %env PATH
  path = path + ':' + ffmdir[0]
  %env PATH $path

!which ffmpeg

/usr/local/bin/ffmpeg
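The cell above relies on IPython magics (`!find`, `%env`). The step that locates the extracted directory and appends it to PATH could be sketched in plain Python as below. The helper names are hypothetical, the archive is assumed to be extracted already, and only the top level of the search root is scanned (the original `find` is recursive and case-insensitive).

```python
import os
from pathlib import Path
from typing import Optional


def find_static_build_dir(root: str = ".") -> Optional[str]:
    """Return the absolute path of the first ffmpeg-*-static directory in root, or None."""
    matches = sorted(p for p in Path(root).glob("ffmpeg-*-static") if p.is_dir())
    return str(matches[0].resolve()) if matches else None


def append_to_path(directory: str) -> None:
    """Append directory to PATH for this process if it is not already listed."""
    entries = [e for e in os.environ.get("PATH", "").split(os.pathsep) if e]
    if directory not in entries:
        os.environ["PATH"] = os.pathsep.join(entries + [directory])


build_dir = find_static_build_dir(".")
if build_dir is not None:
    append_to_path(build_dir)
```

Using an absolute path means the PATH entry keeps working if the notebook later changes its working directory.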


The application will analyze the following videos:
- Cosmos_War_of_the_Planets.mp4
- Last_man_on_earth_1964.mov
- The_Gun_and_the_Pulpit.avi
- The_Hill_Gang_Rides_Again.mp4
- Voyage_to_the_Planet_of_Prehistoric_Women.mp4

TRIED: cmd = f'ffmpeg -i {file_path} -c:v h264 -c:a aac -r 25 -s 640x360 -b:v 3M -b:a 256k {output_file_path}'

WORKED: cmd = f'ffmpeg -i {file_path} -c:v h264 -pix_fmt yuv420p -profile:v high -c:a aac -r 25 -s 640x360 -b:v 3M -b:a 256k {output_file_path}'

Adding -pix_fmt yuv420p and -profile:v high fixed the issue with the .mov file conversion.

The source .mov file uses ProRes, a codec aimed mainly at professional workflows. Without an explicit -pix_fmt, the encoder probably keeps a pixel format close to the source's, and the resulting H.264 stream is one that many players cannot decode. Converting with -pix_fmt yuv420p and -profile:v high produces H.264 with widely supported parameters, which is why the converted video plays back successfully.
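The working command can also be assembled as an argument list, which avoids shell quoting problems when a file name contains spaces. This is a sketch only: `build_convert_args` is a hypothetical helper, and the flags are exactly those of the WORKED command.

```python
from typing import List


def build_convert_args(input_path: str, output_path: str) -> List[str]:
    """Build the ffmpeg argument list for the working conversion command."""
    return [
        "ffmpeg",
        "-i", input_path,
        "-c:v", "h264",          # H.264 video
        "-pix_fmt", "yuv420p",   # 8-bit 4:2:0 pixel format
        "-profile:v", "high",    # H.264 High profile
        "-c:a", "aac",           # AAC audio
        "-r", "25",              # 25 frames per second
        "-s", "640x360",         # output resolution
        "-b:v", "3M",            # target video bit rate
        "-b:a", "256k",          # target audio bit rate
        output_path,
    ]


args = build_convert_args("video/Last_man_on_earth_1964.mov",
                          "folder2/Last_man_on_earth_1964_formatOK.mp4")
print(" ".join(args))
```

The list can be passed directly to `subprocess.run(args, check=True)`, with no `shell=True` needed.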

<!-- 
- "-pix_fmt yuv420p": sets the pixel format of the video to yuv420p, one of the most common pixel formats used in video. Color images are represented in the YUV color space with 4:2:0 chroma subsampling: each 2x2 block of 4 Y (luminance) samples shares 1 U and 1 V (chrominance) sample. This reduces the size of the video without significant loss in visual quality.

- "-profile:v high": sets the profile for the h264 codec to High. The profile determines the set of features or tools that can be used in encoding the video. The High profile allows better compression efficiency and more features than the Baseline and Main profiles, but might not be supported by older devices.
-->
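To make the effect of 4:2:0 chroma subsampling concrete, the sketch below compares the size of one uncompressed 8-bit frame at 640x360 for three common subsampling schemes. It is an illustration of the arithmetic only: the size of an encoded H.264 file depends on the bit rate, not on these raw sizes, and `raw_frame_bytes` is a hypothetical helper.

```python
def raw_frame_bytes(width: int, height: int, chroma_h_div: int, chroma_v_div: int) -> int:
    """Bytes in one 8-bit planar YUV frame, given the chroma subsampling divisors."""
    luma = width * height
    chroma_w = -(-width // chroma_h_div)   # ceiling division
    chroma_h = -(-height // chroma_v_div)
    return luma + 2 * chroma_w * chroma_h  # one Y plane plus U and V planes


w, h = 640, 360
yuv444 = raw_frame_bytes(w, h, 1, 1)  # no subsampling
yuv422 = raw_frame_bytes(w, h, 2, 1)  # chroma halved horizontally
yuv420 = raw_frame_bytes(w, h, 2, 2)  # chroma halved in both directions
print(yuv444, yuv422, yuv420)  # 691200 460800 345600
```

A 4:2:0 frame therefore carries half the raw data of a 4:4:4 frame while keeping full luminance resolution.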

In [7]:
import os
import subprocess
import json
from typing import Dict, List, Union, NewType, Tuple
from tabulate import tabulate
import ipywidgets as widgets
from IPython.display import display, clear_output, Video
from concurrent.futures import ThreadPoolExecutor

# Constants

# Allowed video extensions for processing
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mkv', '.flv', '.mov')

# Directory to store report files
REPORT_DIR = "folder1"

# Directory to store output files after processing
OUTPUT_DIR = "folder2"

# Pre-defined properties for festival videos
FESTIVAL_PROPERTIES = {
    'format': 'mp4',  # Target video format
    'video_codec': 'h264',  # Target video codec
    'audio_codec': 'aac',  # Target audio codec
    'frame_rate': 25,  # Target frames per second
    'resolution': (640, 360),  # Target resolution width x height
    'aspect_ratio': '16:9',  # Target aspect ratio
    'bit_rate_range': (2, 5),  # Acceptable video bit rate range in Mbps
    'audio_bit_rate': 256,  # Acceptable audio bit rate in kbps
    'audio_channels': 'stereo'  # Target audio channels
}



# Type aliases for the dictionaries passed between functions.
# Plain aliases are used because the functions below return ordinary dicts.
Stream = Dict[str, Union[int, str]]
VideoData = Dict[str, Union[str, List[Stream]]]
AudioProperties = Dict[str, Union[str, int]]
VideoProperties = Dict[str, Union[str, int, float]]
GeneralProperties = Dict[str, Union[str, int, float]]


def create_directory(directory_name: str) -> None:
    """Creates a directory if it doesn't exist."""
    os.makedirs(directory_name, exist_ok=True)

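def safe_frame_rate_eval(frame_rate_str: str) -> float:
    """Convert an ffprobe rational such as '24000/1001' to a float (0.0 if unknown)."""
    try:
        numerator, _, denominator = frame_rate_str.partition('/')
        # A value without '/' (e.g. '25') is treated as a whole number
        return int(numerator) / int(denominator or 1)
    except (ValueError, ZeroDivisionError):
        return 0.0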



def get_video_stream_properties(video_data: VideoData) -> VideoProperties:
    """Extract codec, frame rate, resolution, aspect ratio and bit rate for the first video stream."""
    from math import gcd  # local import so this function does not depend on the import block

    video_stream = next((s for s in video_data['streams'] if s['codec_type'] == 'video'), None)
    if video_stream is None:
        raise ValueError("No video stream found")

    width = video_stream['width']
    height = video_stream['height']
    # Reduce width:height to lowest terms, e.g. 640x360 -> 16:9
    divisor = gcd(width, height) or 1
    aspect_ratio = f"{width // divisor}:{height // divisor}"

    # Approximate the bit rate as file size divided by duration. This is the
    # average for the whole file (video plus audio), not the video stream alone.
    duration = float(video_data['format']['duration'])
    video_size = float(video_data['format']['size'])
    video_bit_rate = (video_size / 1e6 * 8) / duration if duration > 0 else 0.0  # bytes -> Mbit, then per second

    # Key properties of the video stream
    properties = {
        'video_codec': video_stream['codec_name'],
        'frame_rate': safe_frame_rate_eval(video_stream['avg_frame_rate']),
        'resolution': (width, height),
        'aspect_ratio': aspect_ratio,
    }

    # Report the calculated bit rate only when ffprobe lists a bit rate for the video stream
    if 'bit_rate' in video_stream:
        properties['bit_rate'] = f"{video_bit_rate:.2f} Mbps"

    return properties


def calculate_aspect_ratio_change(properties: GeneralProperties, new_properties: GeneralProperties) -> str:
    aspect_ratio = properties.get('aspect_ratio')
    new_aspect_ratio = new_properties.get('aspect_ratio')
    
    if aspect_ratio != new_aspect_ratio:
        return f'aspect_ratio: {aspect_ratio} -> {new_aspect_ratio}'
    else:
        return ''


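def get_audio_stream_properties(video_data: VideoData) -> AudioProperties:
    """Extract codec, channel layout and bit rate for the first audio stream."""
    audio_stream = next((s for s in video_data['streams'] if s['codec_type'] == 'audio'), None)
    if audio_stream is None:
        raise ValueError("No audio stream found")
    # ffprobe may omit the per-stream bit rate for some containers, so do not assume the key exists
    raw_bit_rate = str(audio_stream.get('bit_rate', ''))
    bit_rate = int(raw_bit_rate) if raw_bit_rate.isdigit() else 0
    return {
        'audio_codec': audio_stream['codec_name'],
        'audio_channels': 'stereo' if audio_stream['channels'] == 2 else 'other',
        'audio_bit_rate': f"{bit_rate / 1000} kb/s"  # Convert to kbps with units
    }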


def check_video_properties(file_path: str) -> GeneralProperties:
    """Fetch properties of a video file using ffprobe."""
    # Passing the arguments as a list (no shell) keeps paths with spaces intact
    cmd = ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', file_path]

    try:
        # Discard anything ffprobe writes to stderr
        result = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)

        # Convert the result bytes to dictionary
        video_data = json.loads(result)

        # Extract and assemble video and audio properties from the data
        properties = {
            'format': video_data['format']['format_name'],  # Format name of the video
            'bit_rate': int(video_data['format']['bit_rate']) / 1e6  # Convert from bps to Mbps
        }
        # Add the video stream properties to the properties dictionary
        properties.update(get_video_stream_properties(video_data))
        # Add the audio stream properties to the properties dictionary
        properties.update(get_audio_stream_properties(video_data))

    except (subprocess.CalledProcessError, KeyError, json.JSONDecodeError):
        raise ValueError(f"Unable to fetch properties for {file_path}. It might not be a valid video file.")

    return properties


def validate_properties(properties: GeneralProperties) -> List[str]:
    """Validate video properties against festival requirements."""
    problematic_fields = []

    
    # 1. Validate video format
    # ffprobe reports a comma-separated list of names such as "mov,mp4,m4a,3gp,3g2,mj2"
    if FESTIVAL_PROPERTIES['format'] not in properties.get('format', '').split(','):
        problematic_fields.append('format')


    # 2. Validate video codec
    if properties.get('video_codec') != FESTIVAL_PROPERTIES['video_codec']:
        problematic_fields.append('video_codec')

    # 3. Validate audio codec
    if properties.get('audio_codec') != FESTIVAL_PROPERTIES['audio_codec']:
        problematic_fields.append('audio_codec')

    # 4. Validate frame rate
    if properties.get('frame_rate') != FESTIVAL_PROPERTIES['frame_rate']:
        problematic_fields.append('frame_rate')

       
    # 5. Validate resolution
    if properties.get('resolution') != FESTIVAL_PROPERTIES['resolution']:
        problematic_fields.append('resolution')
    else:
        # Set the aspect_ratio property to the correct aspect ratio value
        properties['aspect_ratio'] = FESTIVAL_PROPERTIES['aspect_ratio']


    # 6. Validate video bit rate
    # 'bit_rate' is either a float (Mbps) or a string such as "x.xx Mbps"; str() handles both
    bit_rate = float(str(properties.get('bit_rate', "0 Mbps")).split(" ")[0])
    low, high = FESTIVAL_PROPERTIES['bit_rate_range']
    if not (low <= bit_rate <= high):
        problematic_fields.append('bit_rate')

    # 7. Validate audio bit rate
    raw_audio_bit_rate = properties.get('audio_bit_rate', '0 kb/s')  # Assume kb/s as default if not found

    if isinstance(raw_audio_bit_rate, str) and "kb/s" in raw_audio_bit_rate:
        audio_bit_rate = float(raw_audio_bit_rate.split(" ")[0])  # Extract the number before the space
    else:
        audio_bit_rate = float(raw_audio_bit_rate)  # If it's already a float

    if audio_bit_rate > FESTIVAL_PROPERTIES['audio_bit_rate']:
        problematic_fields.append('audio_bit_rate')



    # 8. Validate audio channels
    if properties.get('audio_channels') != FESTIVAL_PROPERTIES['audio_channels']:
        problematic_fields.append('audio_channels')

    return problematic_fields




def convert_video(file_path: str) -> str:
    """Convert video to meet festival requirements using ffmpeg."""
    # Determine the output file name based on the input file name
    base_name = os.path.basename(file_path)
    output_file_name = os.path.splitext(base_name)[0] + '_formatOK.mp4'
    output_file_path = os.path.join(OUTPUT_DIR, output_file_name)

    # If the output file already exists, remove it
    if os.path.exists(output_file_path):
        os.remove(output_file_path)

    # Command to convert the video using ffmpeg (argument list, so paths with spaces are safe)
    cmd = ['ffmpeg', '-i', file_path, '-c:v', 'h264', '-pix_fmt', 'yuv420p', '-profile:v', 'high',
           '-c:a', 'aac', '-r', '25', '-s', '640x360', '-b:v', '3M', '-b:a', '256k', output_file_path]

    try:
        # Discard ffmpeg's output; check=True makes a non-zero exit status raise CalledProcessError
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except subprocess.CalledProcessError:
        raise ValueError(f"Failed to convert {file_path}")

    return output_file_path


def process_videos(directory: str) -> List[str]:
    """Process all videos in the given directory."""
    report = []

    # Iterate over all files in the directory
    for file in os.listdir(directory):
        # Filter only files with the defined video extensions
        if file.lower().endswith(VIDEO_EXTENSIONS):
            file_path = os.path.join(directory, file)
            try:
                # Fetch video properties
                properties = check_video_properties(file_path)
                # Validate video properties against the festival's requirements
                problematic_fields = validate_properties(properties)

                # If there are problematic fields, convert the video
                if problematic_fields:
                    converted_video_path = convert_video(file_path)
                    new_properties = check_video_properties(converted_video_path)
                    new_problematic_fields = validate_properties(new_properties)
                    
                    # List changes between original and converted videos
                    changes = [f"{key}: {properties[key]} -> {new_properties[key]}" for key in problematic_fields]
                    # Check if the aspect ratio has changed and add it to the changes list
                    if properties.get('aspect_ratio') != new_properties.get('aspect_ratio'):
                        changes.append(f"aspect_ratio: {properties['aspect_ratio']} -> {new_properties['aspect_ratio']}")
                    
                    new_data = ", ".join([f"{k}: {v}" for k, v in new_properties.items()])
                    
                    # Create a report message with old and new video properties
                    current_data = ", ".join([f"{k}: {v}" for k, v in properties.items()])
                    report_msg = f"{file} | {current_data} | {new_data} | {', '.join(changes)} | {os.path.basename(converted_video_path)}"

                    # Check if the converted video still has issues
                    if new_problematic_fields:
                        issues = ", ".join(new_problematic_fields)
                        report_msg += f" | Converted video issues: {issues}"
                        
                    report.append(report_msg)

            except ValueError as e:
                # Append errors related to video properties fetching to the report
                report.append(f"Error with {file}: {str(e)}")

    return report





def display_report(reports: List[str]) -> None:
    """Display the video processing report."""
    if not reports:
        print("All videos are in the specified format!")
        return

    problematic_videos = []

    # Open and write the report to a text file
    report_path = os.path.join(REPORT_DIR, "report.txt")
    with open(report_path, "w") as f:
        for report in reports:
            segments = report.split(" | ")

            # Entries with fewer than 5 segments are error messages from process_videos;
            # show and save them as they are instead of discarding the message
            if len(segments) < 5:
                print(report)
                f.write(report + '\n\n')
                continue

            video_file = segments[0]
            current_data = segments[1]
            new_data = segments[2]
            changes = segments[3]
            new_file = segments[4]

            # Check if there's a sixth segment for conversion issues
            if len(segments) >= 6:
                conversion_issues = segments[5]
            else:
                conversion_issues = "No issues detected"

            # Format and display the report using tabulate
            data = [
                ["Video File", video_file],
                ["Current Data", current_data],

                ["New File", new_file],
                
                ["New Data", new_data],
                ["Changes", changes],
                ["Conversion Issues", conversion_issues]
            ]
            table_data = tabulate(data, tablefmt='plain')
            f.write(table_data + '\n\n')
            print(tabulate(data, tablefmt='grid') + '\n')

            # Track videos that still have issues after conversion
            if "Converted video issues:" in conversion_issues:
                problematic_videos.append(video_file)

    # Provide an overview after processing all videos
    if not problematic_videos:
        print("All files have been successfully converted and now conform to the festival's standards.")
    else:
        problematic_videos_str = ", ".join(problematic_videos)
        print(f"The following videos have issues: {problematic_videos_str}. Please review the report for more details.")

    print(f"Report saved to {report_path}")


def main():
    create_directory(REPORT_DIR)
    create_directory(OUTPUT_DIR)
    videos_directory = "video"
    reports = process_videos(videos_directory)
    display_report(reports)

if __name__ == "__main__":
    main()




+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Video File        | Last_man_on_earth_1964.mov                                                                                                                                                                                                          |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Current Data      | format: mov,mp4,m4a,3gp,3g2,mj2, bit_rate: 11.24 Mbps, video_codec: prores, frame_rate: 23.976023976023978, resolution: (640, 360), aspect_ratio: 16:9, audio_codec: pcm_s16le, audio_channels: stereo, audio_bit_rate: 1536.0
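The frame rate of 23.976023976023978 in the report is what the rational 24000/1001 evaluates to as a float. The sketch below shows that conversion and the size-over-duration bit rate calculation on a small JSON document. The JSON is a hand-written, hypothetical sample in the shape of ffprobe's JSON output, not real output for any of the notebook's files, and the helper names are assumptions.

```python
import json
from fractions import Fraction

# Hypothetical, hand-written sample shaped like `ffprobe -print_format json` output.
SAMPLE_JSON = """
{
  "streams": [
    {"codec_type": "video", "codec_name": "prores", "width": 640, "height": 360,
     "avg_frame_rate": "24000/1001"},
    {"codec_type": "audio", "codec_name": "pcm_s16le", "channels": 2,
     "bit_rate": "1536000"}
  ],
  "format": {"duration": "20.0", "size": "25000000"}
}
"""


def parse_frame_rate(rate: str) -> float:
    """Convert a rational string such as '24000/1001' to frames per second."""
    try:
        return float(Fraction(rate))
    except (ValueError, ZeroDivisionError):
        return 0.0  # e.g. '0/0' or a malformed value


def overall_mbps(size_bytes: float, duration_s: float) -> float:
    """Average bit rate of the whole file in Mbps (video, audio and container)."""
    return size_bytes * 8 / duration_s / 1e6 if duration_s > 0 else 0.0


data = json.loads(SAMPLE_JSON)
video = next(s for s in data["streams"] if s["codec_type"] == "video")
fmt = data["format"]
print(round(parse_frame_rate(video["avg_frame_rate"]), 3))        # 23.976
print(overall_mbps(float(fmt["size"]), float(fmt["duration"])))   # 10.0
```

Note that a bit rate computed from file size and duration covers the whole file, so it is slightly higher than the bit rate of the video stream alone.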

#### Create dropdowns with ipywidgets to play the submitted videos

In [8]:


def create_video_player(video_files):
    """Creates and displays a video player with a dropdown list of the given video files."""
    # Extracting just the filenames for display in the dropdown
    file_names = [os.path.basename(video) for video in video_files]

    # Create a dropdown widget for video selection
    video_dropdown = widgets.Dropdown(
        options={name: path for name, path in zip(file_names, video_files)},
        description='Videos:'
    )

    # Create a button widget to initiate video playback
    play_button = widgets.Button(description="Play Video")

    # Output widget to display the video
    output = widgets.Output()

    # Define the function that will be called when the "Play Video" button is clicked
    def on_play_button_click(b):
        with output:
            clear_output(wait=True)
            display(Video(video_dropdown.value))

    # Link the button's click event to the defined function
    play_button.on_click(on_play_button_click)

    # Display the created widgets: dropdown, play button, and the output area
    display(video_dropdown, play_button, output)

# List of original video files
original_video_files = [
    "./video/Cosmos_War_of_the_Planets.mp4",
    "./video/Last_man_on_earth_1964.mov",
    "./video/The_Gun_and_the_Pulpit.avi",
    "./video/The_Hill_Gang_Rides_Again.mp4",
    "./video/Voyage_to_the_Planet_of_Prehistoric_Women.mp4",
]

# List of converted video files
converted_video_files = [
    "./folder2/Cosmos_War_of_the_Planets_formatOK.mp4",
    "./folder2/Last_man_on_earth_1964_formatOK.mp4",
    "./folder2/The_Gun_and_the_Pulpit_formatOK.mp4",
    "./folder2/The_Hill_Gang_Rides_Again_formatOK.mp4",
    "./folder2/Voyage_to_the_Planet_of_Prehistoric_Women_formatOK.mp4",
]

print("Original Videos:")
create_video_player(original_video_files)
print("\nConverted Videos:")
create_video_player(converted_video_files)


Original Videos:


Dropdown(description='Videos:', options={'Cosmos_War_of_the_Planets.mp4': './video/Cosmos_War_of_the_Planets.m…

Button(description='Play Video', style=ButtonStyle())

Output()


Converted Videos:


Dropdown(description='Videos:', options={'Cosmos_War_of_the_Planets_formatOK.mp4': './folder2/Cosmos_War_of_th…

Button(description='Play Video', style=ButtonStyle())

Output()
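The list of converted files above is typed out by hand. Since `convert_video` names its output after the input file, the list could instead be derived from the originals, and the dropdown options could be limited to files that actually exist. A sketch under those assumptions follows; `converted_path` and `dropdown_options` are hypothetical helpers, and "folder2" is the output directory used earlier.

```python
import os
from typing import Dict, List


def converted_path(original: str, output_dir: str = "folder2") -> str:
    """Map an input video path to the output name used by convert_video()."""
    stem = os.path.splitext(os.path.basename(original))[0]
    return os.path.join(output_dir, stem + "_formatOK.mp4")


def dropdown_options(paths: List[str]) -> Dict[str, str]:
    """Return {file name: path} for the paths that exist on disk."""
    return {os.path.basename(p): p for p in paths if os.path.isfile(p)}


originals = [
    "./video/Cosmos_War_of_the_Planets.mp4",
    "./video/Last_man_on_earth_1964.mov",
]
converted = [converted_path(p) for p in originals]
print(converted)
print(dropdown_options(converted))  # empty if nothing has been converted yet
```

The resulting dictionary can be passed as the `options` argument of `widgets.Dropdown`, in the same way as the dictionary built in `create_video_player`.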

#### Check video properties without the extra banner output that ffprobe prints by default

In [9]:


def display_video_details(video_files, header):
    """
    Display video details using ffprobe for a list of video file paths.
    
    Args:
    - video_files (list): List of video file paths.
    - header (str): A title or header to display before showing video details.
    """
    
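    def run_ffprobe(file_path):
        """Run ffprobe on a given video file and return details."""
        # ffprobe writes its report to stderr, so merge stderr into the captured output.
        # An argument list (no shell) keeps paths with spaces intact.
        cmd = ['ffprobe', '-hide_banner', file_path]
        result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        return file_path, result.decode('utf-8', errors='replace')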

    print(header)  # Display the provided header
    print("-" * len(header))  # Display a separator line for the header

    # max_workers must be at least 1, even for an empty list
    with ThreadPoolExecutor(max_workers=max(1, len(video_files))) as executor:
        results = list(executor.map(run_ffprobe, video_files))

    for file_path, result in results:
        file_name = os.path.basename(file_path)
        print(f"\nResults for {file_name}:")
        print("-" * 80)
        print(result)
        print("=" * 80)

# Original video files
original_video_files = [
    "./video/Cosmos_War_of_the_Planets.mp4",
    "./video/Last_man_on_earth_1964.mov",
    "./video/The_Gun_and_the_Pulpit.avi",
    "./video/The_Hill_Gang_Rides_Again.mp4",
    "./video/Voyage_to_the_Planet_of_Prehistoric_Women.mp4",
]

# Converted video files
converted_video_files = [
    "./folder2/Cosmos_War_of_the_Planets_formatOK.mp4",
    "./folder2/Last_man_on_earth_1964_formatOK.mp4",
    "./folder2/The_Gun_and_the_Pulpit_formatOK.mp4",
    "./folder2/The_Hill_Gang_Rides_Again_formatOK.mp4",
    "./folder2/Voyage_to_the_Planet_of_Prehistoric_Women_formatOK.mp4",
]

# Display details for both sets of videos
display_video_details(original_video_files, "Original Videos")
display_video_details(converted_video_files, "Converted Videos")


Original Videos
---------------

Results for Cosmos_War_of_the_Planets.mp4:
--------------------------------------------------------------------------------
Input #0, mov,mp4,m4a,3gp,3g2,mj2, from './video/Cosmos_War_of_the_Planets.mp4':
  Metadata:
    major_brand     : mp42
    minor_version   : 0
    compatible_brands: mp42mp41
    creation_time   : 2021-08-02T19:15:48.000000Z
  Duration: 00:00:20.02, start: 0.000000, bitrate: 3315 kb/s
  Stream #0:0[0x1](eng): Video: h264 (Main) (avc1 / 0x31637661), yuv420p(progressive), 628x354 [SAR 1:1 DAR 314:177], 2989 kb/s, 29.97 fps, 29.97 tbr, 30k tbn (default)
    Metadata:
      creation_time   : 2021-08-02T19:15:48.000000Z
      handler_name    : ?Mainconcept Video Media Handler
      vendor_id       : [0][0][0][0]
      encoder         : AVC Coding
  Stream #0:1[0x2](eng): Audio: aac (LC) (mp4a / 0x6134706D), 48000 Hz, stereo, fltp, 317 kb/s (default)
    Metadata:
      creation_time   : 2021-08-02T19:15:48.000000Z
      handler_name   
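The pattern used in `display_video_details`, running one external command per file in a thread pool and collecting the text output in input order, can be sketched on its own as below. To keep the sketch runnable without ffprobe, the demo commands call the Python interpreter instead; `run_command` and `run_all` are hypothetical helpers.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def run_command(args: List[str]) -> Tuple[int, str]:
    """Run one command and return (exit code, combined stdout and stderr)."""
    done = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    return done.returncode, done.stdout


def run_all(commands: List[List[str]]) -> List[Tuple[int, str]]:
    """Run the commands concurrently; results come back in input order."""
    if not commands:
        return []
    with ThreadPoolExecutor(max_workers=len(commands)) as executor:
        return list(executor.map(run_command, commands))


# Stand-in commands so the sketch runs anywhere. For the notebook's use case,
# each entry would be ["ffprobe", "-hide_banner", path].
demo = [[sys.executable, "-c", f"print({n} * 2)"] for n in (1, 2, 3)]
for code, output in run_all(demo):
    print(code, output.strip())
```

Returning the exit code instead of raising lets one unreadable file be reported without losing the results for the others. A missing executable still raises `FileNotFoundError`.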

# References:
- https://ipywidgets.readthedocs.io/en/stable/
- https://ipython.readthedocs.io/en/stable/api/generated/IPython.display.html
- https://docs.python.org/3/library/subprocess.html
- https://docs.python.org/3/library/concurrent.futures.html
- https://docs.python.org/3/library/os.html
- https://docs.python.org/3/
- https://ffmpeg.org/ffprobe.html
- https://ffmpeg.org/documentation.html
- https://pypi.org/project/tabulate/
- https://peps.python.org/pep-0484/
- https://docs.python.org/3/library/json.html