In [4]:
import torch
import re
import os
import pandas as pd 
import datetime
import csv
import numpy as np
import re
import glob
import time
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np
from tqdm import tqdm
from IPython.display import Audio


#@title Install and Import Dependencies

# this assumes that you have a relevant version of PyTorch installed
# !pip install -q torchaudio

# Sample rate (Hz) handed to read_audio / get_speech_timestamps further down.
SAMPLING_RATE = 16000
# File that dump_results appends one CSV summary row per model to.
OUT_FILE = "results.txt"

import torch
# Limit PyTorch to one CPU thread for intra-op parallelism
# (presumably so latency figures are comparable between models - confirm).
torch.set_num_threads(1)
# download example
# torch.hub.download_url_to_file('https://models.silero.ai/vad_models/en.wav', 'en_example.wav')

## Utils 

In [5]:
#compute model size  
def compute_model_size(model):
    """Return the parameter footprint and parameter count of a model.

    Args:
        model: Object exposing ``parameters()`` that yields torch tensors
            (for example a ``torch.nn.Module``).

    Returns:
        tuple: ``(size_mb, num_params)``. ``size_mb`` is the total size of all
        parameters in megabytes (1 MB = 1e6 bytes) as a float; ``num_params``
        is the total number of parameter elements as an int. A model without
        parameters yields ``(0.0, 0)``.
    """
    num_params = 0
    num_bytes = 0
    for param in model.parameters():
        count = param.numel()
        num_params += count
        # element_size() gives the byte width of this tensor's dtype, so
        # mixed-precision models are sized per parameter instead of assuming
        # that every parameter shares the dtype of the first one.
        num_bytes += count * param.element_size()
    return float(num_bytes / 1e6), int(num_params)



def timeit(func):
    """Decorate ``func`` so that each call also reports its duration.

    Args:
        func: Any callable.

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        tuple ``(result, elapsed_seconds)``, where ``result`` is whatever
        ``func`` returned and ``elapsed_seconds`` is a float.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped callable's name and docstring
    def internal(*args, **kwargs):
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        start = time.perf_counter()
        res = func(*args, **kwargs)
        lag = time.perf_counter() - start
        return res, lag
    return internal


#compute metric for this sample

def compute_metrics(y_true, y_pred, latency):
    """Score one sample's binary predictions against its labels.

    Args:
        y_true: Sequence of ground-truth binary values.
        y_pred: Sequence of predicted binary values, same length as ``y_true``.
        latency: Value stored unchanged under the ``"latency"`` key.

    Returns:
        dict: ``{"prec", "recall", "f1", "latency"}``.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    # zero_division=0 states the fallback score explicitly: a sample with no
    # predicted (or no true) positives scores 0 without sklearn emitting its
    # undefined-metric warning for every such file.
    return {
        "prec": precision_score(y_true, y_pred, zero_division=0),
        "recall": recall_score(y_true, y_pred, zero_division=0),
        "f1": f1_score(y_true, y_pred, zero_division=0),
        "latency": latency,
    }

def dump_results(file_path, res:list):
    """Append one result row to a CSV file, writing the header first if needed.

    Args:
        file_path: Path of the CSV file; created when it does not exist.
        res: Row values, expected in the order of the header columns
            (time, model, prec, recall, f1, test_dataset, num_params,
            size(MB), latency).

    The header is written when the file is missing or has zero length, so a
    pre-created empty file still ends up with column names.
    """
    header = ["time", "model", "prec", "recall", "f1", "test_dataset", "num_params", "size(MB)", "latency"]
    needs_header = not os.path.exists(file_path) or os.path.getsize(file_path) == 0
    rows = [header, res] if needs_header else [res]
    # newline="" hands line-ending control to the csv module, as its
    # documentation requires; otherwise some platforms get blank rows.
    with open(file_path, "a", newline="") as f:
        csv.writer(f).writerows(rows)

def process_rttm(file_path):
    """Load a space-separated RTTM file into a DataFrame.

    The ten positional fields are given fixed column names, ``start`` and
    ``duration`` are cast to float, and an ``end`` column
    (``start + duration``) is appended.

    Args:
        file_path: Path of the RTTM file.

    Returns:
        pandas.DataFrame: One row per RTTM line, with eleven columns.
    """
    rttm_columns = [
        "type", "file_id", "channel_id", "start", "duration",
        "orthography", "speaker_type", "speaker_name", "confidence_score", "other",
    ]
    raw = pd.read_csv(file_path, sep=" ", names=rttm_columns)
    return raw.assign(
        start=raw["start"].astype(float),
        duration=raw["duration"].astype(float),
        end=lambda frame: frame["start"] + frame["duration"],
    )

def extract_start_end(df):
    """Convert the ``start``/``end`` columns into a list of segment dicts.

    Args:
        df: Mapping-like object (for example a DataFrame) with ``"start"`` and
            ``"end"`` columns of equal length.

    Returns:
        list[dict]: ``[{"start": float, "end": float}, ...]`` in row order.
    """
    # Iterate the columns positionally; indexing with df['start'][i] is a
    # label lookup and fails as soon as the index is not 0..n-1 (for example
    # after filtering rows).
    return [{"start": float(start), "end": float(end)} for start, end in zip(df["start"], df["end"])]

    

# SILERO-V5

In [6]:
# ********************************SILERO****************************************************8
# Load the Silero VAD model and its helper functions. Whichever branch runs,
# this cell binds `model`, `read_audio`, `get_speech_timestamps`, `save_audio`,
# `VADIterator` and `collect_chunks` in the notebook namespace.
USE_PIP = True # download model using pip package or torch.hub
USE_ONNX = False # change this to True if you want to test onnx model
if USE_ONNX:
    # Shell escape: installs the ONNX runtime only when the ONNX model is requested.
    !pip install -q onnxruntime
if USE_PIP:
  # Shell escape: installs the silero-vad package, then imports its helpers.
  !pip install -q silero-vad
  from silero_vad import (load_silero_vad,
                          read_audio,
                          get_speech_timestamps,
                          save_audio,
                          VADIterator,
                          collect_chunks)
  model = load_silero_vad(onnx=USE_ONNX)
else:
  # torch.hub path; force_reload=True asks torch.hub for a fresh download on every run.
  model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                                model='silero_vad',
                                force_reload=True,
                                onnx=USE_ONNX)

  # `utils` is unpacked positionally into the same helper names the pip branch imports.
  (get_speech_timestamps,
  save_audio,
  read_audio,
  VADIterator,
  collect_chunks) = utils


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [7]:
# Size (MB) and parameter count of the Silero model; both are reused later
# when the Silero summary row is written.
silero_model_stats = compute_model_size(model)
size, num_params = silero_model_stats
silero_model_stats

(1.850376, 462594)

In [8]:
# Measure inference speed for one wav file (~10 s).
# Relative path instead of a user-specific absolute one, so the notebook runs
# on other machines; this is the same file the pyannote smoke test reads.
wav_file_path = "en_example.wav"
# Wrap only once: re-running this cell must not stack a second timing wrapper,
# otherwise get_speech_timestamps would return ((result, lag), lag).
if not getattr(get_speech_timestamps, "_returns_latency", False):
    get_speech_timestamps = timeit(get_speech_timestamps)
    get_speech_timestamps._returns_latency = True
wav = read_audio(wav_file_path, sampling_rate=SAMPLING_RATE)

In [9]:
def compute_and_extract(wav_file_path, label_file_path):
    """Run the Silero VAD on one wav file and load its speech labels.

    The label file is read as a single comma-separated record: the first
    field is skipped and the remaining fields are consumed as consecutive
    ``(start, end, voice)`` triples. Only triples with ``voice == 1`` are kept.

    Args:
        wav_file_path: Path of the audio file.
        label_file_path: Path of the matching label file.

    Returns:
        tuple: ``(speech_timestamps, filtered_labels, latency)``.
        ``speech_timestamps`` and ``latency`` are the two values returned by
        the timed ``get_speech_timestamps``; ``filtered_labels`` is a list of
        ``{"start", "end", "voice"}`` dicts with float values.
    """
    wav = read_audio(wav_file_path, sampling_rate=SAMPLING_RATE)
    # get_speech_timestamps is expected to be the timeit-wrapped version here,
    # which is why it unpacks into (timestamps, latency).
    speech_timestamps, latency = get_speech_timestamps(wav, model, sampling_rate=SAMPLING_RATE, return_seconds=True)

    # The context manager closes the label file deterministically.
    with open(label_file_path, "r") as label_file:
        fields = label_file.read().split(",")[1:]

    filtered_labels = []
    for i in range(0, len(fields) - 2, 3):
        start, end, voice = float(fields[i]), float(fields[i + 1]), float(fields[i + 2])
        if voice == 1:
            filtered_labels.append({"start": start, "end": end, "voice": voice})
    return speech_timestamps, filtered_labels, latency

In [10]:

def create_mask(labels, preds, step=31.5e-3):
    """
    Sample two sets of time segments on a shared fixed-step time grid.

    The grid starts at 0 and advances by `step` until it passes the latest
    segment end (plus half a step of tolerance). At every grid point each mask
    gets a 1 if the point lies inside any segment ``[start, end)`` of the
    corresponding list, else 0.

    Args:
        labels (list of dict): Reference segments; every entry needs 'start'
            and 'end' keys. Entries that are not dicts or lack a key are
            ignored.
        preds (list of dict): Predicted segments with 'start' and 'end' keys.
            An entry carrying a 'voice' key is kept only when that value
            equals 1.0; entries without 'voice' are kept.
        step (float): Grid spacing, in the same time unit as the segments.
            Defaults to 31.5e-3.

    Returns:
        tuple: ``(labels_mask, preds_mask)``, two equally long lists of 0/1
        ints. Both are empty when no valid segment ends after time 0.

    Raises:
        ValueError: If `step` is not strictly positive (the grid would never
            advance).
    """
    if step <= 0:
        raise ValueError(f"step must be positive, got {step!r}")

    valid_preds = [d for d in preds if isinstance(d, dict) and 'start' in d and 'end' in d and d.get('voice', 1.0) == 1.0]
    valid_labels = [d for d in labels if isinstance(d, dict) and 'start' in d and 'end' in d]

    # Latest end across both lists, floored at 0 so the masks always share one length.
    max_end_time = max([0.0] + [d['end'] for d in valid_labels] + [d['end'] for d in valid_preds])
    if max_end_time == 0.0:
        return [], []

    labels_mask = []
    preds_mask = []

    # Half a step of slack guarantees a grid point at or just past max_end_time.
    limit = max_end_time + step * 0.5
    index = 0
    cursor = 0.0
    while cursor < limit:
        labels_mask.append(_cursor_in_segments(valid_labels, cursor))
        preds_mask.append(_cursor_in_segments(valid_preds, cursor))
        index += 1
        # Recompute from the index instead of accumulating `cursor += step`,
        # so rounding error does not build up over long recordings.
        cursor = index * step

    return labels_mask, preds_mask


def _cursor_in_segments(segments, cursor):
    """Return 1 if `cursor` lies in any half-open [start, end) segment, else 0."""
    return int(any(segment['start'] <= cursor < segment['end'] for segment in segments))
# labels_res, preds_res = create_mask(filtered_labels, speech_timestamps, 31.5e-3)


In [11]:
#load dataset example and compute timestamps with VAD
test_dir = "ten_vad/testset"


def _numbered_paths(directory, suffix):
    """List files in `directory` ending with `suffix`, ordered by the first number in the file name."""
    def numeric_key(file_name):
        # Search the bare file name, not the joined path, so digits in the
        # directory name cannot influence the ordering.
        match = re.search(r'(\d+)', file_name)
        return int(match.group(1)) if match else float('inf')

    names = sorted((f for f in os.listdir(directory) if f.endswith(suffix)), key=numeric_key)
    return [os.path.join(directory, name) for name in names]


# 'wav' holds the audio paths, 'time_stamp' the matching '.scv' label paths;
# both lists use the same numeric ordering so they can be zipped pairwise.
dic = {'wav': _numbered_paths(test_dir, '.wav'),
       "time_stamp": _numbered_paths(test_dir, '.scv')}
# zip() would silently drop unpaired files and mis-align the evaluation.
assert len(dic['wav']) == len(dic['time_stamp']), "every .wav needs a matching .scv label file"

In [12]:
def run_inference_silero(dic):
    """Evaluate the Silero VAD on every (wav, label) pair of the dataset.

    Args:
        dic: Mapping with two parallel lists of paths, ``"wav"`` and
            ``"time_stamp"``.

    Returns:
        tuple: ``(res, y_trues)`` - the per-file metric dicts produced by
        ``compute_metrics`` and the per-file label masks produced by
        ``create_mask``, both in dataset order.
    """
    per_file_metrics, label_masks = [], []
    file_pairs = zip(dic["wav"], dic['time_stamp'])
    for wav_path, label_path in tqdm(file_pairs, desc="iteration on dataset"):
        timestamps, labels, latency = compute_and_extract(wav_path, label_path)
        mask_true, mask_pred = create_mask(labels, timestamps)
        label_masks.append(mask_true)
        per_file_metrics.append(compute_metrics(mask_true, mask_pred, latency))
    return per_file_metrics, label_masks


res, y_trues = run_inference_silero(dic)

iteration on dataset: 30it [00:02, 10.54it/s]


In [13]:
#post process metrics 
# Average each per-file Silero metric over the whole test set.
prec, recall, f1, latency = (
    np.mean([sample[key] for sample in res])
    for key in ("prec", "recall", "f1", "latency")
)
dataset = "ten_vad_testset"
model_name = "silero-v5"
date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')


In [14]:
# One CSV row for Silero: timestamp, model name, averaged metrics, dataset,
# then model size information and mean latency.
res_silero_summary = [
    date, model_name,
    prec, recall, f1,
    dataset, num_params, size, latency,
]
dump_results(OUT_FILE, res_silero_summary)


## *TEN VAD*

In [15]:
#********************************TENVAD****************************************************8
# Prediction CSVs ordered by the first number found in each path. The pattern
# is a raw string because "\d" in a normal string is an invalid escape sequence.
ten_vad_preds_file = sorted(glob.glob("./ten_vad/testset/*pred.csv"), key=lambda x: int(re.search(r"\d+", x).group(0)))
print(ten_vad_preds_file)
ten_vad_pred = []
ten_vad_latency = []
for preds_file in ten_vad_preds_file:
    df = pd.read_csv(preds_file)
    # Second column: predictions (presumably one 0/1 flag per frame - confirm).
    preds = df.iloc[:, 1].tolist()
    # Third column: latencies, summed into one value per file.
    latency = np.sum(df.iloc[:, 2].tolist())
    ten_vad_pred.append(preds)
    ten_vad_latency.append(latency)




['./ten_vad/testset/testset-audio-01pred.csv', './ten_vad/testset/testset-audio-02pred.csv', './ten_vad/testset/testset-audio-03pred.csv', './ten_vad/testset/testset-audio-04pred.csv', './ten_vad/testset/testset-audio-05pred.csv', './ten_vad/testset/testset-audio-06pred.csv', './ten_vad/testset/testset-audio-07pred.csv', './ten_vad/testset/testset-audio-08pred.csv', './ten_vad/testset/testset-audio-09pred.csv', './ten_vad/testset/testset-audio-10pred.csv', './ten_vad/testset/testset-audio-11pred.csv', './ten_vad/testset/testset-audio-12pred.csv', './ten_vad/testset/testset-audio-13pred.csv', './ten_vad/testset/testset-audio-14pred.csv', './ten_vad/testset/testset-audio-15pred.csv', './ten_vad/testset/testset-audio-16pred.csv', './ten_vad/testset/testset-audio-17pred.csv', './ten_vad/testset/testset-audio-18pred.csv', './ten_vad/testset/testset-audio-19pred.csv', './ten_vad/testset/testset-audio-20pred.csv', './ten_vad/testset/testset-audio-21pred.csv', './ten_vad/testset/testset-audio-

In [16]:
# Score TEN VAD per file against the label masks built during the Silero run.
# Prediction and label sequences are both cut to the shorter length first.
res_ten_vad = [
    compute_metrics(np.array(label[:common]), np.array(pred[:common]), latency)
    for pred, label, latency, common in (
        (p, l, lat, min(len(p), len(l)))
        for p, l, lat in zip(ten_vad_pred, y_trues, ten_vad_latency)
    )
]

In [17]:
# Average the per-file TEN VAD metrics and append the summary row.
prec, recall, f1, latency = (
    np.mean([sample[key] for sample in res_ten_vad])
    for key in ("prec", "recall", "f1", "latency")
)
dataset = "ten_vad_testset"
model_name = "ten_vad"
# Model size information is entered by hand for this model.
num_params = "NA"
size = "0.35"
date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

res_ten_vad_summary = [
    date, model_name,
    prec, recall, f1,
    dataset, num_params, size, latency,
]
dump_results(OUT_FILE, res_ten_vad_summary)

## PYANNOTE AUDIO

In [18]:
# !pip install pyannote.audio

In [None]:
# 1. visit hf.co/pyannote/segmentation and accept user conditions
# 2. visit hf.co/settings/tokens to create an access token
# 3. instantiate pretrained voice activity detection pipeline

from pyannote.audio import Pipeline
# The access token is read from the HF_TOKEN environment variable;
# os.environ[...] raises KeyError when it is not set.
pipeline = Pipeline.from_pretrained("pyannote/voice-activity-detection",
                                    use_auth_token=os.environ['HF_TOKEN'])
# Single run on the example file; `output` is reassigned per file in the evaluation loop.
output = pipeline("en_example.wav")


/home/yehoshua/.pyenv/versions/3.11.9/lib/python3.11/site-packages/pytorch_lightning/utilities/migration/migration.py:208: You have multiple `ModelCheckpoint` callback states in this checkpoint, but we found state keys that would end up colliding with each other after an upgrade, which means we can't differentiate which of your checkpoint callbacks needs which states. At least one of your `ModelCheckpoint` callbacks will not be able to reload the state.
Lightning automatically upgraded your loaded checkpoint from v1.1.3 to v2.5.1.post0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../.cache/torch/pyannote/models--pyannote--segmentation/snapshots/059e96f964841d40f1a5e755bb7223f76666bba4/pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.7.1, yours is 2.7.0+cu126. Bad things might happen unless you revert torch to 1.x.


In [20]:
# Reach into the pipeline's private `_segmentation` attribute for the underlying model.
pyannote_vad_model = pipeline._segmentation.model
# NOTE: `size` holds the whole tuple returned by compute_model_size here, not just the first element.
size = compute_model_size(pyannote_vad_model)
size

(5.891512, 1472878)

In [21]:
# Scratch file used to pass pyannote's RTTM output to process_rttm.
TMP_FILE = "tmp.csv"
os.makedirs("pyannote_results", exist_ok=True)
pyannote_preds = []
for wav_file in dic['wav']:
    output = pipeline(wav_file)
    with open(TMP_FILE, "w") as f:
        output.write_rttm(f)
    df = process_rttm(TMP_FILE)
    time_stamps = extract_start_end(df)
    # Only the labels are needed here; the timestamps and latency that
    # compute_and_extract also returns belong to the Silero model.
    _, labels, _ = compute_and_extract(wav_file, wav_file.replace(".wav", ".scv"))
    _, y_preds = create_mask(labels, time_stamps)
    pyannote_preds.append(y_preds)

res_pyannote = []
for pred, label in zip(pyannote_preds, y_trues):
    # Cut both sequences to the shorter length before scoring.
    min_len = min(len(pred), len(label))
    pred, label = pred[:min_len], label[:min_len]
    res_pyannote.append(compute_metrics(np.array(label), np.array(pred), "NA"))


prec = np.mean([dic['prec'] for dic in res_pyannote])
recall = np.mean([dic['recall'] for dic in res_pyannote])
# Fixed: f1 was averaged over res_ten_vad, so the pyannote row reported TEN VAD's F1.
f1 = np.mean([dic['f1'] for dic in res_pyannote])
# No latency was measured for pyannote.
latency = "NA"
dataset = "ten_vad_testset"
model_name = "pyannote"
num_params = "1472878"
size = "5.891512"
date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

pyannote_summary = [date, model_name, prec, recall, f1, dataset, num_params, size, latency]
dump_results(OUT_FILE, pyannote_summary)

## TR-VAD

In [22]:
import csv

# Each CSV row supplies one file's result: the second field is read as the
# latency and every field from the third onward as an integer prediction.
tr_vad_preds, tr_vad_latencies = [], []
with open("tr_vad_preds.csv", "r") as f:
    for row in csv.reader(f):
        tr_vad_preds.append([int(value) for value in row[2:]])
        tr_vad_latencies.append(float(row[1]))

# Score TR-VAD per file against the label masks, after cutting both
# sequences to the shorter length.
res_tr_vad = [
    compute_metrics(np.array(label[:common]), np.array(pred[:common]), latency)
    for pred, label, latency, common in (
        (p, l, lat, min(len(p), len(l)))
        for p, l, lat in zip(tr_vad_preds, y_trues, tr_vad_latencies)
    )
]

prec, recall, f1, latency = (
    np.mean([sample[key] for sample in res_tr_vad])
    for key in ("prec", "recall", "f1", "latency")
)
dataset = "ten_vad_testset"
model_name = "tr_vad"
num_params = "376000"
size = "1.504"
date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

tr_vad_summary = [
    date, model_name,
    prec, recall, f1,
    dataset, num_params, size, latency,
]
dump_results(OUT_FILE, tr_vad_summary)

## SPEECHBRAIN: crdnn trained on libriparty

In [25]:
from speechbrain.inference.VAD import VAD

# NOTE: this rebinds `VAD` from the imported class to the loaded model
# instance; later cells use `VAD` as the model object.
VAD = VAD.from_hparams(source="speechbrain/vad-crdnn-libriparty", savedir="pretrained_models/vad-crdnn-libriparty")
boundaries = VAD.get_speech_segments("speechbrain/vad-crdnn-libriparty/example_vad.wav")
speechbrain_preds = []
speechbrain_latencies = []
for wav_file in dic['wav']:
    start = time.time()
    outputs = VAD.get_speech_segments(wav_file)
    latency = time.time() - start
    # float() turns each boundary into a plain Python number
    # (presumably they arrive as tensor elements - confirm).
    time_stamps = [{"start": float(t[0]), "end": float(t[1])} for t in outputs]
    # Fixed: the third return value is the Silero latency; unpacking it into
    # `latency` overwrote the SpeechBrain timing measured just above.
    _, labels, _ = compute_and_extract(wav_file, wav_file.replace(".wav", ".scv"))
    _, y_preds = create_mask(labels, time_stamps)
    speechbrain_preds.append(y_preds)
    speechbrain_latencies.append(latency)


In [27]:
# `VAD` is the loaded SpeechBrain model instance at this point (rebound in the inference cell).
speechbrain_vad = VAD
# Displays (size in MB, parameter count) as the cell output.
compute_model_size(speechbrain_vad)

(0.438976, 109744)

In [30]:
# Score SpeechBrain per file against the label masks, after cutting both
# sequences to the shorter length.
res_speechbrain = [
    compute_metrics(np.array(label[:common]), np.array(pred[:common]), latency)
    for pred, label, latency, common in (
        (p, l, lat, min(len(p), len(l)))
        for p, l, lat in zip(speechbrain_preds, y_trues, speechbrain_latencies)
    )
]

prec, recall, f1, latency = (
    np.mean([sample[key] for sample in res_speechbrain])
    for key in ("prec", "recall", "f1", "latency")
)
dataset = "ten_vad_testset"
model_name = "speechbrain"
num_params = "109744"
size = "0.438976"
date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

speechbrain_summary = [
    date, model_name,
    prec, recall, f1,
    dataset, num_params, size, latency,
]
dump_results(OUT_FILE, speechbrain_summary)

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
