In [None]:
# Install the notebook's dependencies into the environment of the running kernel.
# `%pip` (unlike `!pip`) always targets the interpreter that backs this notebook,
# and a single command lets pip resolve all requirements together.
# TODO(review): pin exact versions once the known-good set is established.
%pip install -q scikit-learn pyannote.core pyannote.metrics seaborn speechbrain

[0m

In [None]:
import math
import os

import numpy as np
import torch
import torchaudio
from pyannote.core import Annotation, Segment
from sklearn.cluster import KMeans

# SpeechBrain >= 1.0 exposes the pretrained interfaces under `speechbrain.inference`;
# fall back to the legacy `speechbrain.pretrained` location for older releases.
try:
    from speechbrain.inference.speaker import EncoderClassifier
except ImportError:
    from speechbrain.pretrained import EncoderClassifier

ModuleNotFoundError: No module named 'speechbrain.pretrained'

In [None]:
def generate_rttm(audio_path, reference_rttm_path, output_directory, classifier, fs, random_state=0):
    """Diarize one recording and write the hypothesis as an RTTM file.

    The audio is cut into consecutive, non-overlapping segments of ``fs``
    samples (one second each); a trailing segment shorter than that is dropped.
    Each segment is embedded with ``classifier.encode_batch`` and the
    embeddings are clustered with K-Means. Segment ``i`` is written as the
    interval ``[i, i + 1)`` seconds, labelled ``spk<cluster index>``.

    Args:
        audio_path: Path of the audio file to diarize.
        reference_rttm_path: Reference RTTM of the same recording; only used
            to obtain the number of speakers (the number of clusters).
        output_directory: Directory that receives ``<file_id>_hypothesis.rttm``.
        classifier: Object exposing ``encode_batch(tensor)`` that returns an
            embedding tensor for a batch of waveforms.
        fs: Sample rate the audio is processed at; also the segment length in
            samples.
        random_state: Seed for K-Means so repeated runs give the same labels.
    """
    n_clusters = read_num_speakers_from_rttm(reference_rttm_path)
    signal, sample_rate = torchaudio.load(audio_path)

    # Down-mix multi-channel audio: every channel would otherwise be embedded
    # as a separate batch item and the embedding array would no longer be 2-D.
    if signal.shape[0] > 1:
        signal = signal.mean(dim=0, keepdim=True)

    # The one-second segmentation (and the timestamps written below) is only
    # valid if the signal really is sampled at `fs`.
    if sample_rate != fs:
        signal = torchaudio.functional.resample(signal, sample_rate, fs)

    # Only complete one-second segments are embedded; the remainder is ignored.
    num_full_segments = signal.shape[1] // fs
    embeddings = []
    for index in range(num_full_segments):
        segment = signal[:, index * fs:(index + 1) * fs]
        embedding = classifier.encode_batch(segment)
        embeddings.append(embedding.squeeze().cpu().detach().numpy())

    hypothesis = Annotation()

    # K-Means needs at least as many samples as clusters. With no usable
    # segment (or no reference speaker) an empty hypothesis is written.
    n_clusters = min(n_clusters, len(embeddings))
    if n_clusters > 0:
        kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=random_state)
        labels = kmeans.fit(np.array(embeddings)).labels_
        for index, label in enumerate(labels):
            hypothesis[Segment(index, index + 1)] = f"spk{label}"

    # splitext (not split('.')) so the id matches the one the evaluation derives
    # from the reference file name, even when the name contains extra dots.
    file_id = os.path.splitext(os.path.basename(audio_path))[0]
    save_rttm(hypothesis, file_id, output_directory)

In [None]:
def save_rttm(annotation, file_id, output_directory, overwrite=False):
    """Write an annotation to ``<output_directory>/<file_id>_hypothesis.rttm``.

    One ``SPEAKER`` line is written per track yielded by
    ``annotation.itertracks(yield_label=True)``, with start and duration
    rounded to two decimals.

    Args:
        annotation: Object whose ``itertracks(yield_label=True)`` yields
            ``(segment, track, label)`` triples; each segment exposes
            ``start`` and ``duration``.
        file_id: Recording identifier, used in the file name and in every line.
        output_directory: Target directory; created if missing.
        overwrite: When False (default) an existing file is left untouched and
            nothing is written; when True the file is replaced.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(output_directory, exist_ok=True)

    file_path = os.path.join(output_directory, f"{file_id}_hypothesis.rttm")
    if os.path.exists(file_path) and not overwrite:
        return

    with open(file_path, "w") as file:
        for segment, _, label in annotation.itertracks(yield_label=True):
            file.write(
                f"SPEAKER {file_id} 1 {segment.start:.2f} {segment.duration:.2f} "
                f"<NA> <NA> {label} <NA> <NA>\n"
            )

In [None]:
def read_num_speakers_from_rttm(rttm_file_path):
    """Return the number of distinct speaker labels in an RTTM file.

    The speaker label is taken from the 8th whitespace-separated field of each
    line. Lines with fewer than 8 fields (blank or truncated lines) are
    ignored.

    Args:
        rttm_file_path: Path of the RTTM file to read.

    Returns:
        The count of unique speaker labels (0 for a file without usable lines).
    """
    speakers = set()
    with open(rttm_file_path, 'r') as file:
        for line in file:
            parts = line.split()
            # A blank/truncated line has no speaker field; skip it rather
            # than fail with IndexError.
            if len(parts) < 8:
                continue
            speakers.add(parts[7])
    return len(speakers)

In [None]:
# Model initialisation: use the GPU when one is available, otherwise the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="speechbrain",
    run_opts={"device": device},
)

wav_dir = "voxconverse_test_wav"
rttm_dir = "voxconverse_test_rttm"
output_directory = "speechbrain_output_directory"
SAMPLE_RATE = 16000  # sample rate (Hz) passed to generate_rttm as `fs`

# os.listdir returns entries in arbitrary order; sort for a reproducible run order.
wav_files = sorted(f for f in os.listdir(wav_dir) if f.endswith('.wav'))
audio_files = [os.path.join(wav_dir, f) for f in wav_files]
# splitext swaps only the extension; str.replace would also rewrite a ".wav"
# occurring elsewhere in the file name.
reference_rttm_files = [
    os.path.join(rttm_dir, os.path.splitext(f)[0] + '.rttm') for f in wav_files
]

for audio_path, rttm_path in zip(audio_files, reference_rttm_files):
    # Resume support: save_rttm never overwrites an existing hypothesis, so
    # recordings that already have one are skipped instead of being recomputed.
    # This replaces a positional `[42:]` slice that depended on listing order.
    file_id = os.path.splitext(os.path.basename(audio_path))[0]
    if os.path.exists(os.path.join(output_directory, f"{file_id}_hypothesis.rttm")):
        continue

    print(rttm_path)
    generate_rttm(audio_path, rttm_path, output_directory, classifier, SAMPLE_RATE)

In [None]:
import pandas as pd
from pyannote.metrics.diarization import DiarizationErrorRate, JaccardErrorRate
from pyannote.metrics.identification import IdentificationErrorRate

def load_rttm(file_path):
    """Load an RTTM file into a pyannote ``Annotation``.

    For every line with at least 8 whitespace-separated fields, field 4 is
    read as the start time, field 5 as the duration and field 8 as the speaker
    label. Shorter lines (e.g. blank lines) are skipped.

    Args:
        file_path: Path of the RTTM file.

    Returns:
        An ``Annotation`` holding one track per parsed line.
    """
    annotation = Annotation()
    with open(file_path, 'r') as file:
        for track, line in enumerate(file):
            parts = line.split()
            if len(parts) < 8:
                continue
            start = float(parts[3])
            duration = float(parts[4])
            # The line index is used as the track name so that two speakers
            # with an identical segment do not overwrite each other.
            annotation[Segment(start, start + duration), track] = parts[7]
    return annotation

rttm_folder = 'voxconverse_test_rttm'  # Folder holding the reference RTTM files
hypothesis_folder = 'speechbrain_output_directory'  # Folder holding the hypotheses

# Recording ids are the reference file names without the ".rttm" extension;
# sorted because os.listdir returns entries in arbitrary order.
file_names = sorted(
    os.path.splitext(file)[0]
    for file in os.listdir(rttm_folder)
    if file.endswith('.rttm')
)
if not file_names:
    # Fail with a clear message instead of a ZeroDivisionError in the averages.
    raise FileNotFoundError(f"No .rttm reference files found in {rttm_folder!r}")

# Metric instances, all evaluated with the same collar.
der_metric = DiarizationErrorRate(collar=0.5)
jer_metric = JaccardErrorRate(collar=0.5)
# NOTE(review): the identification error rate appears to compare labels
# literally, so it is probably not meaningful for "spkN" hypothesis labels
# that do not share names with the reference — confirm before reporting it.
ier_metric = IdentificationErrorRate(collar=0.5)

# Per-recording values of each metric.
error_rates_der = []
error_rates_jer = []
error_rates_ier = []

for file_name in file_names:
    reference_path = os.path.join(rttm_folder, f'{file_name}.rttm')
    hypothesis_path = os.path.join(hypothesis_folder, f'{file_name}_hypothesis.rttm')

    reference_annotation = load_rttm(reference_path)
    hypothesis_annotation = load_rttm(hypothesis_path)

    # Calculate metrics
    error_rates_der.append(der_metric(reference_annotation, hypothesis_annotation))
    error_rates_jer.append(jer_metric(reference_annotation, hypothesis_annotation))
    error_rates_ier.append(ier_metric(reference_annotation, hypothesis_annotation))

# Unweighted mean of the per-recording values.
average_der = sum(error_rates_der) / len(error_rates_der)
print(f"Average Diarization Error Rate: {average_der:.2f}")

average_jer = sum(error_rates_jer) / len(error_rates_jer)
print(f"Average Jaccard Error Rate: {average_jer:.2f}")

# The metric is IdentificationErrorRate; it was previously mislabelled "Insertion".
average_ier = sum(error_rates_ier) / len(error_rates_ier)
print(f"Average Identification Error Rate: {average_ier:.2f}")

df_result = pd.DataFrame([{
    'Python Library': 'SpeechBrain',
    'Average Diarization Error Rate': average_der,
    'Average Jaccard Error Rate': average_jer,
    'Average Identification Error Rate': average_ier,
}])

display(df_result)

Average Diarization Error Rate: 0.31
Average Jaccard Error Rate: 0.41
Average Insertion Error Rate: 1.07


Unnamed: 0,Python Library,Average Diarization Error Rate,Average Jaccard Error Rate,Average Insertion Error Rate
0,SpeechBrain,0.308005,0.406106,1.067025
