In [28]:
%pip install kaggle numpy pandas matplotlib librosa soundfile pyloudnorm scipy scikit-learn ipywidgets torch pyannote.audio mir_eval -q

Note: you may need to restart the kernel to use updated packages.


In [29]:
#!kaggle datasets download -d mfekadu/english-multispeaker-corpus-for-voice-cloning

In [30]:
#!unzip english-multispeaker-corpus-for-voice-cloning.zip -d data/

In [31]:
import os
import IPython.display as ipd
import librosa
from glob import glob
import soundfile as sf
import matplotlib.pyplot as plt
import numpy as np
import pyloudnorm as pyln
from scipy import signal, linalg
from scipy.signal import butter, lfilter

import sklearn
from sklearn.preprocessing import StandardScaler

from ipywidgets import interact
import urllib
import mir_eval

import torch
from pprint import pprint
from pyannote.audio import Pipeline
from pyannote.core import Annotation, Segment


from sklearn import preprocessing
from sklearn import cluster
%matplotlib inline

In [32]:
wave_pathes = glob(
    "data/VCTK-Corpus/VCTK-Corpus/wav48/*/*.wav", recursive=True
)
# In case of Windows
wave_pathes = [wave_path.replace('\\', '/') for wave_path in wave_pathes]
sample_file = wave_pathes[0]
ipd.Audio(sample_file)

In [33]:
model, utils = torch.hub.load(repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=True)
get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks = utils

Downloading: "https://github.com/snakers4/silero-vad/zipball/master" to /home/sviatoslav/.cache/torch/hub/master.zip


In [34]:
wav = read_audio(wave_pathes[9])
ipd.Audio(wave_pathes[9])

In [35]:
speech_timestamps = get_speech_timestamps(wav, model)
pprint(speech_timestamps)

[{'end': 39904, 'start': 20000}]


# Voice Activity Detection

### Energy-Based Voice Activity Detection (VAD)

Voice Activity Detection (VAD) is a crucial step in speech processing applications, such as speech recognition, speaker diarization, and audio segmentation. This approach utilizes short-term energy to differentiate speech from silence or background noise. 

#### Key Steps:
1. **Bandpass Filtering**: Removes unwanted noise and retains frequencies relevant to speech.
2. **Short-Term Energy Computation**: Measures the energy of the signal in short overlapping frames.
3. **Noise Floor Estimation**: Uses the median energy to set a dynamic threshold for speech detection.
4. **Speech and Silence Segmentation**:
   - Speech is detected when the energy exceeds the noise floor for a minimum number of consecutive frames.
   - A silence counter ensures that brief pauses do not split speech segments.
   - Speech segments are finalized based on minimum duration and silence gap constraints.


In [36]:
def bandpass_filter(signal, lowcut=300, highcut=3400, fs=16000, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return lfilter(b, a, signal)


In [37]:
def short_term_energy(signal, frame_size=128, hop_size=64):
    energy = []
    for i in range(0, len(signal) - frame_size, hop_size):
        frame = signal[i:i + frame_size]
        energy.append(np.sum(frame ** 2))
    return np.array(energy)


In [38]:

def speech_span(signal, sr, hop_size=64, min_speech_duration=0.2, max_silence_gap=0.3, min_streak=5):

    filtered_signal = bandpass_filter(signal)  
    energy = short_term_energy(filtered_signal)
    noise_floor = np.median(energy) * 5

    speech_segments = []
    start = None  
    silence_counter = 0
    streak = 0  

    for i, e in enumerate(energy):
        time = (i * hop_size) / sr  

        if e >= noise_floor:  
            streak += 1
            if streak >= min_streak:
                if start is None:
                    start = time - ((min_streak * hop_size) / sr) 
                silence_counter = 0 
        else:
            streak = 0 
            if start is not None:
                silence_counter += hop_size / sr
                if silence_counter > max_silence_gap:
                    if time - start >= min_speech_duration:
                        speech_segments.append((start, time - silence_counter))
                    start = None

    if start is not None and (time - start) >= min_speech_duration:
        speech_segments.append((start, time))

    return speech_segments


In [39]:
def convert_to_annotation(speech_spans):
    annotation = Annotation()
    for start, end in speech_spans:
        annotation[Segment(start, end)] = "SPEECH"
    return annotation

In [40]:

y, sr = librosa.load(wave_pathes[9])
print(speech_span(y, sr))


[(1.1290702947845803, 2.467120181405895)]


### K-means approach

#### Let's extract the features! 

**Zero crossing count** counts how many times the waveform changes sign within a given frame. It should be useful for VAD since it basically measures how "active" the signal is. Lower ZCC = less voice activity. 

**Overall energy** measures the total intensity of the signal in a given segment. It is computed as the sum of the squared amplitudes in a frame. 

**Spectral centroid** is the center of mass of the frequency spectrum. It is a measure of "brightness" of the signal since it is larger for higher frequencies. Voiced regions will likely have higher centroid compared to silence. 

**Spectral flux** <strike>is something we don't fully understand</strike> is a measure of how quickly the spectrum changes between frames (within a frame). First, we compute the magnitude spectrogram. Then, we compute the difference between consequtive magnitude values along the time axis for each row (frequency bin). Finally, we take the Euclidean norm of the differences and compute the mean. 


In [41]:

def normalize_LUFS(audio: np.ndarray, sr: int, target_LUFS: float = -16.0) -> np.ndarray:
    """Normalize audio loudness to a target LUFS level."""
    meter = pyln.Meter(sr)
    loudness = meter.integrated_loudness(audio)
    return pyln.normalize.loudness(audio, loudness, target_LUFS)


def segment_audio(audio: np.ndarray, sr: int, frame_duration: float = 0.090, overlap: float = 0.5):
    """Split audio into overlapping frames."""
    frame_size = int(sr * frame_duration)
    hop_length = int(frame_size * (1 - overlap))
    frames = librosa.util.frame(audio, frame_length=frame_size, hop_length=hop_length).T
    return frames


def normalize_track_features(features: np.ndarray) -> np.ndarray:
    scaler = StandardScaler()
    return scaler.fit_transform(features)

In [42]:
def extract_features(frame: np.ndarray, sr: int, n_fft: int = 1024, hop_length: int = 256):
    """
    Extract 4 basic features per frame:
      - Zero Crossing Count
      - Energy
      - Spectral Centroid
      - Spectral Flux
    """
    # Zero crossing count
    zcc = np.sum(librosa.zero_crossings(frame, pad=False))
    # Energy
    energy = np.sum(frame ** 2)
    # Spectral centroid
    spec_cent = np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr,
                                                          n_fft=n_fft, hop_length=hop_length))
    # Spectral flux
    S = np.abs(librosa.stft(frame, n_fft=n_fft, hop_length=hop_length))
    flux_frames = np.diff(S, axis=1)
    spec_flux = np.mean(np.sqrt(np.sum(flux_frames ** 2, axis=0)))
    return np.array([zcc, energy, spec_cent, spec_flux], dtype=float)
  
def extract_features_full(audio: np.ndarray, sr: int, frame_duration: float = 0.090, overlap: float = 0.5) -> np.ndarray:
    frames = segment_audio(audio, sr, frame_duration, overlap)
    feats = [extract_features(frame, sr) for frame in frames]
    return np.array(feats)

In [43]:
def kmeans_only_vad(
    audio: np.ndarray,
    sr: int,
    frame_duration: float = 0.090,
    overlap: float = 0.5,
    min_gap: float = 0.4
) -> Annotation:
    """
    Classify frames into 'speech' or 'non-speech' purely based on K-Means.
    - No additional energy threshold.
    - We pick the cluster with the higher average energy as "speech".
    """
    # Extract and normalize features
    features = extract_features_full(audio, sr, frame_duration, overlap)
    features_norm = normalize_track_features(features)

    # K-Means with 2 clusters
    kmeans =  sklearn.cluster.KMeans(n_clusters=2, random_state=42)
    labels = kmeans.fit_predict(features_norm)

    # Identify which cluster is speech by comparing average energy
    energy_idx = 1  # energy is the second feature
    cluster_0_mean_energy = np.mean(features[labels == 0, energy_idx])
    cluster_1_mean_energy = np.mean(features[labels == 1, energy_idx])
    speech_cluster = 0 if cluster_0_mean_energy > cluster_1_mean_energy else 1

    # Build annotation from frames labeled as speech_cluster
    annotation = Annotation()
    hop_sec = frame_duration * (1 - overlap)

    speech_segments = []
    current_start = None

    for i, label in enumerate(labels):
        time = i * hop_sec
        if label == speech_cluster and current_start is None:
            current_start = time
        elif label != speech_cluster and current_start is not None:
            speech_segments.append((current_start, time + frame_duration))
            current_start = None

    if current_start is not None:
        speech_segments.append((current_start, len(labels) * hop_sec + frame_duration))

    # Merge speech segments within 'min_gap'
    merged_segments = []
    if speech_segments:
        merged_segments = [speech_segments[0]]
        for seg in speech_segments[1:]:
            if seg[0] - merged_segments[-1][1] < min_gap:
                merged_segments[-1] = (merged_segments[-1][0], seg[1])
            else:
                merged_segments.append(seg)

    for start, end in merged_segments:
        annotation[Segment(start, end)] = "SPEECH"

    return annotation

# Evaluation

In [44]:
from pyannote.core import Annotation, Segment
from pyannote.metrics.detection import DetectionErrorRate

metric_energy = DetectionErrorRate()
metric_kmeans = DetectionErrorRate()

for file in wave_pathes[:1000]:
    wav = read_audio(file)
    y, sr = librosa.load(file)

    speech_timestamps = get_speech_timestamps(wav, model, return_seconds=True)
    reference = Annotation()
    for speech in speech_timestamps:
      reference[Segment(speech["start"], speech["end"])] = "SPEECH"
    
    hypothesis = convert_to_annotation(speech_span(y, sr))
    hypothesis_kmeans = kmeans_only_vad(y, sr)


    _ = metric_energy(
        reference,      
        hypothesis)
  
    _ = metric_kmeans(
        reference,      
        hypothesis_kmeans
    )



In [45]:
# aggregate the performance over the whole test set
detection_error_rate = abs(metric_energy)
print(f'Detection error rate for energy-based approach = {detection_error_rate * 100:.1f}%')

detection_error_rate = abs(metric_kmeans)
print(f'Detection error rate for kmeans approach = {detection_error_rate * 100:.1f}%')

Detection error rate for energy-based approach = 12.6%
Detection error rate for kmeans approach = 33.4%
