

# Optimized code using the Annoy (.ann) vector database


Installing the modules took about 2 minutes.



In [None]:
pip install torch torchaudio speechbrain annoy

Collecting speechbrain
  Downloading speechbrain-1.0.2-py3-none-any.whl.metadata (23 kB)
Collecting annoy
  Downloading annoy-1.17.3.tar.gz (647 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m647.5/647.5 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  

In [None]:
import os
import time
import logging
import numpy as np
import torchaudio
import torch
import pickle
import annoy
from speechbrain.pretrained import SpeakerRecognition

# Root logger setup: every record shows timestamp, severity and message text
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Pick CUDA when torch reports it as available; otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
logging.info(f"Using device: {device}")

def load_model():
    """Load the pretrained SpeechBrain speaker-recognition model.

    The model is fetched via ``SpeakerRecognition.from_hparams`` from the
    "speechbrain/spkrec-ecapa-voxceleb" source, stored under
    "pretrained_models/spkrec-ecapa-voxceleb", and asked to run on the
    module-level ``device``.  The elapsed wall-clock time is logged at INFO.

    Returns:
        The object returned by ``SpeakerRecognition.from_hparams``.
    """
    start_time = time.time()

    # Collect the loader arguments first so the call itself stays short
    loader_kwargs = {
        "source": "speechbrain/spkrec-ecapa-voxceleb",
        "savedir": "pretrained_models/spkrec-ecapa-voxceleb",
        "run_opts": {"device": device},
    }
    recognizer = SpeakerRecognition.from_hparams(**loader_kwargs)

    logging.info(f"Model loaded in {time.time() - start_time:.2f}s")
    return recognizer

def load_vector_database(db_path, id_to_name_path=None):
    """Load the Annoy index at `db_path` plus an optional ID-to-name mapping.

    The mapping is read from `id_to_name_path` when that file exists.  A
    sidecar pickle named ``<index stem>_metadata.pkl`` (e.g.
    ``vector_database_metadata.pkl`` for ``vector_database.ann``) may override
    the embedding dimension via its 'embedding_dim' entry and, when no mapping
    was loaded from `id_to_name_path`, supply one via its 'id_to_name' entry.

    NOTE(security): both sidecar files are read with ``pickle.load``, which
    can execute arbitrary code while loading.  Only use files from a trusted
    source.

    Args:
        db_path: Path to the saved Annoy index file.
        id_to_name_path: Optional path to a pickled ID-to-name mapping.

    Returns:
        ``(index, id_to_name)`` on success, or ``(None, {})`` when reading the
        metadata or loading the index raises (the error is logged).

    Raises:
        Whatever ``open``/``pickle.load`` raises for `id_to_name_path`; that
        read happens outside the guarded section.
    """
    start_time = time.time()

    # If there's a separate ID to name mapping file, load it
    id_to_name = {}
    if id_to_name_path and os.path.exists(id_to_name_path):
        with open(id_to_name_path, 'rb') as f:
            id_to_name = pickle.load(f)
        logging.info(f"Loaded ID to name mapping with {len(id_to_name)} entries")

    # Load the Annoy index
    try:
        # Default dimension for ECAPA-TDNN model
        embedding_dim = 192

        # Derive the sidecar path from the index stem only.  A plain
        # str.replace('.ann', ...) would rewrite every ".ann" occurrence in
        # the path and, for a path without ".ann", return db_path itself --
        # causing the index file to be unpickled as metadata.
        index_suffix = '.ann'
        if db_path.endswith(index_suffix):
            stem = db_path[:-len(index_suffix)]
        else:
            stem = os.path.splitext(db_path)[0]
        metadata_path = stem + '_metadata.pkl'

        if os.path.exists(metadata_path):
            with open(metadata_path, 'rb') as f:
                metadata = pickle.load(f)
            if 'embedding_dim' in metadata:
                embedding_dim = metadata['embedding_dim']
            # An explicitly loaded mapping takes precedence over the metadata
            if 'id_to_name' in metadata and not id_to_name:
                id_to_name = metadata['id_to_name']

        # Load the index
        index = annoy.AnnoyIndex(embedding_dim, 'angular')
        index.load(db_path)

        logging.info(f"Loaded vector database with {index.get_n_items()} embeddings in {time.time() - start_time:.2f}s")
        return index, id_to_name
    except Exception as e:
        logging.error(f"Error loading vector database: {e}")
        return None, {}

def load_and_preprocess_audio(file_path, target_sr=16000):
    """Read an audio file and return a mono, RMS-normalised waveform tensor.

    The file is loaded with torchaudio, moved to the module-level ``device``,
    resampled to `target_sr` when its own rate differs, averaged across
    channels when it has more than one, and finally divided by its RMS value
    (with a small epsilon under the square root).

    Args:
        file_path: Path of the audio file to read.
        target_sr: Sample rate the returned waveform should have.

    Returns:
        The processed tensor, or ``None`` when the path is not a file or any
        step raises (both cases are logged as errors).
    """
    try:
        # Bail out before touching torchaudio when the path is not a file
        file_found = os.path.isfile(file_path)
        if not file_found:
            logging.error(f"File does not exist: {file_path}")
            return None

        # Decode, then do the remaining work on the selected device
        waveform, source_sr = torchaudio.load(file_path)
        waveform = waveform.to(device)

        # Bring the sample rate in line with the requested one
        if source_sr != target_sr:
            resampler = torchaudio.transforms.Resample(source_sr, target_sr)
            waveform = resampler.to(device)(waveform)

        # Collapse multi-channel audio into a single channel by averaging
        channel_count = waveform.shape[0]
        if channel_count > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Scale by the RMS; the epsilon keeps the divisor non-zero for silence
        rms = torch.sqrt(torch.mean(waveform**2) + 1e-8)
        return waveform / rms
    except Exception as e:
        logging.error(f"Error processing {file_path}: {e}")
        return None

def extract_voice_embedding(model, audio_file):
    """Compute a speaker embedding for one audio file (no caching).

    The waveform comes from ``load_and_preprocess_audio``.  It is forced to
    exactly 48000 samples (3 seconds at 16 kHz): longer clips keep their
    middle section, shorter clips are zero-padded at the end.  The result of
    ``model.encode_batch`` is squeezed and converted to a NumPy array.

    Args:
        model: Object exposing ``encode_batch``.
        audio_file: Path of the audio file to embed.

    Returns:
        The embedding as a NumPy array, or ``None`` when the audio could not
        be loaded or any step raises (the error is logged).
    """
    start_time = time.time()

    try:
        waveform = load_and_preprocess_audio(audio_file)
        if waveform is None:
            return None

        # Fixed analysis window: 3 seconds at 16kHz = 48000 samples
        target_length = 3 * 16000
        surplus = waveform.shape[1] - target_length
        if surplus > 0:
            # Too long: keep the centred window
            offset = surplus // 2
            waveform = waveform[:, offset:offset + target_length]
        elif surplus < 0:
            # Too short: append trailing silence
            silence = torch.zeros(1, -surplus, device=device)
            waveform = torch.cat([waveform, silence], dim=1)

        # Inference only, so gradient tracking is switched off
        with torch.no_grad():
            encoded = model.encode_batch(waveform)
            embedding = encoded.squeeze().cpu().numpy()

        logging.info(f"Extracted embedding for {audio_file} in {time.time() - start_time:.2f}s")
        return embedding
    except Exception as e:
        logging.error(f"Error generating embedding for {audio_file}: {e}")
        return None

def query_vector_database(input_embedding, vector_db, id_to_name, top_n=5):
    """Look up the `top_n` nearest stored embeddings and score them.

    `vector_db` is queried through ``get_nns_by_vector`` with distances
    included.  Each distance is clipped to 2.0 (treated here as the maximum
    angular distance) and mapped linearly to a score: 0.0 -> 1.0, 2.0 -> 0.0.

    Args:
        input_embedding: Query vector handed to the index unchanged.
        vector_db: Index object exposing ``get_nns_by_vector``.
        id_to_name: Mapping from index item ID to speaker name; IDs missing
            from it are reported as ``"speaker_<id>"``.
        top_n: Number of neighbours to request.

    Returns:
        A list of ``(name, similarity)`` tuples in the order the index
        returned them, or ``[]`` if the lookup raises (the error is logged).
    """
    start_time = time.time()

    try:
        neighbours = vector_db.get_nns_by_vector(
            input_embedding, top_n, include_distances=True)
        nearest_ids, distances = neighbours

        # Distances above this ceiling are clipped so scores stay in [0, 1]
        max_distance = 2.0

        # One (name, score) pair per neighbour, preserving the index's order
        results = [
            (
                id_to_name.get(idx, f"speaker_{idx}"),
                float(1.0 - min(dist, max_distance) / max_distance),
            )
            for idx, dist in zip(nearest_ids, distances)
        ]

        logging.info(f"Database query completed in {time.time() - start_time:.2f}s")
        return results
    except Exception as e:
        logging.error(f"Error querying vector database: {e}")
        return []

def process_speaker_recognition(input_file, vector_db_path, id_to_name_path=None, top_n=5):
    """Identify the speaker in `input_file` against a pre-built vector database.

    The database is loaded and checked first so that an unusable database is
    reported without paying for the model load.

    Args:
        input_file: Path of the audio file to identify.
        vector_db_path: Path to the saved Annoy index.
        id_to_name_path: Optional path to a pickled ID-to-name mapping.
        top_n: Number of nearest matches to request (defaults to 5).

    Returns:
        The list of ``(name, similarity)`` tuples from
        ``query_vector_database``, or a single-element list
        ``[("Error", <message>)]`` when the database or the input embedding
        could not be produced.  Note the second item is text in that case.
    """
    total_start_time = time.time()

    # Validate the database before the (slower) model load: fail fast
    vector_db, id_to_name = load_vector_database(vector_db_path, id_to_name_path)
    if vector_db is None:
        return [("Error", "Could not load vector database")]

    # Load model once
    model = load_model()

    # Extract embedding for input file
    input_embedding = extract_voice_embedding(model, input_file)
    if input_embedding is None:
        return [("Error", "Could not generate input voice embedding")]

    # Compare with vector database
    results = query_vector_database(input_embedding, vector_db, id_to_name, top_n=top_n)

    logging.info(f"Total processing time: {time.time() - total_start_time:.2f}s")
    return results

def batch_process_test_files(test_folder, vector_db_path, id_to_name_path=None, expected_matches=None):
    """Run speaker recognition on every audio file in a folder.

    Files are selected by extension (.wav, .mp3, .flac, .ogg, .m4a,
    case-insensitive) and processed in sorted name order so runs are
    reproducible.  Files whose embedding cannot be extracted are skipped.
    When `expected_matches` is given, top-1 and top-5 accuracy over the files
    it covers are logged.

    Args:
        test_folder: Directory containing the test audio files.
        vector_db_path: Path to the saved Annoy index.
        id_to_name_path: Optional path to a pickled ID-to-name mapping.
        expected_matches: Optional mapping from file stem (name without
            extension) to the expected speaker name.

    Returns:
        A dict mapping each processed file stem to its list of
        ``(name, similarity)`` matches.  Files sharing a stem overwrite one
        another.  If the database cannot be loaded, the list
        ``[("Error", "Could not load vector database")]`` is returned instead.
    """
    # Validate the database before the (slower) model load: fail fast
    vector_db, id_to_name = load_vector_database(vector_db_path, id_to_name_path)
    if vector_db is None:
        return [("Error", "Could not load vector database")]

    # Load model once
    model = load_model()

    audio_suffixes = ('.wav', '.mp3', '.flac', '.ogg', '.m4a')
    results = {}
    correct_top1 = 0
    correct_top5 = 0
    total = 0

    # os.listdir order is arbitrary; sort for deterministic logs and results
    for filename in sorted(os.listdir(test_folder)):
        if not filename.lower().endswith(audio_suffixes):
            continue

        filepath = os.path.join(test_folder, filename)
        name = os.path.splitext(filename)[0]

        # Extract embedding
        embedding = extract_voice_embedding(model, filepath)
        if embedding is None:
            continue

        # Compare with references
        matches = query_vector_database(embedding, vector_db, id_to_name)
        results[name] = matches

        # Check accuracy if expected matches are provided
        if expected_matches and name in expected_matches:
            expected = expected_matches[name]
            predicted = [m[0] for m in matches]

            # A failed query yields an empty list; score it as a miss rather
            # than crashing on matches[0]
            if predicted and predicted[0] == expected:
                correct_top1 += 1
            if expected in predicted:
                correct_top5 += 1
            total += 1

            logging.info(f"Test {name}: Expected {expected}, Got {predicted}")

    if total > 0:
        logging.info(f"Accuracy: Top-1: {correct_top1/total:.2f}, Top-5: {correct_top5/total:.2f}")

    return results

# Example usage
if __name__ == "__main__":
    input_voice_file = 'rahultest4.m4a'
    vector_db_path = 'vector_database.ann'

    # Optional: Path to the ID to name mapping file if stored separately
    id_to_name_path = 'speaker_id_mapping.pkl'  # Set to None if not available

    results = process_speaker_recognition(input_voice_file, vector_db_path, id_to_name_path)

    # Failures are reported as ("Error", "<message>") with a text second item;
    # formatting that text with ':.4f' would raise ValueError, so handle it first
    error_messages = [detail for _, detail in results if isinstance(detail, str)]
    if error_messages:
        for detail in error_messages:
            print(f"Error: {detail}")
    else:
        print("Top 5 Matches:")
        for name, score in results:
            print(f"Speaker: {name}, Similarity Score: {score:.4f}")

Top 5 Matches:
Speaker: speaker_11, Similarity Score: 0.5413
Speaker: speaker_7, Similarity Score: 0.4670
Speaker: speaker_4, Similarity Score: 0.4426
Speaker: speaker_5, Similarity Score: 0.4316
Speaker: speaker_18, Similarity Score: 0.4315
