In [None]:
import ast
import json
import os
import wave

import torch
import torchaudio
import pandas as pd
import numpy as np
from funasr import AutoModel
import soundfile as sf
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score, classification_report
from speechbrain.inference.VAD import VAD
import seaborn as sns
from pyannote.core import Segment
from pyannote.audio import Pipeline

In [None]:
# Load the four VAD back-ends used in this notebook: silero, pyannote,
# speechbrain and funasr. All downloaded weights share '../models/.cache'.

# silero
SAMPLING_RATE = 16000
torch.set_num_threads(1)

torch.hub.set_dir('../models/.cache')
model_silero, utils_silero = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                                            model='silero_vad',
                                            # Reuse the copy already cached under the hub dir
                                            # instead of re-downloading on every run.
                                            force_reload=False,
                                            onnx=False)

# Unpack the five silero helpers by position.
(get_speech_timestamps,
 save_audio,
 read_audio,
 VADIterator,
 collect_chunks) = utils_silero

# pyannote
# SECURITY: the Hugging Face access token used to be hard-coded on this line.
# It is now read from the HF_TOKEN environment variable; the token that was
# committed to this notebook should be revoked. When HF_TOKEN is unset, None is
# passed (presumably the library then relies on locally cached credentials —
# TODO confirm).
pipeline = Pipeline.from_pretrained(
    "pyannote/voice-activity-detection",
    use_auth_token=os.environ.get("HF_TOKEN"),
    cache_dir="../models/.cache",
)

# speechbrain
vad = VAD.from_hparams(
    source="speechbrain/vad-crdnn-libriparty",
    savedir="../models/.cache",  # Save the model in a cache folder
)

# funasr
# Fall back to CPU when no CUDA device is visible so the cell still runs on
# machines without a GPU.
FUNASR_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
model_funasr = AutoModel(model="fsmn-vad", model_revision="v2.0.4", device=FUNASR_DEVICE)

In [3]:
import sys
# Make the project-local `helper` package importable.
# NOTE(review): this is a machine-specific absolute path (/home/sgeadmin/...),
# while the dataset paths used later in this notebook live under
# /Users/saurabh/... — confirm which checkout is intended.
sys.path.append("/home/sgeadmin/Saurabh_Kushwaha/vad/")

# One import line per VAD back-end (pyannote, funasr, silero, speechbrain).
from helper import vad_inference_pyannote, print_timestamps_pyannote, run_vad_on_noisy_audio_pyannote, visualize_metrics_vs_SNR_pyannote
from helper import vad_inference_funasr, convert_to_timestamps_funasr, run_vad_on_noisy_audio_funasr, visualize_metrics_vs_SNR_funasr
from helper import vad_inference_silero, print_timestamps_silero, run_vad_on_noisy_audio_silero, visualize_metrics_vs_SNR_silero
from helper import vad_inference_speechbrain, print_timestamps_speechbrain, run_vad_on_noisy_audio_speechbrain, visualize_metrics_vs_SNR_speechbrain
# NOTE: `save_audio` imported here rebinds the name unpacked from `utils_silero`
# in the model-loading cell, and `show_vad_metrics_matrix1` /
# `save_results_to_csv1` are defined again in the next cell, which replaces the
# versions imported on this line.
from helper.vad import parse_annotations_file_bh, evaluate_vad, add_noise, save_audio, plot_SNR, extract_metrics, visualize_all_metrics, evaluate_vad_cmatrix, plot_confusion_matrices, get_file_paths, read_path, parse_annotations_file, average_metrics, show_vad_matrix_bh, save_results_to_csv, extract_speech_segments, count_continuous_zeros_after_start_segments, count_continuous_ones_after_end_segments, calculate_fec, calculate_msc, calculate_over, calculate_nds, save_results_to_csv1, show_vad_metrics_matrix1, save_multiple_speech_segments_as_text, extract_filenames_as_string

In [4]:
def get_filename(file_path):
    """Return the identifier of a file.

    The identifier is the last '/'-separated component of ``file_path``,
    truncated at its first '.' (so ``/a/b/utt.v2.wav`` yields ``utt``).
    """
    _, _, base_name = file_path.rpartition('/')
    stem, _, _ = base_name.partition('.')
    return stem

def parse_speech_segments(file_path):
    """Read a label file and return its non-silence intervals.

    Every non-blank line must hold three whitespace-separated fields:
    ``label start_time end_time``. Lines labelled ``!SIL`` are dropped; lines
    with any other label are kept.

    Args:
        file_path: Path of the text label file.

    Returns:
        A list of ``{'speech': [start, end]}`` dicts in file order, with both
        times converted to float and rounded to 6 decimals.

    Raises:
        ValueError: If a non-blank line does not have exactly three fields, or
            a time field cannot be converted to float.
    """
    speech_segments = []
    with open(file_path, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                # Blank / whitespace-only lines (e.g. a trailing empty line)
                # carry no segment; unpacking them would raise ValueError.
                continue

            label, start_time, end_time = fields
            if label != "!SIL":  # keep everything except explicit silence
                speech_segments.append({
                    'speech': [round(float(start_time), 6), round(float(end_time), 6)]
                })
    return speech_segments

def generate_speech_segments_from_nonspeech_segments(nonspeech_segments, total_duration, margin=0.001):
    """Build the speech intervals that fill the gaps between non-speech intervals.

    Args:
        nonspeech_segments: Iterable of ``{'nonspeech': [start, end]}`` dicts,
            expected in ascending time order.
        total_duration: Length of the recording, in the same unit as the
            segment times.
        margin: Gap left between a speech segment and the neighbouring
            non-speech segment on each side.

    Returns:
        A list of ``{'speech': [start, end]}`` dicts with times rounded to
        6 decimals. Gaps that shrink to zero or negative length once the
        margin is applied are omitted.
    """
    speech_segments = []
    current_time = 0.0

    for nonspeech in nonspeech_segments:
        nonspeech_start, nonspeech_end = nonspeech['nonspeech']

        if nonspeech_start > current_time:
            seg_start = round(current_time, 6)
            seg_end = round(nonspeech_start - margin, 6)
            # A gap no wider than the margin would otherwise yield an inverted
            # (end <= start) segment; skip it.
            if seg_end > seg_start:
                speech_segments.append({'speech': [seg_start, seg_end]})

        # Never move the cursor backwards: an overlapping or nested non-speech
        # interval must not re-open time that is already known to be silence.
        current_time = max(current_time, nonspeech_end + margin)

    if current_time < total_duration:
        speech_segments.append({'speech': [round(current_time, 6), round(total_duration, 6)]})

    return speech_segments

def get_wav_duration(file_path):
    """Return the duration of a WAV file in seconds.

    Relies on the module-level ``wave`` and ``os`` imports.

    Args:
        file_path: Path to the WAV file (``str`` or ``os.PathLike``), or
            anything else ``wave.open`` accepts.

    Returns:
        Number of frames divided by the frame rate, as a float.
    """
    # Convert path objects to a plain string before handing them to wave.open,
    # so pathlib.Path arguments work as well as str.
    if isinstance(file_path, os.PathLike):
        file_path = os.fspath(file_path)

    with wave.open(file_path, 'rb') as wav_file:
        n_frames = wav_file.getnframes()
        frame_rate = wav_file.getframerate()
        return n_frames / float(frame_rate)

def extract_speech_segments_from_json(file_path, key, wav_dir="/Users/saurabh/Documents/projects/Voice-Activity-Detection/data/bh_dataset/bh_audios/"):
    """Derive speech segments for one utterance from its silence annotations.

    The JSON file maps utterance keys to objects whose ``"sil"`` entry holds
    the non-speech intervals as ``[[start, end], ...]``, either as a string
    literal or as an already-decoded list. The matching audio file
    ``<wav_dir>/<key>.wav`` is opened to obtain the total duration, and the
    speech segments are the gaps between the silences.

    Args:
        file_path: Path of the JSON annotation file.
        key: Utterance identifier; also the stem of the wav file name.
        wav_dir: Directory holding the wav files. Defaults to the location
            that was previously hard-coded in this function.

    Returns:
        A list of ``{'speech': [start, end]}`` dicts, or ``[]`` when ``key`` is
        not present in the JSON file.
    """
    with open(file_path, 'r') as file:
        data = json.load(file)

    if key not in data:
        return []

    sil_value = data[key]["sil"]
    if isinstance(sil_value, str):
        # SECURITY: this used to be eval(), which would execute arbitrary code
        # found in the annotation file. literal_eval only accepts Python
        # literals (lists, numbers, ...), which is all a "[[s, e], ...]"
        # string needs.
        parsed_sil = ast.literal_eval(sil_value)
    else:
        parsed_sil = sil_value
    nonspeech_segments = [{'nonspeech': [float(num) for num in sublist]} for sublist in parsed_sil]

    wav_file = os.path.join(wav_dir, key + ".wav")
    total_duration = get_wav_duration(wav_file)

    return generate_speech_segments_from_nonspeech_segments(nonspeech_segments, total_duration)

def show_vad_metrics_matrix1(metrics_fec, metrics_msc, metrics_over, metrics_nds, flag):
    """Draw a heatmap of the mean FEC / MSC / OVER / NDS value of each model.

    Args:
        metrics_fec, metrics_msc, metrics_over, metrics_nds: Mappings from
            model name to a sequence of values; each must contain every model
            listed below.
        flag: When truthy, the averaged table is also printed.

    The table has one row per metric and one column per model.
    """
    model_names = ['Pyannote', 'FunASR', 'Silero', 'SpeechBrain', 'ASRmodel', 'newmodel']
    per_metric_sources = {
        'FEC': metrics_fec,
        'MSC': metrics_msc,
        'OVER': metrics_over,
        'NDS': metrics_nds,
    }

    # metric -> {model -> mean over that model's values}
    mean_table = {}
    for metric_name, per_model_values in per_metric_sources.items():
        mean_table[metric_name] = {
            name: np.mean(per_model_values[name]) for name in model_names
        }

    # Transpose so that metrics become rows and models become columns.
    df_combined = pd.DataFrame(mean_table).T

    if flag:
        print(df_combined)

    plt.figure(figsize=(12, 8))
    plt.title("VAD Metrics Comparison")
    sns.heatmap(df_combined, annot=True, cmap="YlGnBu", fmt=".3f")
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.show()
    
def save_results_to_csv1(metrics_fec, metrics_msc, metrics_over, metrics_nds, model_names, output_file, label_paths):
    """Write per-file, per-model FEC / MSC / OVER / NDS values to a CSV file.

    Args:
        metrics_fec, metrics_msc, metrics_over, metrics_nds: Mappings from
            model name to a sequence with one value per file.
        model_names: Models to export; the first one determines the number of
            files.
        output_file: Destination CSV path.
        label_paths: One label path per file, indexed like the metric
            sequences; used only to derive the 'audio file' column.

    The CSV has one row per (file, model) pair with the columns
    ``model, file index, audio file, FEC, MSC, OVER, NDS``. The header is
    written even when there are no rows.
    """
    columns = ['model', 'file index', 'audio file', 'FEC', 'MSC', 'OVER', 'NDS']

    # Assumes every metric holds the same number of per-file values for every model.
    num_files = len(metrics_fec[model_names[0]]) if len(model_names) > 0 else 0

    all_results = []
    for file_idx in range(num_files):
        # Take the basename first and only then cut at the first '.', the same
        # way get_filename() does. Splitting on '.' first returned '' or a
        # wrong name for paths such as './labels/a.txt' or '/data/set.v1/a.txt'.
        file = label_paths[file_idx].split('/')[-1].split('.')[0]

        for model_name in model_names:
            all_results.append({
                'model': model_name,
                'file index': file_idx,
                'audio file': file,
                'FEC': metrics_fec[model_name][file_idx],
                'MSC': metrics_msc[model_name][file_idx],
                'OVER': metrics_over[model_name][file_idx],
                'NDS': metrics_nds[model_name][file_idx],
            })

    df = pd.DataFrame(all_results, columns=columns)
    df.to_csv(output_file, index=False)

In [5]:
# Dataset locations for the 188-sample subset.
# NOTE(review): these are absolute paths on one developer machine; consider
# deriving them from a single configurable data root.
wav_directory = "/Users/saurabh/Documents/projects/Voice-Activity-Detection/data/bh_dataset/188_samples/188_audio"
label_directory = "/Users/saurabh/Documents/projects/Voice-Activity-Detection/data/bh_dataset/188_samples/188_label_for_FE"
transcript_directory = "/Users/saurabh/Documents/projects/Voice-Activity-Detection/testing/188_trans"

# read_path is called twice with the same wav directory: once to obtain the
# label paths (its first return value is discarded) and once to obtain the
# audio paths together with the transcript ("forced") paths.
_, label_paths = read_path(wav_directory, label_directory)
audio_paths, forced_paths = read_path(wav_directory, transcript_directory)

# The three lists are sorted independently and later indexed in parallel.
# NOTE(review): this presumably relies on audio, label and transcript files
# sharing the same base names and counts — verify, otherwise entries misalign.
audio_paths.sort()
label_paths.sort()
forced_paths.sort()

# One parsed annotation per label file, in the same order as label_paths.
annotated_segments = [parse_annotations_file_bh(label_path) for label_path in label_paths]

In [8]:
def filter_speech_segments(segments):
    """Return, in order, the entries of ``segments`` that contain a 'speech' key."""
    return list(filter(lambda segment: 'speech' in segment, segments))

def merge_speech_segments(speech_segments):
    """Collapse a list of speech segments into a single enclosing segment.

    The result spans from the start of the first entry to the end of the last
    entry; anything in between is ignored. An empty input gives an empty list.
    """
    if speech_segments:
        first_segment, last_segment = speech_segments[0], speech_segments[-1]
        span = [first_segment['speech'][0], last_segment['speech'][1]]
        return [{'speech': span}]

    return []

In [9]:
# Merged predictions per model; the inference loop appends one entry per audio file.
res_silero, res_speechbrain = [], []

# Indices of files to remove from the path / annotation lists in the pruning cell.
rr = []

In [10]:
# Run Silero and SpeechBrain on every audio file and store, per file, a single
# merged speech span for each model.
for i, audio_path in enumerate(audio_paths):
    # Raw model outputs (Silero first, then SpeechBrain).
    silero_raw = vad_inference_silero(audio_path, model_silero, utils_silero, sampling_rate=SAMPLING_RATE)
    speechbrain_raw = vad_inference_speechbrain(audio_path, vad)

    # Convert each raw output with its model-specific timestamp helper.
    silero_segments = print_timestamps_silero(silero_raw)
    speechbrain_segments = print_timestamps_speechbrain(speechbrain_raw)

    # Collapse to one span: first detected start -> last detected end.
    silero = merge_speech_segments(silero_segments)
    speechbrain = merge_speech_segments(speechbrain_segments)

    res_silero.append(silero)
    res_speechbrain.append(speechbrain)

In [11]:
# Remove the files whose indices were collected in `rr` from the three parallel
# lists. Working from the highest index down keeps the remaining indices valid.
# Note that res_silero / res_speechbrain are not pruned here.
rr.sort(reverse=True)

for index in rr:
    for parallel_list in (audio_paths, label_paths, annotated_segments):
        parallel_list.pop(index)

In [12]:
# Write each model's merged predictions to its own output directory, using the
# names derived from the label paths.
file_names = extract_filenames_as_string(label_paths)

for model_results, output_dir in (
    (res_silero, "/Users/saurabh/Documents/projects/Voice-Activity-Detection/data/bh_dataset/predicted_for_FE/silero"),
    (res_speechbrain, "/Users/saurabh/Documents/projects/Voice-Activity-Detection/data/bh_dataset/predicted_for_FE/speechbrain"),
):
    save_multiple_speech_segments_as_text(model_results, file_names, output_dir)