In [None]:
import sys

# Location of the sample video for each known platform; any platform not
# listed here falls back to the macOS-style path.
_PATHS_BY_PLATFORM = {
    'win32': 'C:\\videos\\TheGreat_2_2001_169_2398_ProRes422HQ_ENG20_ENG51_BPO20_BPO51_Primary_A17913780.mov',
    'linux': '/mnt/c/videos/TheGreat_2_2001_169_2398_ProRes422HQ_ENG20_ENG51_BPO20_BPO51_Primary_A17913780.mov',
}
file_path = _PATHS_BY_PLATFORM.get(
    sys.platform,
    '/Users/leandro.correia/Documents/videos/TheGreat_2_2001_169_2398_ProRes422HQ_ENG20_ENG51_BPO20_BPO51_Primary_A17913780.mov',
)

In [None]:
from pymediainfo import MediaInfo
import numpy as np


# Parse the container description for file_path.
media_info = MediaInfo.parse(file_path)

# Keep the audio track objects, and their attribute dicts in the same order.
audio_tracks = [trk for trk in media_info.tracks if trk.track_type == "Audio"]
audio_metadata = [trk.to_data() for trk in audio_tracks]

# Report how many audio tracks were found, then dump each one's metadata.
print(f"Number of audio tracks: {len(audio_tracks)}")
print("Audio track metadata:")
for track_number, track_info in enumerate(audio_metadata, start=1):
    print(f"Track {track_number}: {track_info}")

In [None]:
# List every attribute of the first audio track, one "name: value" pair per line.
first_track_info = audio_metadata[0]
for field_name, field_value in first_track_info.items():
    print(f"{field_name}: {field_value}")

In [None]:
from typing import List, Optional, Tuple

import numpy as np


class SilenceNumpyDetector:
    """Split a 1-D audio buffer into chunks, preferring cut points that fall on silence.

    A window is treated as silent when the mean of its absolute sample values
    is below ``threshold``.
    """

    def __init__(self, threshold: float = 0.01, silence_duration: float = 0.5):
        # Mean absolute amplitude below which a window counts as silent.
        self.threshold = threshold
        # Length of the window that must be silent, in seconds.
        self.silence_duration = silence_duration

    def detect_silence(self, buffer: np.ndarray, sample_rate: int) -> bool:
        """Tell whether ``buffer`` is silent.

        Args:
            buffer: Samples to inspect.
            sample_rate: Unused; kept so existing callers keep working.

        Returns:
            True when the mean absolute amplitude is below ``self.threshold``.
            An empty buffer is reported as not silent.
        """
        # np.mean of an empty array is NaN (with a RuntimeWarning); answer directly.
        if len(buffer) == 0:
            return False
        amplitude = np.mean(np.abs(buffer))
        return amplitude < self.threshold

    def find_nearest_silence(self, buffer: np.ndarray, sample_rate: int, search_start: int, search_end: int) -> Optional[int]:
        """Find the first silent window between ``search_start`` and ``search_end``.

        The scan moves forward from ``search_start`` in 100 ms steps and stops
        at the first window of ``self.silence_duration`` seconds that
        ``detect_silence`` accepts, so the result is the earliest match rather
        than the one closest to ``search_end``.

        Args:
            buffer: Full sample array being searched.
            sample_rate: Samples per second.
            search_start: First candidate window start, in samples.
            search_end: Exclusive end of the searched region, in samples.

        Returns:
            The start index of the silent window, or None when there is none.
        """
        # Clamp to 1 so very low sample rates cannot yield an empty window or
        # a zero step (range() rejects step == 0).
        silence_samples = max(1, int(self.silence_duration * sample_rate))
        step = max(1, int(sample_rate * 0.1))  # Slide every 100ms

        # Last start whose window still ends at or before search_end; the +1
        # makes range() include it.
        last_start = search_end - silence_samples

        for i in range(search_start, last_start + 1, step):
            if self.detect_silence(buffer[i:i + silence_samples], sample_rate):
                return i

        return None

    def split_buffer(
        self,
        input_data: np.ndarray,
        sample_rate: int,
        max_duration: float = 30,
        max_size_bytes: int = 25 * 1024 * 1024,
        silence_search_window: float = 10,
    ) -> List[np.ndarray]:
        """Cut ``input_data`` into consecutive chunks that cover it completely.

        Each chunk is limited both by ``max_duration`` seconds and by
        ``max_size_bytes``. Within the last ``silence_search_window`` seconds
        before that limit the method looks for a silent window and cuts at its
        start; when none is found the cut is made at the limit itself.

        Args:
            input_data: 1-D array of samples.
            sample_rate: Samples per second; must be positive.
            max_duration: Longest allowed chunk, in seconds; must be positive.
            max_size_bytes: Largest allowed chunk, in bytes of ``input_data``'s dtype.
            silence_search_window: How far before the limit to look for silence, in seconds.

        Returns:
            The chunks, in order, as slices of ``input_data``.

        Raises:
            ValueError: If ``sample_rate`` or ``max_duration`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")
        if max_duration <= 0:
            raise ValueError("max_duration must be positive")

        target_samples = int(max_duration * sample_rate)
        search_offset = int(silence_search_window * sample_rate)
        # Number of samples that fit in the byte budget for this dtype.
        max_samples = int(max_size_bytes) // input_data.itemsize
        # Effective per-chunk limit; at least one sample so the loop always advances.
        limit_samples = max(1, min(target_samples, max_samples))

        chunks = []
        cursor = 0
        total_samples = len(input_data)

        while cursor < total_samples:
            hard_end = cursor + limit_samples

            # The remainder fits in one chunk: emit it and finish.
            if hard_end >= total_samples:
                chunks.append(input_data[cursor:])
                break

            # Search for silence between (hard_end - search window) and hard_end.
            search_start = max(cursor, hard_end - search_offset)
            silence_pos = self.find_nearest_silence(input_data, sample_rate, search_start, hard_end)

            # A hit at the cursor itself would produce an empty chunk and stall
            # the loop, so it is treated like a miss.
            if silence_pos is None or silence_pos <= cursor:
                cut_pos = hard_end
            else:
                cut_pos = silence_pos

            chunks.append(input_data[cursor:cut_pos])
            cursor = cut_pos

        return chunks


In [None]:
import sys
import os
import numpy as np
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), '..')))

from extractor.audio_ffmpeg import AudioFFmpegExtractor
from pymediainfo import MediaInfo


SAMPLING_RATE = 16000

# Parse the container description for file_path.
media_info = MediaInfo.parse(file_path)

# One dict per audio stream: its identifier, its samples and its channel layout.
audio_tracks = []

ffmpeg_extractor = AudioFFmpegExtractor()
for track in media_info.tracks:
    # Only audio streams are extracted.
    if track.track_type != "Audio":
        continue

    track_info = track.to_data()
    stream_id = track_info.get('stream_identifier')
    pcm = ffmpeg_extractor.extract(file_path, stream_id, SAMPLING_RATE)

    # Flatten to 1-D float32 and scale by 1/32768.
    # NOTE(review): this presumably assumes the extractor returns int16 PCM - confirm.
    normalised = pcm.flatten().astype(np.float32) / 32768.0
    audio_tracks.append({
        'id': stream_id,
        'data': normalised,
        'channel_layout': track_info.get('channel_layout'),
    })


# Summarise what was extracted: stream identifier and channel layout.
for entry in audio_tracks:
    print(entry.get('id'), entry.get('channel_layout'))

In [None]:
# Split the first audio track into chunks and report the length of each one.
# SAMPLING_RATE is used instead of a repeated literal so the reported
# durations always match the rate the audio was extracted at.
detector = SilenceNumpyDetector(threshold=0.01)

chunks = detector.split_buffer(audio_tracks[0].get('data'), sample_rate=SAMPLING_RATE)

for i, chunk in enumerate(chunks):
    duration_seconds = len(chunk) / SAMPLING_RATE
    print(f"Chunk {i}: {duration_seconds:.2f} seconds, {len(chunk)} samples")


In [None]:
import matplotlib.pyplot as plt
import librosa
import librosa.display

def plot_waveform(chunk, sample_rate, silence_sample_index=None, title_prefix=""):
    """Plot one audio chunk against time, optionally marking a cut position.

    Args:
        chunk: 1-D sequence of samples to draw.
        sample_rate: Samples per second, used to convert sample indices to seconds.
        silence_sample_index: Sample index, relative to ``chunk``, at which a red
            dashed vertical line labelled "Silence Cut" is drawn. None draws no line.
        title_prefix: Text placed before " - Waveform" in the figure title.
    """
    plt.figure(figsize=(7, 4))

    # Waveform. Sample i belongs at i / sample_rate seconds; endpoint=False keeps
    # that spacing exact, so the waveform and the cut marker share one time axis.
    time = np.linspace(0, len(chunk) / sample_rate, num=len(chunk), endpoint=False)
    plt.plot(time, chunk, alpha=0.7)
    plt.title(f'{title_prefix} - Waveform')
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude")
    if silence_sample_index is not None:
        plt.axvline(silence_sample_index / sample_rate, color='red', linestyle='--', label="Silence Cut")
        plt.legend()

    plt.tight_layout()
    plt.show()


In [None]:
class SilenceNumpyDetector():
    """Cut a 1-D audio array into bounded-length chunks, splitting at silence when possible.

    A window is silent when the mean of its absolute sample values is below
    ``threshold``.
    """

    def __init__(self, threshold: float = 0.01):
        # Mean absolute amplitude below which a window counts as silent.
        self.threshold = threshold

    def _first_silent_window(self, buffer: np.ndarray, first_start: int, silence_samples: int):
        """Return the lowest window start >= ``first_start`` that is silent, or None.

        Candidate starts run from ``first_start`` up to, but not including,
        ``len(buffer) - silence_samples``.
        """
        stop = len(buffer) - silence_samples
        if silence_samples <= 0 or stop <= first_start:
            return None

        # Prefix sums give every window total in one pass, instead of averaging
        # silence_samples values again for each candidate start.
        tail = np.abs(buffer[first_start:])
        prefix = np.concatenate(([0.0], np.cumsum(tail, dtype=np.float64)))
        window_count = stop - first_start
        window_sums = prefix[silence_samples:silence_samples + window_count] - prefix[:window_count]
        hits = np.flatnonzero(window_sums / silence_samples < self.threshold)
        if hits.size == 0:
            return None
        return first_start + int(hits[0])

    def split_buffer(
        self,
        audio: np.ndarray,
        sample_rate: int = 16000,
        *,
        chunk_duration: float = 30,
        min_silence_duration: float = 0.5,
        pre_check_duration: float = 10,
        max_bytes: int = 25 * 1024 * 1024,
    ):
        """Split ``audio`` into consecutive chunks that cover it completely.

        Each chunk is at most ``chunk_duration`` seconds and at most
        ``max_bytes`` bytes. Once ``pre_check_duration`` seconds of a chunk have
        passed, the chunk ends at the start of the first silent window of
        ``min_silence_duration`` seconds; without such a window it runs to the
        limit.

        Args:
            audio: 1-D array of samples.
            sample_rate: Samples per second; must be positive.
            chunk_duration: Longest allowed chunk, in seconds.
            min_silence_duration: Length of the silent window to look for, in seconds.
            pre_check_duration: Earliest position inside a chunk where a split may occur, in seconds.
            max_bytes: Largest allowed chunk, in bytes of ``audio``'s dtype.

        Returns:
            A tuple ``(chunks, silence_points)``: the chunks in order, and for
            each chunk the absolute sample index at which it ends.

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")

        # The byte budget limits each chunk, measured with the array's real
        # item size rather than an assumed 2 bytes per sample.
        max_samples = int(max_bytes) // audio.itemsize
        # At least one sample per chunk so the loop always advances.
        chunk_samples = max(1, min(int(chunk_duration * sample_rate), max_samples))
        # A split at offset 0 would create an empty chunk, hence the floor of 1.
        pre_check_samples = max(1, int(pre_check_duration * sample_rate))
        silence_samples = int(min_silence_duration * sample_rate)

        chunks = []
        silence_points = []  # store split points (in samples)

        position = 0
        total_samples = len(audio)
        while position < total_samples:
            buffer = audio[position:position + chunk_samples]

            # search for silence after pre-check; fall back to the whole buffer
            split_sample = self._first_silent_window(buffer, pre_check_samples, silence_samples)
            if split_sample is None:
                split_sample = len(buffer)

            chunks.append(buffer[:split_sample])
            position += split_sample
            silence_points.append(position)

        return chunks, silence_points


In [None]:
detector = SilenceNumpyDetector(threshold=0.01)
chunks, silence_points = detector.split_buffer(audio_tracks[0].get('data'), sample_rate=SAMPLING_RATE)

# silence_points holds absolute sample indices, while plot_waveform expects an
# index relative to the chunk, so the running chunk start is subtracted.
start_sample = 0
for i, (chunk, silence_sample) in enumerate(zip(chunks, silence_points)):
    silence_relative = silence_sample - start_sample
    plot_waveform(chunk, sample_rate=SAMPLING_RATE, silence_sample_index=silence_relative, title_prefix=f"Chunk {i}")
    start_sample = silence_sample

# Convert sample indices to seconds so the values match the "(s)" label.
print("Split points (s):", [sp / SAMPLING_RATE for sp in silence_points])
print("Number of chunks:", len(chunks))
print("Chunk durations (s):", [len(c) / SAMPLING_RATE for c in chunks])

In [None]:
# Overview of the first audio track with every split point (given in samples)
# drawn as a red dashed line.
plt.figure(figsize=(16, 4))
full_audio = audio_tracks[0]['data']
# Sample i belongs at i / SAMPLING_RATE seconds; endpoint=False keeps that
# spacing exact so the waveform lines up with the split markers.
time = np.linspace(0, len(full_audio) / SAMPLING_RATE, num=len(full_audio), endpoint=False)
plt.plot(time, full_audio, alpha=0.7, label='Waveform')

for n, sp in enumerate(silence_points):
    # Only the first marker carries a legend label; labels starting with an
    # underscore are left out of the legend.
    plt.axvline(sp / SAMPLING_RATE, color='red', linestyle='--', alpha=0.7,
                label='Silence Split Points' if n == 0 else '_nolegend_')

plt.title("Full Audio Waveform with Split Points")
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
import IPython.display as ipd

# Show an inline audio player for every chunk, preceded by its duration.
for chunk_no, samples in enumerate(chunks):
    seconds = len(samples) / SAMPLING_RATE
    print(f"Playing chunk {chunk_no} ({seconds:.2f} seconds)")
    player = ipd.Audio(samples, rate=SAMPLING_RATE)
    ipd.display(player)

In [None]:
import numpy as np

class SilencePointDetector:
    """Locate stretches of sustained silence in a 1-D audio signal."""

    def __init__(self, threshold=0.01, silence_duration=0.5):
        self.threshold = threshold  # amplitude threshold
        self.silence_duration = silence_duration  # min silence length (seconds)

    def detect_silence_points(self, audio: np.ndarray, sample_rate: int = 16000):
        """Return the start times of fully silent windows.

        A window of ``self.silence_duration`` seconds qualifies when every
        sample in it has an absolute value below ``self.threshold``. Starts
        closer than one window length to the previously reported start are
        skipped.

        Args:
            audio: 1-D array of samples.
            sample_rate: Samples per second.

        Returns:
            Start times in seconds, in ascending order. Empty when the audio is
            shorter than one window or the window length rounds down to zero
            samples.
        """
        silence_samples = int(self.silence_duration * sample_rate)
        audio = np.asarray(audio)
        if silence_samples <= 0 or audio.size < silence_samples:
            return []

        silence_mask = np.abs(audio) < self.threshold

        # Count silent samples per window with prefix sums. This gives the same
        # integer counts as convolving the mask with a box of ones, but in one
        # linear pass instead of a window-length loop per sample.
        silent_before = np.concatenate(([0], np.cumsum(silence_mask, dtype=np.int64)))
        silent_in_window = silent_before[silence_samples:] - silent_before[:-silence_samples]

        # Indices where a silent span starts
        silence_start_indices = np.flatnonzero(silent_in_window == silence_samples)

        # Convert to seconds and avoid near-duplicates
        silence_points = []
        last_point = -float('inf')
        min_gap = silence_samples  # avoid nearby silences

        for idx in silence_start_indices:
            if idx - last_point >= min_gap:
                silence_points.append(idx / sample_rate)
                last_point = idx

        return silence_points


In [None]:
import matplotlib.pyplot as plt

def plot_silence_points(audio: np.ndarray, silence_points: list, sample_rate: int = 16000):
    """Plot a waveform with a red dashed vertical line at every split point.

    Args:
        audio: 1-D array of samples.
        silence_points: Positions of the vertical lines, in seconds.
        sample_rate: Samples per second, used to build the time axis.
    """
    duration = len(audio) / sample_rate
    # Sample i belongs at i / sample_rate seconds; endpoint=False keeps that
    # spacing exact so the waveform lines up with the split markers.
    times = np.linspace(0, duration, len(audio), endpoint=False)

    plt.figure(figsize=(16, 4))
    plt.plot(times, audio, label="Waveform", alpha=0.7)

    for i, pos in enumerate(silence_points):
        # Only the first marker is labelled, so the legend shows one entry.
        plt.axvline(x=pos, color='red', linestyle='--', label="Silence Split Points" if i == 0 else "")

    plt.title("Full Audio Waveform with Split Points")
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# Example usage: find silence starts in the first audio track and plot them.
first_track_samples = audio_tracks[0]['data']

detector = SilencePointDetector(threshold=0.01, silence_duration=0.5)
silence_points = detector.detect_silence_points(first_track_samples, sample_rate=16000)

plot_silence_points(first_track_samples, silence_points, sample_rate=16000)


In [None]:
import numpy as np

class SilencePointDetector:
    """Choose split points for a 1-D audio signal, preferring moments of silence.

    Split points are placed on detected silences where that keeps each chunk
    between ``min_chunk_duration`` and ``max_chunk_duration`` seconds; forced
    splits are inserted wherever silence is too sparse.
    """

    def __init__(self, threshold=0.01, silence_duration=0.5, min_chunk_duration=10.0, max_chunk_duration=30.0):
        self.threshold = threshold  # amplitude threshold for silence
        self.silence_duration = silence_duration  # duration of silence to detect (in seconds)
        self.min_chunk_duration = min_chunk_duration  # shortest chunk a silence split may create (seconds)
        self.max_chunk_duration = max_chunk_duration  # longest allowed chunk (seconds)

    def detect_silence_points(self, audio: np.ndarray, sample_rate: int = 16000):
        """Return split times, in seconds, that cover the whole signal.

        The list starts at 0.0 and, for non-empty audio, ends at the total
        duration. No two consecutive points are more than
        ``max_chunk_duration`` apart. A silence is used as a split only when it
        is at least ``min_chunk_duration`` after the previous point; the final
        chunk may be shorter than that.

        Args:
            audio: 1-D array of samples.
            sample_rate: Samples per second.

        Returns:
            Ascending list of split times in seconds.

        Raises:
            ValueError: If ``max_chunk_duration`` is not positive.
        """
        if self.max_chunk_duration <= 0:
            raise ValueError("max_chunk_duration must be positive")

        audio = np.asarray(audio)
        total_duration = len(audio) / sample_rate
        silence_samples = int(self.silence_duration * sample_rate)

        # Start times of windows in which every sample is below the threshold.
        all_silence_times = []
        if 0 < silence_samples <= len(audio):
            silence_mask = np.abs(audio) < self.threshold

            # Prefix sums count the silent samples in each window in one linear
            # pass; the counts are exact integers.
            silent_before = np.concatenate(([0], np.cumsum(silence_mask, dtype=np.int64)))
            silent_in_window = silent_before[silence_samples:] - silent_before[:-silence_samples]

            last_idx = None
            for idx in np.flatnonzero(silent_in_window == silence_samples):
                # Compare in samples so float rounding cannot drop a start that
                # is exactly one window after the previous one.
                if last_idx is None or idx - last_idx >= silence_samples:
                    all_silence_times.append(idx / sample_rate)
                    last_idx = idx

        # Now filter silence points to obey chunk duration limits
        filtered_points = [0.0]  # always start at 0
        last_time = 0.0

        for t in all_silence_times:
            # Insert as many forced splits as needed so that the stretch up to
            # this silence never exceeds max_chunk_duration.
            while t - last_time > self.max_chunk_duration:
                last_time += self.max_chunk_duration
                filtered_points.append(last_time)
            if t - last_time >= self.min_chunk_duration:
                filtered_points.append(t)
                last_time = t

        # The stretch after the last accepted point obeys the same maximum.
        while total_duration - last_time > self.max_chunk_duration:
            last_time += self.max_chunk_duration
            filtered_points.append(last_time)

        # Always close the last chunk so no audio is left outside the intervals.
        if total_duration > last_time:
            filtered_points.append(total_duration)

        return filtered_points


In [None]:
# Detector settings: silence threshold and window, plus chunk length limits in seconds.
detector_settings = {
    'threshold': 0.01,
    'silence_duration': 0.5,
    'min_chunk_duration': 10.0,
    'max_chunk_duration': 30.0,
}
detector = SilencePointDetector(**detector_settings)

# Get split points (in seconds) for the first audio track
first_track_samples = audio_tracks[0]['data']
split_points = detector.detect_silence_points(first_track_samples, 16000)

print("Split points (s):", split_points)
plot_silence_points(first_track_samples, split_points, sample_rate=16000)