<a href="https://colab.research.google.com/github/whatmakeart/ffmpeg-audio-mosh/blob/main/ffmpeg_audio_filter_datamosh.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install necessary packages
# The functions below shell out to the `ffmpeg` and `ffprobe` executables via subprocess
# (presumably both are supplied by this apt package -- confirm on the target image).
!apt-get install -y ffmpeg
# imageio is used below to read the filtered BMP samples and re-encode them as PNG previews.
!pip install imageio
# ipywidgets supplies the checkbox/button UI; tqdm supplies the notebook progress bars.
!pip install ipywidgets tqdm
# NOTE(review): the `nbextension` subcommand is presumably unnecessary on Colab and is
# absent from newer Jupyter Notebook releases -- confirm before relying on it elsewhere.
!jupyter nbextension enable --py widgetsnbextension

import os
import subprocess
import shutil
from google.colab import files
from IPython.display import display, clear_output
import ipywidgets as widgets
import matplotlib.pyplot as plt
import imageio.v2 as imageio  # Updated import statement
from math import ceil
from tqdm.notebook import tqdm

# Audio filters offered to the user, as (display name, FFmpeg filter string) pairs.
# - The display name becomes the sample image file name, the checkbox label, and the
#   key used to look the filter string up again via dict(filters).
# - The filter string is passed verbatim to ffmpeg's `-af` option.
# List order determines the order in which sample images are generated.
# NOTE(review): the original comment states these are the filters available in the
# FFmpeg build on Colab -- confirm against the installed ffmpeg version.
filters = [
    ('Echo', 'aecho=0.8:0.9:1000:0.3'),
    ('Gate', 'agate'),
    ('Phaser', 'aphaser'),
    ('Reverse', 'areverse'),
    ('Tempo', 'atempo=0.8'),
    ('Bandpass', 'bandpass=f=1000'),
    ('Bandreject', 'bandreject=f=1000'),
    ('Bass Boost', 'bass=g=3:f=110:w=0.6'),
    ('Chorus', 'chorus=0.5:0.5:1:0.1:1:2'),
    ('Compand', 'compand'),
    ('Dynamic Normalize', 'dynaudnorm'),
    ('Equalizer', 'equalizer=f=1000:t=q:w=1:g=5'),
    ('Flanger', 'flanger'),
    ('Highpass', 'highpass=f=200'),
    ('Lowpass', 'lowpass=f=800'),
    ('Multiband Compand', 'mcompand'),
    ('Treble Boost', 'treble=g=3'),
    ('Tremolo', 'tremolo'),
    ('Vibrato', 'vibrato'),
    ('Volume Increase', 'volume=3'),
    ('Stereo Tools', 'stereotools'),
]

# Function to upload a video
def upload_video():
    """Prompt the user to upload a video through the Colab upload helper.

    Returns:
        The first key of the mapping returned by ``files.upload()``, which is
        used by the rest of the notebook as the path of the uploaded video.

    Raises:
        ValueError: If the upload returned no files.
    """
    print("Please upload your video file:")
    uploaded = files.upload()
    if not uploaded:
        # An empty result (presumably a cancelled dialog) would otherwise
        # surface as a bare StopIteration from next(), which is hard to read.
        raise ValueError("No file was uploaded.")
    video_path = next(iter(uploaded))
    print(f"Uploaded {video_path}")
    return video_path

# Function to get video duration
def get_video_duration(video_path):
    """Return the duration of *video_path* in seconds, as reported by ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        float: The ``format=duration`` value printed by ffprobe.

    Raises:
        ValueError: If ffprobe does not print a value that parses as a float
            (for example because the file could not be read).
    """
    cmd = [
        'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1', video_path,
    ]
    # Capture stderr separately: merging it into stdout lets any diagnostic
    # line corrupt the number we are about to parse.
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    raw_duration = result.stdout.decode('utf-8', errors='replace').strip()
    try:
        return float(raw_duration)
    except ValueError as err:
        details = result.stderr.decode('utf-8', errors='replace').strip()
        raise ValueError(
            f"Could not determine duration of {video_path!r} "
            f"(ffprobe output: {raw_duration!r}; errors: {details!r})"
        ) from err

# Function to extract frames from video as BMP
def extract_frames(video_path, output_folder='frames'):
    """Decode *video_path* into numbered BMP frames inside *output_folder*.

    Frames are written as ``frame_000001.bmp``, ``frame_000002.bmp``, ...

    Args:
        video_path: Path of the source video.
        output_folder: Directory that receives the frames (created if missing).

    Returns:
        list[str]: Sorted paths of every ``.bmp`` file in *output_folder*
        after extraction. The list is empty if ffmpeg produced no frames.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Drop frames left over from an earlier run. Otherwise a shorter video
    # would inherit the tail of a previously extracted, longer one.
    for leftover in os.listdir(output_folder):
        if leftover.startswith('frame_') and leftover.endswith('.bmp'):
            os.remove(os.path.join(output_folder, leftover))

    # NOTE(review): '-qscale:v 2' presumably has no effect on BMP output;
    # kept unchanged to preserve the original command line.
    cmd = [
        'ffmpeg', '-i', video_path, '-qscale:v', '2',
        os.path.join(output_folder, 'frame_%06d.bmp')
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Report the failure but still return whatever frames were written,
        # so a partially decodable video remains usable.
        print(f"Error extracting frames from {video_path}")
        print(result.stderr.decode(errors='replace'))

    return [
        os.path.join(output_folder, name)
        for name in sorted(os.listdir(output_folder))
        if name.endswith('.bmp')
    ]

# Function to apply audio filter to BMP frame data
def apply_audio_filter_to_frame(frame_path, filter_str, output_path, header_size=36):
    """Run an FFmpeg audio filter over the bytes of a BMP frame.

    The first *header_size* bytes of the file are kept untouched. The rest is
    handed to ffmpeg as if it were raw audio in 'alaw' format, filtered with
    *filter_str*, and written back after the preserved header.

    Args:
        frame_path: Path of the source BMP frame.
        filter_str: Value for ffmpeg's ``-af`` option (may be a comma-joined chain).
        output_path: Where the reconstructed BMP is written on success.
        header_size: Number of leading bytes copied verbatim. Defaults to 36,
            the value the original script used.

    Returns:
        bool: True if the filtered frame was written, False if ffmpeg exited
        with a non-zero status (the error is printed and *output_path* is not
        written).
    """
    with open(frame_path, 'rb') as f:
        header = f.read(header_size)
        data = f.read()

    temp_data_path = frame_path + '.data'
    temp_filtered_path = frame_path + '.filtered'
    try:
        # Write the raw data to a temp file for ffmpeg to read
        with open(temp_data_path, 'wb') as f:
            f.write(data)

        # Apply audio filter using FFmpeg with 'alaw' format on both ends
        cmd = [
            'ffmpeg', '-y', '-f', 'alaw', '-i', temp_data_path,
            '-af', filter_str, '-ac', '1', '-f', 'alaw', temp_filtered_path
        ]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            print(f"Error applying audio filter '{filter_str}' to frame {frame_path}")
            # errors='replace' so odd bytes in ffmpeg's log cannot raise here
            print(result.stderr.decode(errors='replace'))
            return False  # Indicate failure

        # Read filtered data
        with open(temp_filtered_path, 'rb') as f:
            filtered_data = f.read()
    finally:
        # Always clean up: covers the failure return above, a partially
        # written '.filtered' file, and exceptions such as ffmpeg being missing.
        for temp_path in (temp_data_path, temp_filtered_path):
            if os.path.exists(temp_path):
                os.remove(temp_path)

    # Reconstruct BMP image: preserved header followed by the filtered bytes
    with open(output_path, 'wb') as f:
        f.write(header + filtered_data)

    return True  # Indicate success

# Function to create sample images for filters
def create_sample_images(frame_path, sample_folder='sample_images'):
    """Render one preview BMP per entry of ``filters`` from a single frame.

    Args:
        frame_path: Frame that every filter is applied to.
        sample_folder: Directory for the previews (created if missing); each
            preview is named ``<filter name>.bmp``.

    Returns:
        dict: Filter name -> preview path, containing only the filters for
        which ``apply_audio_filter_to_frame`` reported success.
    """
    os.makedirs(sample_folder, exist_ok=True)
    print("Creating sample images...")
    previews = {}
    for label, ffmpeg_filter in tqdm(filters):
        preview_path = os.path.join(sample_folder, f'{label}.bmp')
        if not apply_audio_filter_to_frame(frame_path, ffmpeg_filter, preview_path):
            # Leave failed filters out so they are never offered for selection
            print(f"Skipping filter '{label}' due to error.")
            continue
        previews[label] = preview_path
    return previews

# Function to display sample images with checkboxes
def display_sample_images(sample_images, cols=4):
    """Show each sample image in a grid with a checkbox underneath it.

    Args:
        sample_images: Mapping of filter name -> path of its preview image.
        cols: Number of grid columns (each 220px wide). Defaults to 4.

    Returns:
        list: One ``widgets.Checkbox`` per sample, in mapping order. Each
        checkbox's ``description`` is the filter name.
    """
    items = []
    checkboxes = []
    for name, img_path in sample_images.items():
        # Re-encode the BMP as PNG bytes, the format given to widgets.Image
        img = imageio.imread(img_path)
        png_bytes = imageio.imwrite(imageio.RETURN_BYTES, img, format='png')
        image_widget = widgets.Image(value=png_bytes, format='png', width=200, height=150)
        checkbox = widgets.Checkbox(value=False, description=name)
        checkboxes.append(checkbox)
        items.append(widgets.VBox([image_widget, checkbox]))
    grid = widgets.GridBox(
        items,
        layout=widgets.Layout(grid_template_columns=f"repeat({cols}, 220px)"),
    )
    display(grid)
    return checkboxes

# Function to specify filter durations
def specify_filter_times(selected_filters, video_duration):
    """Build start/stop time inputs for every selected filter.

    Args:
        selected_filters: Iterable of filter names.
        video_duration: Upper bound (seconds) for both inputs; also the
            initial value of each stop input.

    Returns:
        tuple: ``(time_widgets, box)`` where ``time_widgets`` maps each filter
        name to its ``(start_widget, stop_widget)`` pair and ``box`` is a
        ``widgets.VBox`` holding all inputs in order.
    """
    # Let the label take the width it needs; long names such as
    # "Dynamic Normalize Start:" do not fit the default description width.
    label_style = {'description_width': 'initial'}
    time_widgets = {}
    widgets_list = []
    for name in selected_filters:
        # BoundedFloatText enforces min/max; plain FloatText has no such
        # traits, so the bounds passed to it were never applied.
        start_time = widgets.BoundedFloatText(
            value=0, min=0, max=video_duration,
            description=f'{name} Start:', style=label_style)
        stop_time = widgets.BoundedFloatText(
            value=video_duration, min=0, max=video_duration,
            description=f'{name} Stop:', style=label_style)
        time_widgets[name] = (start_time, stop_time)
        widgets_list.extend([start_time, stop_time])
    return time_widgets, widgets.VBox(widgets_list)

# Function to process video frames with selected filters
def process_frames_with_filters(frames, selected_filters, time_ranges, fps):
    """Write every frame to 'processed_frames', filtered where a range applies.

    A frame's timestamp is ``index / fps``. Every filter whose
    ``start <= timestamp <= stop`` range contains it is applied, chained in
    the order of *time_ranges*. Frames with no active filter, or for which
    filtering fails, are copied unchanged.

    Args:
        frames: Ordered list of source frame paths.
        selected_filters: Unused; kept for interface compatibility. The
            active filters are the keys of *time_ranges*.
        time_ranges: Mapping of filter name -> ``(start, stop)`` pair of
            objects exposing a ``.value`` attribute (seconds).
        fps: Frame rate used to convert a frame index into a timestamp.

    Returns:
        list[str]: Sorted paths of the ``.bmp`` files in 'processed_frames'.
    """
    output_folder = 'processed_frames'
    os.makedirs(output_folder, exist_ok=True)

    # Remove frames left by an earlier run that this run will not overwrite;
    # otherwise they would be appended to the reassembled video.
    expected_names = {os.path.basename(path) for path in frames}
    for leftover in os.listdir(output_folder):
        if leftover.endswith('.bmp') and leftover not in expected_names:
            os.remove(os.path.join(output_folder, leftover))

    print("Processing frames...")
    filter_dict = dict(filters)

    # Read the start/stop values once instead of once per frame.
    windows = [
        (filter_dict[name], start_widget.value, stop_widget.value)
        for name, (start_widget, stop_widget) in time_ranges.items()
    ]

    for idx, frame_path in enumerate(tqdm(frames)):
        current_time = idx / fps
        active = [fstr for fstr, start, stop in windows if start <= current_time <= stop]
        filtered = False
        if active:
            # Combine filters into a single -af chain
            output_path = os.path.join(output_folder, os.path.basename(frame_path))
            filtered = apply_audio_filter_to_frame(frame_path, ','.join(active), output_path)
        if not filtered:
            # Nothing to apply, or the filter failed: keep the original frame
            shutil.copy(frame_path, output_folder)

    return [
        os.path.join(output_folder, name)
        for name in sorted(os.listdir(output_folder))
        if name.endswith('.bmp')
    ]

# Function to reassemble video from frames
def reassemble_video(frames_folder, fps, output_path='output.mp4'):
    """Encode ``frame_%06d.bmp`` files from *frames_folder* into an H.264 video.

    Args:
        frames_folder: Directory containing the numbered BMP frames.
        fps: Input frame rate passed to ffmpeg's ``-framerate``.
        output_path: Destination video file (overwritten if present).

    If ffmpeg exits with a non-zero status, the error output is printed.
    """
    cmd = [
        'ffmpeg', '-y', '-framerate', str(fps), '-i',
        os.path.join(frames_folder, 'frame_%06d.bmp'),
        # libx264 with yuv420p requires even width and height; pad odd
        # dimensions up by one pixel (a no-op for already-even frames).
        '-vf', 'pad=ceil(iw/2)*2:ceil(ih/2)*2',
        '-c:v', 'libx264', '-pix_fmt', 'yuv420p', output_path
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Surface the failure instead of silently leaving no output file
        print(f"Error reassembling video into {output_path}")
        print(result.stderr.decode(errors='replace'))

# Helper: turn ffprobe's r_frame_rate output into a float frame rate
def _parse_frame_rate(ffprobe_output):
    """Parse ffprobe's ``r_frame_rate`` text (e.g. ``"30000/1001"``) into a float.

    Only the first non-empty line is used, and a trailing CSV comma is
    ignored, so repeated or decorated output does not break parsing.

    Raises:
        ValueError: If no rate is present or it is not a positive number.
    """
    lines = [line.strip() for line in ffprobe_output.splitlines() if line.strip()]
    if not lines:
        raise ValueError("ffprobe did not report a frame rate.")
    numerator_text, _, denominator_text = lines[0].rstrip(',').partition('/')
    numerator = float(numerator_text)
    denominator = float(denominator_text) if denominator_text else 1.0
    if numerator <= 0 or denominator <= 0:
        # e.g. "0/0" would otherwise raise ZeroDivisionError
        raise ValueError(f"Invalid frame rate reported by ffprobe: {lines[0]!r}")
    return numerator / denominator


# Main script
def main():
    """Run the interactive workflow: upload, preview filters, process, download.

    The function returns once the first widgets are displayed. The remaining
    steps run inside the button callbacks registered here.
    """
    # Step 1: Upload Video
    video_path = upload_video()

    # Step 2: Get Video Duration and FPS
    duration = get_video_duration(video_path)
    # Get FPS
    cmd = [
        'ffprobe', '-v', '0', '-of', 'csv=p=0', '-select_streams', 'v:0',
        '-show_entries', 'stream=r_frame_rate', video_path
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    fps = _parse_frame_rate(result.stdout.decode('utf-8'))

    # Step 3: Extract Frames as BMP
    frames = extract_frames(video_path)
    if not frames:
        # Without frames there is nothing to preview or process
        print("No frames could be extracted from the video.")
        return

    # Step 4: Create Sample Images
    middle_frame = frames[len(frames) // 2]
    sample_images = create_sample_images(middle_frame)

    # Step 5: Display Sample Images with Checkboxes
    print("\nSelect the filters you want to apply by checking the boxes below each image:")
    checkboxes = display_sample_images(sample_images)

    # Step 6: Next button to proceed
    next_button = widgets.Button(description='Next', button_style='success')

    # Create app layout; its children are swapped out as the user advances
    app_layout = widgets.VBox([next_button])
    display(app_layout)

    def on_next_button_clicked(b):
        # The checkbox description is the filter name
        selected_filters = [cb.description for cb in checkboxes if cb.value]
        if not selected_filters:
            print("No filters selected. Please select at least one filter.")
            return
        # Step 7: Specify Filter Durations
        print("Specify start and stop times for each filter (in seconds):")
        time_widgets, time_widget_box = specify_filter_times(selected_filters, duration)

        # Step 8: Process Video Button
        process_button = widgets.Button(description='Process Video', button_style='primary')

        def on_process_button_clicked(pb):
            # The widgets themselves are passed on; their values are read
            # by process_frames_with_filters.
            time_ranges = time_widgets
            # Process Frames
            clear_output()
            print("Processing frames with selected filters. Please wait...")
            processed_frames_folder = 'processed_frames'
            process_frames_with_filters(
                frames, selected_filters, time_ranges, fps
            )
            # Reassemble Video
            print("Reassembling video...")
            reassemble_video(processed_frames_folder, fps)
            print("Processing complete.")
            # Provide download link
            files.download('output.mp4')

        process_button.on_click(on_process_button_clicked)
        # Update app layout: replace the Next button with the time inputs
        app_layout.children = [time_widget_box, process_button]

    next_button.on_click(on_next_button_clicked)

# Start the interactive workflow when this code is executed.
main()
