In [None]:
!pip install webrtcvad -q
!pip install setuptools -q
!pip install pydub -q
#!pip install audioop-lts -q #only needed for python v3.13

In [1]:
import re
import datetime
from pydub import AudioSegment

import IPython.display as ipd

In [2]:
import webrtcvad
import wave

vad = webrtcvad.Vad(1)

def vad(audio_file,aggressiveness =1, frame_duration=30):
    vad = webrtcvad.Vad(aggressiveness)  # 0 to 3, while 3 is the most aggressive
    
    with wave.open(audio_file, 'rb') as wf:
        sample_rate = wf.getframerate()
        
        # Calculate frame size (must be 10, 20, or 30 ms for WebRTC VAD)
        if frame_duration not in [10, 20, 30]:
            frame_duration = 30
            
        frame_size = int(sample_rate * frame_duration / 1000)
        
        segments = []

        while True:
            frame = wf.readframes(frame_size)
            if not frame:
                break
                
            # Ensure we have enough samples for a complete frame
            if len(frame) < frame_size * 2:  # *2 because 16-bit samples
                break
                
            try:
                is_speech = vad.is_speech(frame, sample_rate)
                segments.append({
                    'start': wf.tell() / sample_rate - frame_duration / 1000,
                    'end': wf.tell() / sample_rate,
                    'is_speech': is_speech
                })
            except Exception as e:
                print(f"Error processing frame: {e}")
                continue

        return segments 

# VAD in one snippet

In [7]:
# Load .wav files into a list
import os
path = "../_data/audio_files/audio_files/"
audio_files = [f for f in os.listdir(path) if f.endswith('.wav')]


In [None]:
i = 1
# Show audio file as playable widget
audio_file = path + audio_files[i]
ipd.Audio(audio_file)

In [12]:
def append_same_segments(segments):
    previous_is_speech = None
    new_segments = []
    for segment in segments:
        if segment['is_speech'] != previous_is_speech:
            if previous_is_speech is not None:
                new_segments.append({
                    'start': start,
                    'end': segment['start'],
                    'is_speech': previous_is_speech
                })
            start = segment['start']
        previous_is_speech = segment['is_speech']
    # Append last segment
    new_segments.append({
        'start': start,
        'end': segment['end'],
        'is_speech': previous_is_speech
    })
    return new_segments

In [11]:
# Process one audio file
segments = vad(path + audio_files[i])

print(audio_files[i])
print()

print(append_same_segments(segments))

356_142_25-02-28_10-29-30_middle.wav

[{'start': 0.0, 'end': 0.15, 'is_speech': True}, {'start': 0.15, 'end': 0.8099999999999999, 'is_speech': False}, {'start': 0.8099999999999999, 'end': 3.9000000000000004, 'is_speech': True}, {'start': 3.9000000000000004, 'end': 3.93, 'is_speech': False}, {'start': 3.93, 'end': 4.17, 'is_speech': True}, {'start': 4.17, 'end': 5.67, 'is_speech': False}, {'start': 5.67, 'end': 6.39, 'is_speech': True}, {'start': 6.39, 'end': 6.6, 'is_speech': False}, {'start': 6.6, 'end': 8.940000000000001, 'is_speech': True}, {'start': 8.940000000000001, 'end': 9.3, 'is_speech': False}, {'start': 9.3, 'end': 9.99, 'is_speech': True}]


## VAD in multiple snippets. Plus (longer) silence threshold.

In [52]:
def parse_timestamp(filename: str) -> datetime.datetime:
    # Expecting format: session_index_yy-mm-dd_hh-mm-ss_suffix.wav
    pattern = r'_(\d{2}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})_'
    m = re.search(pattern, filename)
    if not m:
        raise ValueError(f"Filename {filename} does not match the required format")
    date_str = m.group(1)
    time_str = m.group(2)
    return datetime.datetime.strptime(date_str + "_" + time_str, "%y-%m-%d_%H-%M-%S")

In [None]:
# Load .wav files into a list
import os
path = "../_data/audio_files/audio_files/"
audio_files = [f for f in os.listdir(path) if f.endswith('.wav')]

# Remove files with 0 bytes
audio_files = [f for f in audio_files if os.path.getsize(path + f) > 0]

try:
    file_list_sorted = sorted(audio_files, key=parse_timestamp)

    # Append all the audio files into a single AudioSegment
    full_audio = AudioSegment.empty()
    for audio_file in file_list_sorted:
        full_audio += AudioSegment.from_wav(path + audio_file)

    # Export the full audio to a single file
    full_audio.export(f"{path}full_audio.wav", format="wav")
except Exception as e:
    print(f"Error processing files: {e}")

<_io.BufferedRandom name='../_data/audio_files/audio_files/full_audio.wav'>

In [81]:
def apply_silence_threshold(segments, silence_threshold=0.5):
    new_segments = []
    for segment in segments:
        #Speech stays speech
        if segment['is_speech']:
            new_segments.append(segment)
        #Silence below threshold is not kept as silence
        elif not segment['is_speech'] and segment['end'] - segment['start'] < silence_threshold:
            new_segments.append({
                'start': segment['start'],
                'end': segment['end'],
                'is_speech': True
            })
        #Silence above threshold is kept as silence    
        elif not segment['is_speech'] and segment['end'] - segment['start'] > silence_threshold:
            new_segments.append(segment)
    
    new_segments = append_same_segments(new_segments)
    return new_segments

In [66]:
def pretty_print_segments(segments):
    for segment in segments:
        # Format timestamps to minutes and full seconds rounded 
        start = datetime.timedelta(seconds=round(segment['start']))
        end = datetime.timedelta(seconds=round(segment['end']))
        print(f"Start: {start} - End: {end} - Speech: {segment['is_speech']}")

In [82]:
path = "../_data/audio_files/audio_files/"

# Get segments for the full audio
audio_file = f"{path}full_audio.wav"

original_segments = vad(audio_file)

appended_segments = append_same_segments(original_segments)

segments_after_threshold = apply_silence_threshold(appended_segments, 10)

pretty_print_segments(segments_after_threshold)

Start: 0:00:00 - End: 0:28:36 - Speech: True
Start: 0:28:36 - End: 0:28:47 - Speech: False
Start: 0:28:47 - End: 0:53:57 - Speech: True
Start: 0:53:57 - End: 0:54:25 - Speech: False
Start: 0:54:25 - End: 0:54:25 - Speech: True
Start: 0:54:25 - End: 0:54:44 - Speech: False
Start: 0:54:44 - End: 0:55:14 - Speech: True
Start: 0:55:14 - End: 0:55:36 - Speech: False
Start: 0:55:36 - End: 1:22:56 - Speech: True
Start: 1:22:56 - End: 1:23:08 - Speech: False


**Insight from Claude**
Silence duration threshold:
- 300-700ms to identify a true pause between speakers
- 150-300ms for within-speaker pauses.

# Plotting the Audio Data

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def plot_speech_silence_distribution(appended_segments, thresholds):
    """
    Plot speech vs silence distribution as a stacked bar chart for different thresholds.
    
    Args:
        appended_segments: The segments to analyze
        thresholds: List of threshold values to test
    """
    # Lists to store durations for each threshold
    silence_durations = []
    speech_durations = []
    
    # Calculate durations for each threshold
    for threshold in thresholds:
        segments = apply_silence_threshold(appended_segments, threshold)
        
        # Calculate total duration of silence and speech
        silence_duration = sum(segment['end'] - segment['start'] 
                              for segment in segments if not segment['is_speech'])
        speech_duration = sum(segment['end'] - segment['start'] 
                             for segment in segments if segment['is_speech'])
        
        silence_durations.append(silence_duration)
        speech_durations.append(speech_duration)
    
    # Convert to minutes if the values are very large
    convert_to_minutes = max(max(silence_durations), max(speech_durations)) > 1000
    if convert_to_minutes:
        silence_durations = [d / 60 for d in silence_durations]
        speech_durations = [d / 60 for d in speech_durations]
        y_label = 'Duration (minutes)'
    else:
        y_label = 'Duration (seconds)'
    
    # Create stacked bar chart
    fig, ax = plt.subplots(figsize=(12, 7))
    
    # Use positions 0, 1, 2, etc. for the bars
    positions = range(len(thresholds))
    
    # Create the stacked bars
    ax.bar(positions, silence_durations, label='Silence', color='lightgray')
    ax.bar(positions, speech_durations, bottom=silence_durations, 
           label='Speech', color='steelblue')
    
    # Add labels and styling
    ax.set_xlabel('Silence Threshold')
    ax.set_ylabel(y_label)
    ax.set_title('Distribution of Speech vs Silence by Threshold')
    ax.legend(loc='upper right')
    
    # Add text showing percentages on each bar
    total_durations = np.array(silence_durations) + np.array(speech_durations)
    for i, threshold in enumerate(thresholds):
        # Calculate percentages
        silence_pct = silence_durations[i] / total_durations[i] * 100
        speech_pct = speech_durations[i] / total_durations[i] * 100
        
        # Add percentage labels
        ax.text(i, silence_durations[i]/2, f"{silence_pct:.1f}%", 
                ha='center', va='center', color='black')
        ax.text(i, silence_durations[i] + speech_durations[i]/2, f"{speech_pct:.1f}%", 
                ha='center', va='center', color='white')
    
    # Set x-ticks to positions and label them with threshold values
    ax.set_xticks(positions)
    ax.set_xticklabels(thresholds)
    
    plt.tight_layout()
    plt.show()

# Example thresholds to test
thresholds = [0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4]

# Run the function
plot_speech_silence_distribution(appended_segments, thresholds)

In [None]:
def plot_speech_silence_distribution(appended_segments, thresholds):
    """
    Plot average duration of speech and silence segments with min/max whiskers.
    
    Args:
        appended_segments: The segments to analyze
        thresholds: List of threshold values to test
    """
    # Data structures to store statistics
    silence_avg_durations = []
    speech_avg_durations = []
    silence_min_durations = []
    silence_max_durations = []
    speech_min_durations = []
    speech_max_durations = []
    
    # Calculate statistics for each threshold
    for threshold in thresholds:
        segments = apply_silence_threshold(appended_segments, threshold)
        
        # Get all silence and speech segments durations
        silence_durations = [segment['end'] - segment['start'] for segment in segments if not segment['is_speech']]
        speech_durations = [segment['end'] - segment['start'] for segment in segments if segment['is_speech']]
        
        # Calculate statistics
        silence_avg = np.mean(silence_durations) if silence_durations else 0
        speech_avg = np.mean(speech_durations) if speech_durations else 0
        silence_min = np.min(silence_durations) if silence_durations else 0
        silence_max = np.max(silence_durations) if silence_durations else 0
        speech_min = np.min(speech_durations) if speech_durations else 0
        speech_max = np.max(speech_durations) if speech_durations else 0
        
        # Append to lists
        silence_avg_durations.append(silence_avg)
        speech_avg_durations.append(speech_avg)
        silence_min_durations.append(silence_min)
        silence_max_durations.append(silence_max)
        speech_min_durations.append(speech_min)
        speech_max_durations.append(speech_max)
    
    # Convert to minutes if values are very large
    convert_to_minutes = max(max(silence_max_durations), max(speech_max_durations)) > 1000
    if convert_to_minutes:
        silence_avg_durations = [d / 60 for d in silence_avg_durations]
        speech_avg_durations = [d / 60 for d in speech_avg_durations]
        silence_min_durations = [d / 60 for d in silence_min_durations]
        silence_max_durations = [d / 60 for d in silence_max_durations]
        speech_min_durations = [d / 60 for d in speech_min_durations]
        speech_max_durations = [d / 60 for d in speech_max_durations]
        y_label = 'Duration (minutes)'
    else:
        y_label = 'Duration (seconds)'
    
    # Create plot
    fig, ax = plt.subplots(figsize=(14, 8))
    
    # Width of each bar
    bar_width = 0.35
    
    # Set positions for grouped bars
    positions = np.arange(len(thresholds))
    silence_positions = positions - bar_width/2
    speech_positions = positions + bar_width/2
    
    # Calculate error bar ranges (distance from average to min/max)
    silence_min_error = [avg - min_val for avg, min_val in zip(silence_avg_durations, silence_min_durations)]
    silence_max_error = [max_val - avg for avg, max_val in zip(silence_avg_durations, silence_max_durations)]
    speech_min_error = [avg - min_val for avg, min_val in zip(speech_avg_durations, speech_min_durations)]
    speech_max_error = [max_val - avg for avg, max_val in zip(speech_avg_durations, speech_max_durations)]
    
    # Create error bars as [lower error, upper error]
    silence_error = [silence_min_error, silence_max_error]
    speech_error = [speech_min_error, speech_max_error]
    
    # Create bars
    silence_bars = ax.bar(silence_positions, silence_avg_durations, bar_width, 
                         label='Silence', color='lightgray', yerr=silence_error, 
                         capsize=5, alpha=0.8)
    speech_bars = ax.bar(speech_positions, speech_avg_durations, bar_width,
                         label='Speech', color='steelblue', yerr=speech_error, 
                         capsize=5, alpha=0.8)
    
    # Add labels and styling
    ax.set_xlabel('Silence Threshold')
    ax.set_ylabel(f'Average {y_label}')
    ax.set_title('Average Duration of Speech and Silence Segments by Threshold\n(Whiskers show min/max values)')
    ax.set_xticks(positions)
    ax.set_xticklabels(thresholds)
    ax.legend()

    # Add y axis grid
    ax.yaxis.grid(True)

    # Add markers for every 2 minutes if values are in minutes
    if y_label == 'Duration (minutes)':
        ax.yaxis.set_major_locator(plt.MultipleLocator(2))

    # Add segment counts as text on each bar
    for threshold_idx, threshold in enumerate(thresholds):
        segments = apply_silence_threshold(appended_segments, threshold)
        silence_count = sum(1 for segment in segments if not segment['is_speech'])
        speech_count = sum(1 for segment in segments if segment['is_speech'])
        
        # Add count labels
        ax.text(silence_positions[threshold_idx], silence_avg_durations[threshold_idx]/2,
                f"n={silence_count}", ha='center', va='center', color='black', fontsize=8)
        ax.text(speech_positions[threshold_idx], speech_avg_durations[threshold_idx]/2,
                f"n={speech_count}", ha='center', va='center', color='white', fontsize=8)
    
    plt.tight_layout()
    plt.show()

# Example thresholds to test
thresholds = [0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4]

# Run the function
plot_speech_silence_distribution(appended_segments, thresholds)

## Takeaways:
- A silence threshold of 1 second leads to all snippets being below 2min and on average around 15 to 20s. Seems interesting.