In [2]:
import os
import librosa
from scipy import signal


import torch
%matplotlib inline

from tqdm import tqdm


import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from scipy.signal import wiener

In [3]:
def apply_filters(audio, sr):
    '''
    apply high-pass filter to filter out non-speech noise < 60Hz
    '''
    hp_cutoff = 80
    Wn_hp = hp_cutoff / (sr / 2)
    
    b, a = signal.butter(4, Wn_hp, btype='highpass')
    audio = signal.filtfilt(b, a, audio)

    return audio
    
def loudness_normalization(audio):
    
    return librosa.util.normalize(audio)

def apply_wiener_filter(y, win_length=7):
    '''Noise filter'''
    filtered_audio = wiener(y, mysize=win_length)
    return filtered_audio

def extract_features(audio, sr, chunk_start, chunk_end, context_size=5):
    '''
    Extracting features in current chunk and in context window of n chunks
    '''
    current_chunk = audio[chunk_start:chunk_end]
    
    context_samples = int(context_size * (chunk_end - chunk_start))
    left_start = max(0, chunk_start - context_samples)
    right_end = min(len(audio), chunk_end + context_samples)
    
    left_context = audio[left_start:chunk_start]
    right_context = audio[chunk_end:right_end]
    
    full_context = np.concatenate([left_context, current_chunk, right_context])
    
    n_fft = min(512, len(current_chunk))
    hop_length = n_fft // 4
    
    mfccs = librosa.feature.mfcc(
        y=current_chunk, 
        sr=sr, 
        n_mfcc=13,
        n_fft=n_fft,
        hop_length=hop_length
    )
    
    mfccs_with_context = librosa.feature.mfcc(
        y=full_context, 
        sr=sr, 
        n_mfcc=13,
        n_fft=n_fft,
        hop_length=hop_length
    )
    # delta features need context window, they track how the mfccs change over time 
    mfcc_delta = librosa.feature.delta(mfccs_with_context)
    mfcc_delta2 = librosa.feature.delta(mfccs_with_context, order=2)
    
    if mfccs.shape[1] == 0:
        mfccs = np.zeros((13, 1))
    if mfcc_delta.shape[1] == 0:
        mfcc_delta = np.zeros((13, 1))
    if mfcc_delta2.shape[1] == 0:
        mfcc_delta2 = np.zeros((13, 1))
    
    spec = np.abs(librosa.stft(current_chunk, n_fft=n_fft, hop_length=hop_length))

    if spec.shape[1] == 0:
        spectral_centroid = 0
        spectral_flux = 0
    else:
        spectral_centroid = librosa.feature.spectral_centroid(
            S=spec, sr=sr).mean()
        spectral_flux = np.mean(np.diff(spec, axis=1)**2) if spec.shape[1] > 1 else 0
    
    zcr = librosa.feature.zero_crossing_rate(
        current_chunk, 
        frame_length=n_fft, 
        hop_length=hop_length
    ).mean()
    ste = np.mean(current_chunk**2)
    
    features = np.concatenate([
        np.mean(mfccs, axis=1),         
        np.mean(mfcc_delta, axis=1),   
        np.mean(mfcc_delta2, axis=1),   
        [spectral_centroid, zcr, spectral_flux, ste]  
    ])
    
    return features

def process_audio_file(audio_path, sr=16000, 
                      chunk_duration=0.025, hop_duration=0.01, context_size=5):
        
    audio, sr = librosa.load(audio_path, sr=sr, mono=True)
    
    audio = apply_filters(audio, sr)
    audio = loudness_normalization(audio)
    audio = apply_wiener_filter(audio)
    
    
    chunk_samples = int(chunk_duration * sr)
    hop_samples = int(hop_duration * sr)
    num_chunks = (len(audio) - chunk_samples) // hop_samples + 1
    
    feature_dim = 13 * 3 + 4 
    all_features = np.zeros((num_chunks, feature_dim))
    
    for i in range(num_chunks):
        chunk_start = i * hop_samples
        chunk_end = chunk_start + chunk_samples
        
        all_features[i] = extract_features(
            audio, sr, chunk_start, chunk_end, context_size
        )                            
    
    feature_names = (
        [f"mfcc_{i+1}" for i in range(13)] +
        [f"mfcc_delta_{i+1}" for i in range(13)] +
        [f"mfcc_delta2_{i+1}" for i in range(13)] +
        ['spectral_centroid', 'zcr', 'spectral_flux', 'ste']
    )
    
    df = pd.DataFrame(all_features, columns=feature_names)
    df["start_time"] = np.arange(num_chunks) * hop_duration
    df["end_time"] = df["start_time"] + chunk_duration
    df["file_path"] = audio_path
    
    return df


def process_directory(input_dir, output_csv):
    all_data = []    
    total_files = sum(1 for root, _, files in os.walk(input_dir) 
                     for f in files if f.endswith('.wav'))
    
    with tqdm(total=total_files, desc="Processing audio files") as pbar:
        for root, _, files in os.walk(input_dir):
            for file in files:
                if file.endswith(".wav"):
                    audio_path = os.path.join(root, file)
                    
                    df = process_audio_file(audio_path)
                    all_data.append(df)
                    pbar.update(1)
    
    if all_data:
        combined_df = pd.concat(all_data, ignore_index=True)
        combined_df.to_csv(output_csv, index=False)
    
    return combined_df

In [5]:
def prepare_features(df):
        scaler = StandardScaler()
        feature_cols = [col for col in df.columns if any(
            col.startswith(prefix) for prefix in ['mfcc_', 'spectral_', 'zcr', 'ste']
        )]
        
        X = df[feature_cols].values
        # scale for k means
        X = scaler.fit_transform(X)
        return X
     
def apply_kmeans(X, n_clusters=2):
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    labels = kmeans.fit_predict(X)
    # we take the last column, which is ste (noise ste < speech ste)
    cluster_energies = [X[labels == i, -1].mean() for i in range(n_clusters)]
    if cluster_energies[0] > cluster_energies[1]:
        labels = 1 - labels
        
    return labels
    
    
def correct_vad_predictions(df, threshold_ms=100):
    '''
    If a switch between speech and silence lasts for less than 100ms, 
    it is replaced with the surrounding majority class.
    '''
    df_copy = df.copy()
    min_frames = threshold_ms // 10 
    predictions = df_copy['vad_prediction'].values
    
    changes = np.diff(predictions)
    change_points = np.where(changes != 0)[0] + 1
    
    segments = np.concatenate(([0], change_points, [len(predictions)]))
    
    for i in range(len(segments) - 1):
        segment_length = segments[i + 1] - segments[i]
        
        if segment_length < min_frames:
            value_before = predictions[segments[i] - 1] if segments[i] > 0 else predictions[segments[i]]
            value_after = predictions[segments[i + 1]] if segments[i + 1] < len(predictions) else predictions[segments[i + 1] - 1]
            
            if value_before == value_after:
                predictions[segments[i]:segments[i + 1]] = value_before
    
    df_copy['vad_prediction'] = predictions
    
    return df_copy

In [19]:
silero_model, silero_utils = torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad')

Using cache found in /Users/skashperova/.cache/torch/hub/snakers4_silero-vad_master


In [29]:
from pyannote.core import Annotation, Segment
from pyannote.metrics.detection import DetectionErrorRate, DetectionPrecision, DetectionRecall, DetectionPrecisionRecallFMeasure


def load_silero_vad(audio_path):
    wav, sr = librosa.load(audio_path, sr=16000)
    (get_speech_timestamps, _, _, _, _) = silero_utils
    speech_timestamps = get_speech_timestamps(wav, silero_model, return_seconds=True)
    return speech_timestamps


def custom_vad_segments(df, frame_duration=0.01):
    """Get speech segments from custom VAD predictions"""
    predictions = df['vad_prediction'].values
    segments = []
    changes = np.diff(predictions.astype(int))
    change_points = np.where(changes != 0)[0] + 1
    
    if predictions[0] == 1:
        change_points = np.concatenate(([0], change_points))
    if predictions[-1] == 1:
        change_points = np.concatenate((change_points, [len(predictions)]))
    
    for i in range(0, len(change_points), 2):
        if i + 1 < len(change_points):
            start = change_points[i] * frame_duration
            end = change_points[i + 1] * frame_duration
            segments.append({'start': start, 'end': end})
    return segments


def annotate_segments(reference_segments, hypothesis_segments):
    reference = Annotation()
    hypothesis = Annotation()
    
    for segment in reference_segments:
        reference[Segment(segment['start'], segment['end'])] = 'SPEECH'
    
    for segment in hypothesis_segments:
        hypothesis[Segment(segment['start'], segment['end'])] = 'SPEECH'
        
    return reference, hypothesis
    

def calculate_der(reference_segments, hypothesis_segments):
    reference, hypothesis = annotate_segments(reference_segments, hypothesis_segments)
    metric = DetectionErrorRate()
    _ = metric(reference, hypothesis)
    return abs(metric)


def calculate_precision(reference_segments, hypothesis_segments):
    reference, hypothesis = annotate_segments(reference_segments, hypothesis_segments)
    
    metric = DetectionPrecision()
    _ = metric(reference, hypothesis)
    return abs(metric)


def calculate_recall(reference_segments, hypothesis_segments):
    reference, hypothesis = annotate_segments(reference_segments, hypothesis_segments)
    
    metric = DetectionRecall()
    _ = metric(reference, hypothesis)
    return abs(metric)


def calculate_f1(reference_segments, hypothesis_segments):
    reference, hypothesis = annotate_segments(reference_segments, hypothesis_segments)
    
    metric = DetectionPrecisionRecallFMeasure()
    _ = metric(reference, hypothesis)
    return abs(metric)


def main(df):
    total_error_rate = 0.0
    total_f1 = 0.0
    total_recall = 0.0
    total_precision = 0.0
    file_count = 0
    
    for file_path in df['file_path'].unique():
        file_df = df[df['file_path'] == file_path]
        audio_path = file_path
        
        silero_vad_regions = load_silero_vad(audio_path)
        custom_vad_segments_result = custom_vad_segments(file_df)
                
        error_rate = calculate_der(silero_vad_regions, custom_vad_segments_result)
        total_f1 += calculate_f1(silero_vad_regions, custom_vad_segments_result)
        total_recall += calculate_recall(silero_vad_regions, custom_vad_segments_result)
        total_precision += calculate_precision(silero_vad_regions, custom_vad_segments_result)

        total_error_rate += error_rate
        file_count += 1
            
    if file_count > 0:
        avg_error_rate = (total_error_rate / file_count) * 100
        print(f"\nAvg Detection Error Rate: {avg_error_rate:.1f}%")
        avg_f1 = (total_f1 / file_count) * 100
        print(f"\nAvg Detection F1: {avg_f1:.1f}%")
        avg_recall = (total_recall / file_count) * 100
        print(f"\nAvg Detection Recall: {avg_recall:.1f}%")
        avg_precision = (total_precision / file_count) * 100
        print(f"\nAvg Detection Precision: {avg_precision:.1f}%")
        
    return avg_error_rate, avg_f1, avg_recall, avg_precision

In [None]:
global_df = []
for i in range(10):
    input_directory = f"../data/VCTK-Corpus/VCTK-Corpus/wav48/p23{i}"
    output_csv_path = f"output_features_{i}.csv"
    if os.path.exists(output_csv_path):
        df = pd.read_csv(output_csv_path)
    else:
        df = process_directory(input_directory, output_csv_path)
        global_df.append(df)

global_df = pd.concat(global_df, ignore_index=True)

In [12]:
import warnings

warnings.filterwarnings("ignore")

In [13]:
features = prepare_features(global_df)
predictions = apply_kmeans(features)
global_df["vad_prediction"] = predictions
global_df = correct_vad_predictions(global_df)

In [30]:
avg_error_rate, avg_f1, avg_recall, avg_precision = main(global_df)


Avg Detection Error Rate: 35.3%

Avg Detection F1: 76.8%

Avg Detection Recall: 67.6%

Avg Detection Precision: 97.2%
