In [1]:
import os
import json
from pyannote.core import Segment
from pyannote.audio import Inference
from pyannote.audio.pipelines import VoiceActivityDetection
from sklearn.cluster import AgglomerativeClustering
from sklearn.decomposition import PCA
from collections import defaultdict
import numpy as np
from webvtt import WebVTT, Caption
from pydub import AudioSegment as PydubAudioSegment

# Initialize the speaker embedding model and VAD pipeline.
# The Hugging Face access token is read from the environment so that it is
# never stored in the notebook. If neither variable is set, None is passed on;
# presumably the libraries then fall back to locally cached credentials
# (e.g. from `huggingface-cli login`) -- TODO confirm for the installed versions.
# NOTE(review): any token that was ever committed in this cell should be revoked.
AUTH_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
embedding_model = Inference("pyannote/embedding", use_auth_token=AUTH_TOKEN)
vad_pipeline = VoiceActivityDetection(segmentation="pyannote/segmentation", use_auth_token=AUTH_TOKEN)

def format_time(seconds):
    """
    Format time in seconds to HH:MM:SS.sss format.

    The value is rounded to whole milliseconds *before* it is split into
    hours/minutes/seconds, so a value such as 59.9996 carries over to
    "00:01:00.000" instead of producing the invalid "00:00:60.000".

    Args:
        seconds: Non-negative duration in seconds (int or float).

    Returns:
        The timestamp as a string, e.g. "01:01:01.500".
    """
    total_ms = int(round(seconds * 1000))
    total_seconds, millis = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02}:{minutes:02}:{secs:02}.{millis:03}"

def extract_embeddings(audio_path):
    """
    Extract one speaker embedding per detected speech region of an audio file.

    Speech regions come from ``vad_pipeline``; each region is passed to
    ``embedding_model.crop``. The result of ``crop`` may contain several rows
    for a single region (presumably one per sliding window -- confirm against
    the ``Inference`` configuration), so the rows are averaged into a single
    vector. This keeps the returned array aligned row-for-row with the
    returned segment list, which callers rely on when zipping them together.

    Args:
        audio_path: Path of the audio file to analyse.

    Returns:
        ``(embeddings, segments)`` where ``embeddings`` is a 2-D array with
        exactly one row per entry of ``segments``, or ``(None, None)`` when no
        usable speech region was found.
    """
    embeddings = []
    segments = []
    vad = vad_pipeline(audio_path)
    # pydub reports the length in milliseconds.
    duration = len(PydubAudioSegment.from_file(audio_path)) / 1000.0
    for segment in vad.get_timeline().support():
        # Clip regions that run past the end of the file.
        if segment.end > duration:
            segment = Segment(segment.start, duration)
        # Nothing left to embed once the region has been clipped away.
        if segment.end <= segment.start:
            continue
        rows = np.atleast_2d(np.asarray(embedding_model.crop(audio_path, segment).data))
        if rows.size == 0:
            continue
        # Collapse all rows for this region into one vector.
        embeddings.append(rows.mean(axis=0))
        segments.append(segment)
    if embeddings:
        return np.vstack(embeddings), segments
    return None, None

def process_directory(directory):
    """
    Gather speaker embeddings for every ``.wav`` file in ``directory``.

    Each matching file is handed to ``extract_embeddings``; files for which it
    returns no embeddings are ignored.

    Returns:
        A tuple ``(embeddings, file_mapping, segment_mapping)``:
        the vertically stacked embeddings of all files, a list of
        ``(file_name, row_index_within_file)`` pairs, and the concatenated
        list of segments. When nothing was extracted the tuple is
        ``(None, [], [])``.
    """
    stacked_parts = []
    file_mapping = []
    segment_mapping = []
    wav_names = [name for name in os.listdir(directory) if name.endswith(".wav")]
    for name in wav_names:
        audio_path = os.path.join(directory, name)
        print(f"Processing {audio_path}...")
        embeddings, segments = extract_embeddings(audio_path)
        if embeddings is None:
            continue
        stacked_parts.append(embeddings)
        file_mapping += [(name, row) for row in range(len(embeddings))]
        segment_mapping += segments
    if not stacked_parts:
        return None, [], []
    return np.vstack(stacked_parts), file_mapping, segment_mapping

def cluster_embeddings(embeddings, n_clusters=None, distance_threshold=None, n_components=50):
    """
    Cluster speaker embeddings using Agglomerative Clustering.

    The embeddings are first projected with PCA, then clustered with
    Ward-linkage agglomerative clustering on Euclidean distance.

    Args:
        embeddings: 2-D array-like, one embedding per row.
        n_clusters: Number of clusters, forwarded to ``AgglomerativeClustering``.
        distance_threshold: Linkage distance threshold, forwarded to
            ``AgglomerativeClustering``.
        n_components: Upper bound on the number of PCA components. It is
            clamped to the number of rows and columns of ``embeddings``,
            because PCA cannot produce more components than that; without the
            clamp a small dataset would make the PCA step fail.

    Returns:
        Array of integer cluster labels, one per input row.
    """
    embeddings = np.asarray(embeddings)
    n_samples, n_features = embeddings.shape
    n_components = max(1, min(n_components, n_samples, n_features))

    # Reduce dimensionality using PCA
    pca = PCA(n_components=n_components)
    reduced_embeddings = pca.fit_transform(embeddings)

    clustering = AgglomerativeClustering(n_clusters=n_clusters, distance_threshold=distance_threshold, metric='euclidean', linkage='ward')
    labels = clustering.fit_predict(reduced_embeddings)
    return labels

def parse_vtt(vtt_path):
    """
    Read a VTT file and return its captions as ``(start, end, text)`` tuples.

    ``start`` and ``end`` are taken from each caption's ``start_in_seconds``
    and ``end_in_seconds`` attributes; the tuples keep the file's caption order.
    """
    return [
        (cue.start_in_seconds, cue.end_in_seconds, cue.text)
        for cue in WebVTT().read(vtt_path)
    ]

def assign_speakers_to_vtt(vtt_segments, speaker_segments, labels):
    """
    Assign speaker labels to VTT segments based on diarized segments.

    Each caption receives the label of the speaker segment it overlaps the
    most in time. This also covers captions that fully contain a shorter
    speech segment, which a start/end-point containment check would miss.
    Among equally good candidates the earliest one in ``speaker_segments`` wins.

    Args:
        vtt_segments: Iterable of ``(start, end, text)`` tuples.
        speaker_segments: Sequence of ``(start, end)`` pairs (anything that
            unpacks into two numbers), parallel to ``labels``.
        labels: Sequence of speaker labels, one per speaker segment.

    Returns:
        List of ``(start, end, text, label)`` tuples. Captions that do not
        touch any speaker segment are left out.
    """
    labeled_vtt_segments = []
    for start, end, text in vtt_segments:
        best_overlap = None  # None means "no touching segment found yet"
        best_label = None
        for (seg_start, seg_end), label in zip(speaker_segments, labels):
            overlap = min(end, seg_end) - max(start, seg_start)
            if overlap < 0:
                # The two intervals are disjoint.
                continue
            if best_overlap is None or overlap > best_overlap:
                best_overlap = overlap
                best_label = label
        if best_overlap is not None:
            labeled_vtt_segments.append((start, end, text, best_label))
    return labeled_vtt_segments

def save_labeled_vtt(labeled_segments, output_path):
    """
    Write speaker-labeled captions to a new VTT file.

    ``labeled_segments`` is an iterable of ``(start, end, text, label)``
    tuples. Each one becomes a caption whose text is prefixed with
    ``Speaker <label>: `` and whose times are rendered by ``format_time``.
    """
    document = WebVTT()
    for cue_start, cue_end, cue_text, speaker in labeled_segments:
        start_stamp = format_time(cue_start)
        end_stamp = format_time(cue_end)
        cue = Caption(start=start_stamp, end=end_stamp, text=f"Speaker {speaker}: {cue_text}")
        document.captions.append(cue)
    document.save(output_path)

def save_to_json(data, output_path):
    """
    Serialize ``data`` as JSON (4-space indentation) into ``output_path``.

    The target file is opened in text write mode, so existing content is
    replaced.
    """
    with open(output_path, mode='w') as handle:
        json.dump(data, handle, indent=4)

if __name__ == "__main__":
    # Directory containing the .wav and .vtt files
    audio_directory = "audio/"

    # Output JSON file
    output_json = "unique_speaker_clusters.json"

    # Process the directory and extract embeddings
    embeddings, file_mapping, segment_mapping = process_directory(audio_directory)

    if embeddings is not None:
        # Cluster the embeddings to identify unique speakers
        # Specify the number of unique speakers
        n_unique_speakers = 5  # Replace with the actual number of unique speakers
        labels = cluster_embeddings(embeddings, n_clusters=n_unique_speakers)

        # Organize the clustering results. Segments and labels are also
        # grouped per source file, so that each VTT is later matched only
        # against the speech segments of its own audio file rather than
        # against the segments of every file in the directory.
        speaker_clusters = defaultdict(list)
        segments_by_file = defaultdict(list)
        labels_by_file = defaultdict(list)
        for (file, segment_idx), label, segment in zip(file_mapping, labels, segment_mapping):
            speaker_clusters[f"Speaker {label}"].append({
                "file": file,
                "segment_index": segment_idx,
                "segment_start": segment.start,
                "segment_end": segment.end
            })
            segments_by_file[file].append(segment)
            labels_by_file[file].append(label)

        # Save the clusters to a JSON file
        save_to_json(speaker_clusters, output_json)

        # Integrate with VTT files
        labeled_suffix = "_labeled.vtt"
        for file in os.listdir(audio_directory):
            # Skip non-VTT files and outputs written by an earlier run.
            if not file.endswith(".vtt") or file.endswith(labeled_suffix):
                continue
            vtt_path = os.path.join(audio_directory, file)
            # splitext only touches the extension; str.replace would rewrite
            # every ".vtt" occurring anywhere in the name.
            stem = os.path.splitext(file)[0]
            audio_file = stem + ".wav"
            if os.path.exists(os.path.join(audio_directory, audio_file)):
                vtt_segments = parse_vtt(vtt_path)
                labeled_vtt_segments = assign_speakers_to_vtt(
                    vtt_segments,
                    segments_by_file.get(audio_file, []),
                    labels_by_file.get(audio_file, []),
                )
                # Use a real ".vtt" extension so the printed path is the path
                # that is written and the result opens as a subtitle file.
                output_vtt_path = os.path.join(audio_directory, f"{stem}{labeled_suffix}")
                save_labeled_vtt(labeled_vtt_segments, output_vtt_path)
                print(f"Labeled VTT saved to {output_vtt_path}")
    else:
        print("No embeddings were extracted from the audio files.")


  from .autonotebook import tqdm as notebook_tqdm
Lightning automatically upgraded your loaded checkpoint from v1.2.7 to v2.2.5. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint C:\Users\lettu\.cache\torch\pyannote\models--pyannote--embedding\snapshots\4db4899737a38b2d618bbd74350915aa10293cb2\pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.2.0. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.8.1+cu102, yours is 2.3.1+cu118. Bad things might happen unless you revert torch to 1.x.


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.2.5. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint C:\Users\lettu\.cache\torch\pyannote\models--pyannote--segmentation\snapshots\660b9e20307a2b0cdb400d0f80aadc04a701fc54\pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.2.0. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.3.1+cu118. Bad things might happen unless you revert torch to 1.x.
Processing audio/AdventureSP_001.wav...
Processing audio/AdventureSP_002.wav...
Processing audio/AdventureSP_003.wav...
Processing audio/AdventureSP_004.wav...
Processing audio/AdventureSP_005.wav...
Processing audio/AdventureSP_006.wav...
Processing audio/AdventureSP_006_Alt_01.wav...
Processing audio/AdventureSP_007.wav...
Processing audio/Ambient - Rubmle with gem.wav...
Processing audio/Ambient - Water dripping in cave.wav...
Processing audio/Annika fire.wav...
Processing audio/Bell Church -wrong stone.wav...
Processing audio/Bell Ring.wav...
Processing audio/BridgePieces_001.wav...
Processing audio/BridgePieces_002.wav...
Processing audio/BridgePieces_003.wav...
Processing audio/BridgePieces_004.wav...
Processing audio/BridgePieces_005.wa

In [4]:
import soundfile as sf
# Report the format, subtype and endianness of one sample file. The context
# manager closes the handle again, so the file is no longer held open when it
# is overwritten in the loop below.
with sf.SoundFile('audio/AdventureSP_001.wav') as f:
    print(f.format, f.subtype, f.endian)

# loop over the .wav files in the audio directory and resave them as PCM_16
# NOTE: files are overwritten in place. Files that are already WAV/PCM_16 are
# skipped, so re-running this cell does not re-encode them.
for file in os.listdir(audio_directory):
    if file.endswith(".wav"):
        audio_path = os.path.join(audio_directory, file)
        with sf.SoundFile(audio_path) as src:
            already_pcm16 = src.format == 'WAV' and src.subtype == 'PCM_16'
        if already_pcm16:
            continue
        audio_data, sample_rate = sf.read(audio_path)
        sf.write(audio_path, audio_data, sample_rate, subtype='PCM_16')


WAV MS_ADPCM FILE
