# Whisper.cpp Transcription and Speaker Diarization for Kaggle

This notebook demonstrates how to use [whisper.cpp](https://github.com/ggml-org/whisper.cpp) for offline speech recognition and speaker diarization. It provides functionality similar to the FastAPI service from the original project but adapted for Kaggle notebooks.

## Features

- **Multiple Models**: Support for various model sizes (tiny, base, small, medium, large)
- **Diarization**: Speaker segmentation using pyannote.audio
- **Offline Operation**: All processing happens locally
- **On-demand Model Downloads**: Download models as needed
- **Support for Various Audio Formats**: Through FFmpeg integration

## Setup

Let's start by installing the necessary dependencies.

In [None]:
# Install required packages
!pip install -q ffmpeg-python numpy torch tqdm requests python-dotenv kagglehub

In [None]:
# Check for GPU availability for PyTorch
import torch

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

## Install Whisper.cpp

We need to build whisper.cpp from source. Let's clone the repository and compile it.

In [None]:
# Clone whisper.cpp repository
!git clone https://github.com/ggml-org/whisper.cpp.git

# Build whisper.cpp
%cd whisper.cpp
!make
%cd ..

## Install Speaker Diarization Tools

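For speaker diarization, we'll use pyannote.audio. Its pretrained pipeline is gated on Hugging Face, so you need an access token and must accept the user conditions for `pyannote/speaker-diarization-3.1` and the `pyannote/segmentation-3.0` model it depends on.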
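
One way to keep the token out of the notebook source is to read it from Kaggle's secrets store and fall back to an environment variable. The sketch below assumes a secret labelled `HF_TOKEN` has been attached to the notebook; the helper name `resolve_hf_token` is hypothetical and not part of the original project.

```python
import os
from typing import Optional


def resolve_hf_token(secret_name: str = "HF_TOKEN") -> Optional[str]:
    """Return a Hugging Face token from Kaggle secrets or the environment.

    Assumption: a Kaggle secret labelled `secret_name` is attached to the notebook.
    """
    try:
        # kaggle_secrets is only importable inside Kaggle notebooks
        from kaggle_secrets import UserSecretsClient

        token = UserSecretsClient().get_secret(secret_name)
        if token:
            return token
    except Exception:
        # Not running on Kaggle, or the secret is missing: use the environment instead
        pass
    return os.environ.get(secret_name) or None
```

The returned value can be assigned to `HF_TOKEN` in the cell that follows; `None` means no token was found in either place.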

In [None]:
# Install pyannote.audio for speaker diarization
!pip install -q pyannote.audio

In [None]:
# Set up Hugging Face token for pyannote.audio
import os

# You'll need to provide your Hugging Face token
# Get one from: https://huggingface.co/settings/tokens
# And accept the user agreement for: https://huggingface.co/pyannote/speaker-diarization-3.1

HF_TOKEN = os.environ.get("HF_TOKEN", "")  # Uses the environment variable if set; otherwise replace with your token

# Uncomment below to set your token
# HF_TOKEN = input("Enter your Hugging Face token: ")
# os.environ["HF_TOKEN"] = HF_TOKEN

## Project Structure

Now let's set up the project structure with the necessary modules. The sections below cover:

1. Configuration
2. Dataset and model download using kagglehub
3. Speaker diarization
4. Transcription service
5. Processing audio files from the dataset
6. Visualizing diarization results

### 1. Configuration

In [None]:
import logging
import os
from pathlib import Path

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Define project root directory
ROOT_DIR = Path(os.getcwd())
logger.info(f"ROOT_DIR is set to: {ROOT_DIR}")

# Whisper.cpp configuration
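# Recent whisper.cpp builds place the CLI at build/bin/whisper-cli; older builds produced ./main
_WHISPER_BIN_CANDIDATES = [
    ROOT_DIR / "whisper.cpp" / "build" / "bin" / "whisper-cli",
    ROOT_DIR / "whisper.cpp" / "main",
]
WHISPER_BIN_PATH = next(
    (p for p in _WHISPER_BIN_CANDIDATES if p.exists()), _WHISPER_BIN_CANDIDATES[0]
)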
DEFAULT_MODEL = "small.en"

# Directories
MODELS_DIR = ROOT_DIR / "models"
TEMP_UPLOAD_DIR = ROOT_DIR / "temp_uploads"
DATASET_DIR = ROOT_DIR / "dataset"

# Ensure required directories exist
def ensure_directories():
    """Create required directories if they don't exist."""
    for directory in [MODELS_DIR, TEMP_UPLOAD_DIR, DATASET_DIR]:
        if not directory.exists():
            directory.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created directory: {directory}")

# Initialize directories
ensure_directories()

# Log configuration
logger.info(f"WHISPER_BIN_PATH: {WHISPER_BIN_PATH}")
logger.info(f"DEFAULT_MODEL: {DEFAULT_MODEL}")
logger.info(f"MODELS_DIR: {MODELS_DIR}")
logger.info(f"TEMP_UPLOAD_DIR: {TEMP_UPLOAD_DIR}")
logger.info(f"DATASET_DIR: {DATASET_DIR}")
logger.info(f"HF_TOKEN: {'[SET]' if os.environ.get('HF_TOKEN') else '[NOT SET]'}")

### 2. Dataset Download with Kagglehub

This cell uses kagglehub to download a dataset of multi-speaker audio samples for testing transcription and diarization. It also fetches the whisper.cpp model with the `download-ggml-model.sh` script from the whisper.cpp repository.
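
Glob patterns such as `*.wav` are case-sensitive on Linux, so files named `clip.WAV` would be skipped. As a minimal sketch of an alternative, the hypothetical helper `find_audio_files` below walks a directory with `pathlib` and compares lower-cased suffixes. The suffix list mirrors the formats searched for in this notebook.

```python
from pathlib import Path
from typing import Iterable, List

AUDIO_SUFFIXES = (".wav", ".mp3", ".flac", ".ogg")


def find_audio_files(root: str, suffixes: Iterable[str] = AUDIO_SUFFIXES) -> List[str]:
    """Recursively collect audio files under `root`, ignoring suffix case."""
    wanted = {suffix.lower() for suffix in suffixes}
    return sorted(
        str(path)
        for path in Path(root).rglob("*")
        if path.is_file() and path.suffix.lower() in wanted
    )
```

Sorting the result makes `audio_files[0]` deterministic across runs.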

In [None]:
import kagglehub
import glob
from typing import Any, Dict, List, Optional, Union
import subprocess
import shutil
import requests

# Define the dataset to download
KAGGLE_DATASET = "wiradkp/mini-speech-diarization"  # This dataset contains audio files with multiple speakers

def download_kaggle_dataset():
    """Download a dataset from Kaggle using kagglehub"""
    logger.info(f"Downloading dataset from Kaggle: {KAGGLE_DATASET}")
    
    try:
        # Download the dataset
        dataset_path = kagglehub.dataset_download(KAGGLE_DATASET)
        logger.info(f"Dataset downloaded to: {dataset_path}")
        
        # Get list of audio files in the dataset
        audio_files = []
        for ext in ['*.wav', '*.mp3', '*.flac', '*.ogg']:
            audio_files.extend(glob.glob(os.path.join(dataset_path, '**', ext), recursive=True))
        
        logger.info(f"Found {len(audio_files)} audio files in the dataset")
        return dataset_path, audio_files
    
    except Exception as e:
        logger.error(f"Failed to download Kaggle dataset: {str(e)}")
        raise

# Download whisper.cpp model
def download_whisper_model(model_name="small.en"):
    """Download a specific whisper.cpp model"""
    script_path = MODELS_DIR / "download-ggml-model.sh"
    
    # Download the model download script if not exists
    if not script_path.exists():
        logger.info(f"Downloading model download script to {script_path}")
        script_url = (
            "https://raw.githubusercontent.com/ggml-org/whisper.cpp/"
            "master/models/download-ggml-model.sh"
        )
        # A timeout keeps the cell from hanging on a stalled connection
        response = requests.get(script_url, timeout=30)
        response.raise_for_status()
        with open(script_path, "wb") as f:
            f.write(response.content)
        # Make the script executable
        script_path.chmod(0o755)
    
    model_file = MODELS_DIR / f"ggml-{model_name}.bin"
    
    if model_file.exists():
        logger.info(f"Model {model_name} already exists at {model_file}")
        return model_file
    
    logger.info(f"Downloading model {model_name}...")
    try:
        # Execute the download script
        result = subprocess.run(
            [str(script_path), model_name, str(MODELS_DIR)],
            check=True,
            capture_output=True,
            text=True,
        )
        logger.info(f"Download output: {result.stdout}")
        
        # Double-check the file exists after download
        if not model_file.exists():
            logger.error(f"Download script completed but model file not found at {model_file}")
            raise RuntimeError(f"Model file not found after download: {model_file}")
        
        return model_file
    except Exception as e:
        logger.error(f"Error downloading model: {str(e)}")
        raise

# Download the dataset
audio_files = []
try:
    dataset_path, audio_files = download_kaggle_dataset()
    print(f"Dataset downloaded to: {dataset_path}")
except Exception as e:
    print(f"Error downloading dataset: {e}")

if audio_files:
    print(f"Found {len(audio_files)} audio files. First few files:")
    for file in audio_files[:5]:
        print(f"- {os.path.basename(file)}")
else:
    # Fall back to the sample clip that ships with the cloned whisper.cpp repository
    fallback_audio = ROOT_DIR / "whisper.cpp" / "samples" / "jfk.wav"
    if fallback_audio.exists():
        audio_files = [str(fallback_audio)]
        print(f"No dataset audio available. Using sample audio file: {audio_files[0]}")
    else:
        print("No dataset audio available and no sample audio file found.")

# Download the small model for our demonstration
try:
    model_path = download_whisper_model("small.en")
    print(f"Downloaded whisper model: small.en to {model_path}")
except Exception as e:
    print(f"Error downloading model: {e}")
    # Try downloading a smaller model as fallback
    try:
        model_path = download_whisper_model("tiny.en")
        print(f"Downloaded fallback model: tiny.en to {model_path}")
    except Exception as e2:
        print(f"Error downloading fallback model: {e2}")
        model_path = None

### 3. Speaker Diarization Implementation

Now we'll implement the speaker diarization service using pyannote.audio.
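
The alignment step in the class that follows labels each transcript segment with the speaker of the single diarization turn that overlaps it most. The sketch below shows a close variant that sums the overlap per speaker before choosing, which can give a different answer when one speaker has several short turns inside a segment. The function name `assign_speaker` and the sample turns are illustrative only.

```python
from collections import defaultdict
from typing import Dict, List


def assign_speaker(start: float, end: float, turns: List[Dict]) -> str:
    """Pick the speaker whose turns overlap [start, end] the longest in total."""
    totals = defaultdict(float)
    for turn in turns:
        overlap = min(turn["end"], end) - max(turn["start"], start)
        if overlap > 0:
            totals[turn["speaker"]] += overlap
    if not totals:
        return "UNKNOWN"
    return max(totals, key=totals.get)


turns = [
    {"speaker": "SPEAKER_00", "start": 0.0, "end": 2.0},
    {"speaker": "SPEAKER_01", "start": 2.0, "end": 3.5},
    {"speaker": "SPEAKER_00", "start": 3.5, "end": 5.0},
]
# SPEAKER_00 overlaps 1.0 s + 1.0 s, SPEAKER_01 overlaps 1.5 s
print(assign_speaker(1.0, 4.5, turns))  # SPEAKER_00
```

With the largest-single-turn rule the same segment would be labelled `SPEAKER_01`, so the choice between the two rules matters mostly for long transcript segments.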

In [None]:
class SpeakerDiarizationService:
    def __init__(self, hf_token: Optional[str] = None):
        """
        Initialize the speaker diarization service using pyannote/speaker-diarization.

        Args:
            hf_token: Hugging Face access token (required to use the pyannote models)
        """
        self.pipeline = None
        # Treat an empty string the same as None so the environment variable is still consulted
        self.hf_token = hf_token or os.environ.get("HF_TOKEN")
        self._initialized = False

        if not self.hf_token:
            logger.warning(
                "No Hugging Face token provided for speaker diarization. "
                "Set HF_TOKEN environment variable or provide token in constructor."
            )

    def initialize(self):
        """Initialize the pyannote speaker diarization pipeline"""
        if self._initialized:
            return

        logger.info("Initializing pyannote speaker diarization pipeline")
        try:
            # Import pyannote.audio here to avoid issues if not installed
            from pyannote.audio import Pipeline
            
            # Load the speaker diarization model from Hugging Face
            self.pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=self.hf_token,
            )

            # Use CUDA if available
            if torch.cuda.is_available() and self.pipeline is not None:
                logger.info("Using CUDA for speaker diarization")
                self.pipeline = self.pipeline.to(torch.device("cuda"))

            self._initialized = True
            logger.info("Speaker diarization pipeline initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize speaker diarization pipeline: {e}")
            raise

    def diarize(
        self,
        audio_path: Union[str, Path],
        num_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
        max_speakers: Optional[int] = None,
    ) -> Dict:
        """
        Perform speaker diarization on an audio file.

        Args:
            audio_path: Path to the audio file
            num_speakers: Exact number of speakers in the audio (if known)
            min_speakers: Minimum number of speakers expected
            max_speakers: Maximum number of speakers expected

        Returns:
            A dictionary of diarization results
        """
        if not self._initialized:
            self.initialize()

        try:
            audio_path = str(audio_path) if isinstance(audio_path, Path) else audio_path

            # Prepare input for pyannote
            file = {"uri": "audio", "audio": audio_path}

            # Set speaker count constraints if provided
            diarization_params = {}
            if num_speakers is not None:
                diarization_params["num_speakers"] = num_speakers
            else:
                if min_speakers is not None:
                    diarization_params["min_speakers"] = min_speakers
                if max_speakers is not None:
                    diarization_params["max_speakers"] = max_speakers

            # Apply diarization
            if self.pipeline is None:
                raise ValueError("Diarization pipeline not initialized properly")
            diarization = self.pipeline(file, **diarization_params)

            # Convert pyannote Annotation to a more usable format
            results = self._process_diarization(diarization)

            return results

        except Exception as e:
            logger.error(f"Speaker diarization failed: {e}")
            raise

    def _process_diarization(self, diarization):
        """
        Process diarization results to a usable format.

        Args:
            diarization: PyAnnote diarization result

        Returns:
            Dictionary with processed diarization segments
        """
        segments = []

        # Process each segment from the diarization result
        for segment, track, speaker in diarization.itertracks(yield_label=True):
            segments.append(
                {
                    "speaker": speaker,
                    "start": segment.start,
                    "end": segment.end,
                    "duration": segment.end - segment.start,
                }
            )

        # Sort segments by start time
        segments.sort(key=lambda s: s["start"])

        return {"segments": segments, "num_speakers": len(diarization.labels())}

    def align_diarization_with_transcription(
        self, diarization_result: Dict, transcription_segments: List[Dict]
    ) -> List[Dict]:
        """
        Align speaker diarization results with whisper transcription segments.

        Args:
            diarization_result: Output from diarize() method
            transcription_segments: List of segments from whisper transcription

        Returns:
            List of transcription segments with speaker labels
        """
        if not diarization_result or not transcription_segments:
            return transcription_segments

        diarization_segments = diarization_result["segments"]

        # Create a function to find the best speaker for a given time range
        def get_speaker_for_segment(start: float, end: float) -> str:
            # Find overlapping diarization segments
            overlaps = []
            for segment in diarization_segments:
                overlap_start = max(segment["start"], start)
                overlap_end = min(segment["end"], end)

                if overlap_end > overlap_start:  # There is an overlap
                    overlap_duration = overlap_end - overlap_start
                    overlaps.append((segment["speaker"], overlap_duration))

            if not overlaps:
                return "UNKNOWN"

            # Return the speaker with the most overlap
            overlaps.sort(key=lambda x: x[1], reverse=True)
            return overlaps[0][0]

        # Assign speakers to transcription segments
        for segment in transcription_segments:
            start = segment.get("start", segment.get("t0"))
            end = segment.get("end", segment.get("t1"))
            if start is not None and end is not None:
                segment["speaker"] = get_speaker_for_segment(start, end)

        return transcription_segments

### 4. Transcription Service Implementation

Now we'll implement the transcription service that integrates whisper.cpp with our speaker diarization.
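
The service converts every input to 16 kHz, mono, 16-bit PCM WAV before calling whisper.cpp. For files that may already be in that format, a quick check with the standard-library `wave` module can tell whether the ffmpeg step is needed at all. This is a minimal sketch; the helper name `is_whisper_ready` is hypothetical and the service itself does not use it.

```python
import wave


def is_whisper_ready(wav_path: str) -> bool:
    """Return True if the file is a 16 kHz, mono, 16-bit PCM WAV."""
    try:
        with wave.open(wav_path, "rb") as wav:
            return (
                wav.getframerate() == 16000
                and wav.getnchannels() == 1
                and wav.getsampwidth() == 2  # bytes per sample, i.e. 16 bits
            )
    except (wave.Error, EOFError, OSError):
        # Not a readable PCM WAV file (or the file does not exist)
        return False
```

Any file for which this returns `False`, including MP3 or FLAC input, still goes through the conversion step.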

In [None]:
import json
import re
import ffmpeg

class TranscriptionService:
    def __init__(self, model_path, whisper_bin=WHISPER_BIN_PATH, temp_dir=None, hf_token=None):
        """
        Initialize the transcription service

        Args:
            model_path: Path to the whisper model file
            whisper_bin: Path to the whisper binary
            temp_dir: Directory to store temporary files
            hf_token: Hugging Face API token for accessing pyannote models
        """
        self.model_path = Path(model_path)
        self.whisper_bin = whisper_bin
        self.temp_dir = Path(temp_dir) if temp_dir is not None else TEMP_UPLOAD_DIR
        self.hf_token = hf_token

        # Create the diarization service; the pyannote pipeline itself is loaded lazily on first use
        self.diarization_service = None
        try:
            self.diarization_service = SpeakerDiarizationService(hf_token=hf_token)
            logger.info("Created pyannote speaker diarization service")
        except Exception as e:
            logger.error(f"Failed to create speaker diarization service: {e}")

        if not self.temp_dir.exists():
            self.temp_dir.mkdir(parents=True)

        if not self.model_path.exists():
            raise FileNotFoundError(f"Model file not found at {model_path}")

    def convert_audio_to_wav(self, audio_path):
        """Convert audio file to 16-bit WAV format required by whisper.cpp"""
        output_path = self.temp_dir / f"{Path(audio_path).stem}_converted.wav"

        try:
            ffmpeg.input(audio_path).output(
                str(output_path),
                acodec="pcm_s16le",  # 16-bit PCM
                ar=16000,  # 16 kHz
                ac=1,  # mono
            ).run(quiet=True, overwrite_output=True)

            logger.info(f"Converted {audio_path} to {output_path}")
            return output_path
        except ffmpeg.Error as e:
            logger.error(f"Error converting audio: {e.stderr.decode() if e.stderr else str(e)}")
            raise

    def transcribe(
        self,
        audio_path,
        enable_diarization=False,
        num_speakers=None,
        min_speakers=None,
        max_speakers=None,
        language="auto",
    ):
        """
        Transcribe audio file using whisper.cpp

        Args:
            audio_path: Path to the audio file
            enable_diarization: Whether to enable speaker diarization
            num_speakers: Exact number of speakers (optional)
            min_speakers: Minimum number of speakers (optional)
            max_speakers: Maximum number of speakers (optional)
            language: Language code or "auto" for detection

        Returns:
            A dictionary with the transcription results
        """
        wav_path = None
        try:
            wav_path = self.convert_audio_to_wav(audio_path)

            # Prepare command
            cmd = [
                str(self.whisper_bin),
                "-m",
                str(self.model_path),
                "-f",
                str(wav_path),
                "-oj",  # Output JSON
                "-l",
                language,  # Use specified language or auto-detect
            ]

            cmd_str = " ".join(cmd)
            logger.info(f"Running command: {cmd_str}")

            try:
                result = subprocess.run(cmd, check=False, capture_output=True, text=True)
            except FileNotFoundError:
                logger.error(f"Error: whisper binary not found at '{self.whisper_bin}'")
                return {"error": "whisper binary not found. Please ensure whisper.cpp is built properly."}

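            # Get the output
            output = result.stdout.strip()
            logger.debug(f"Raw output: {output}")

            if result.returncode != 0:
                logger.error(f"whisper.cpp exited with code {result.returncode}")
                return {"error": "Transcription failed: " + result.stderr}

            # With -oj, whisper.cpp writes its JSON to "<input>.json" instead of stdout
            json_path = Path(f"{wav_path}.json")
            if not output and not json_path.exists():
                logger.error("No output from transcription command")
                return {"error": "No output from transcription command: " + result.stderr}

            try:
                if json_path.exists():
                    transcription_result = json.loads(
                        json_path.read_text(encoding="utf-8", errors="replace")
                    )
                    json_path.unlink()
                    # Normalize whisper.cpp's "transcription" entries (offsets in ms) into
                    # the "segments"/"text" layout used below, with times in seconds
                    if "segments" not in transcription_result:
                        entries = transcription_result.get("transcription", [])
                        transcription_result["segments"] = [
                            {
                                "start": entry["offsets"]["from"] / 1000.0,
                                "end": entry["offsets"]["to"] / 1000.0,
                                "text": entry.get("text", "").strip(),
                            }
                            for entry in entries
                            if "offsets" in entry
                        ]
                        transcription_result.setdefault(
                            "text", " ".join(s["text"] for s in transcription_result["segments"])
                        )
                else:
                    # Fall back to stdout in case the build prints JSON there
                    transcription_result = json.loads(output)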
                
                # Apply pyannote diarization if requested and available
                if enable_diarization and self.diarization_service is not None:
                    logger.info("Applying pyannote speaker diarization")
                    try:
                        diarization_result = self.diarization_service.diarize(
                            audio_path=wav_path,
                            num_speakers=num_speakers,
                            min_speakers=min_speakers,
                            max_speakers=max_speakers,
                        )

                        # Align diarization with transcription segments
                        if "segments" in transcription_result:
                            # Convert segment timestamps to seconds if needed
                            for segment in transcription_result["segments"]:
                                if "start" not in segment and "t0" in segment:
                                    segment["start"] = segment["t0"]
                                if "end" not in segment and "t1" in segment:
                                    segment["end"] = segment["t1"]

                            # Add speaker labels to segments
                            transcription_result[
                                "segments"
                            ] = self.diarization_service.align_diarization_with_transcription(
                                diarization_result=diarization_result,
                                transcription_segments=transcription_result["segments"],
                            )

                            # Add diarization metadata
                            transcription_result["diarization"] = {
                                "num_speakers": diarization_result["num_speakers"],
                                "method": "pyannote",
                            }

                            # Format the full text with speaker labels for better readability
                            if "text" in transcription_result:
                                speaker_texts = []
                                for segment in transcription_result["segments"]:
                                    if "speaker" in segment and "text" in segment:
                                        speaker_texts.append(
                                            f"{segment['speaker']}: {segment['text']}"
                                        )

                                # Update the full text to include speaker information
                                transcription_result["text_with_speakers"] = "\n".join(speaker_texts)
                    except Exception as e:
                        logger.error(f"Error during pyannote diarization: {e}")

                return transcription_result
            except json.JSONDecodeError:
                logger.info("Output is not JSON format, parsing as text")
                
                # Simple fallback for non-JSON output
                return {"text": output, "error": "Failed to parse JSON output"}

        except subprocess.CalledProcessError as e:
            error_message = e.stderr if e.stderr else str(e)
            logger.error(f"Transcription failed: {error_message}")
            return {"error": f"Transcription failed: {error_message}"}
        finally:
            # Clean up temporary WAV file
            if wav_path and os.path.exists(wav_path):
                os.unlink(wav_path)

### 5. Process Audio Files from the Dataset

Now we'll process a sample audio file from the dataset, applying both transcription and speaker diarization.
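
whisper.cpp tends to emit many short segments, so the speaker-labelled output can be easier to read once consecutive segments from the same speaker are merged into turns. The sketch below assumes each segment is a dict with `start` and `end` in seconds plus optional `speaker` and `text` keys. The helper name `merge_speaker_turns` and the `max_gap` threshold are illustrative choices, not part of the original project.

```python
from typing import Dict, List


def merge_speaker_turns(segments: List[Dict], max_gap: float = 1.0) -> List[Dict]:
    """Merge consecutive segments from the same speaker into longer turns.

    Two segments are merged when they share a speaker and the silence
    between them is at most `max_gap` seconds.
    """
    turns: List[Dict] = []
    for seg in sorted(segments, key=lambda s: s["start"]):
        speaker = seg.get("speaker", "UNKNOWN")
        text = seg.get("text", "").strip()
        last = turns[-1] if turns else None
        if (
            last is not None
            and last["speaker"] == speaker
            and seg["start"] - last["end"] <= max_gap
        ):
            # Extend the current turn instead of starting a new one
            last["end"] = max(last["end"], seg["end"])
            last["text"] = f"{last['text']} {text}".strip()
        else:
            turns.append(
                {"speaker": speaker, "start": seg["start"], "end": seg["end"], "text": text}
            )
    return turns
```

The function builds new dicts, so the segments returned by the transcription service are left unchanged.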

In [None]:
# Initialize transcription service with the small model
if 'model_path' in locals() and model_path and 'audio_files' in locals() and audio_files:
    # Set HF_TOKEN for pyannote.audio (if you have one)
    if not HF_TOKEN:
        print("WARNING: No HF_TOKEN provided. Speaker diarization may not work properly.")
        print("Consider providing a Hugging Face token for full functionality.")
    
    transcription_service = TranscriptionService(
        model_path=model_path,
        whisper_bin=WHISPER_BIN_PATH,
        hf_token=HF_TOKEN,
    )
    
    # Process a sample audio file from the dataset
    sample_file = audio_files[0]  # Use the first audio file
    print(f"Processing audio file: {os.path.basename(sample_file)}")
    
    try:
        # Transcribe with speaker diarization
        result = transcription_service.transcribe(
            audio_path=sample_file,
            enable_diarization=True,
            num_speakers=None,  # Let pyannote automatically detect number of speakers
            language="auto",    # Auto-detect language
        )
        
        # Display results
        print("\n=== Transcription Results ===")
        
        if "error" in result:
            print(f"Error: {result['error']}")
        else:
            print("\nFull transcription:")
            print(result.get("text", "No text found"))
            
            print("\nTranscription with speaker labels:")
            if "text_with_speakers" in result:
                print(result["text_with_speakers"])
            else:
                print("No speaker-labeled text available")
            
            print("\nDetailed segments:")
            for i, segment in enumerate(result.get("segments", [])[:10]):  # Show first 10 segments
                start = segment.get("start", segment.get("t0", 0))
                end = segment.get("end", segment.get("t1", 0))
                speaker = segment.get("speaker", "Unknown")
                print(f"{i+1}. [{start:.2f}s - {end:.2f}s] Speaker {speaker}: {segment.get('text', '')}")
            
            if len(result.get("segments", [])) > 10:
                print(f"... and {len(result.get('segments', [])) - 10} more segments")
            
            if "diarization" in result:
                print(f"\nDetected {result['diarization'].get('num_speakers', 0)} speakers")
    
    except Exception as e:
        print(f"Error processing audio file: {e}")
else:
    print("Cannot process audio files: Either no model or no audio files available")

### 6. Visualize Diarization Results

Let's create a visualization of the diarization results to better understand the speaker segmentation.
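
Alongside the plot, a per-speaker total of speaking time gives a quick numeric summary of the same segments. The sketch below assumes segments carry `start` and `end` in seconds and an optional `speaker` label; the helper name `speaking_time` is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def speaking_time(segments: List[Dict]) -> Dict[str, float]:
    """Sum segment durations (in seconds) for each speaker label."""
    totals = defaultdict(float)
    for seg in segments:
        if "start" in seg and "end" in seg:
            speaker = seg.get("speaker", "UNKNOWN")
            # Guard against malformed segments where end precedes start
            totals[speaker] += max(0.0, seg["end"] - seg["start"])
    return dict(totals)
```

Calling `speaking_time(result["segments"])` after the processing cell would give one entry per detected speaker, assuming diarization succeeded.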

In [None]:
# Visualize the diarization results
if 'result' in locals() and 'segments' in result and "error" not in result:
    try:
        import matplotlib.pyplot as plt
        import numpy as np
        from matplotlib.patches import Rectangle
        
        # Extract speakers and segments
        speakers = set()
        for segment in result['segments']:
            if 'speaker' in segment:
                speakers.add(segment['speaker'])
        
        # Convert speaker labels to numeric IDs for plotting
        speaker_ids = {speaker: i for i, speaker in enumerate(sorted(speakers))}
        
        # Create plot
        fig, ax = plt.subplots(figsize=(15, 5))
        
        # Plot each segment
        for segment in result['segments']:
            if 'speaker' in segment and 'start' in segment and 'end' in segment:
                speaker = segment['speaker']
                start = segment['start']
                end = segment['end']
                duration = end - start
                speaker_id = speaker_ids[speaker]
                
                # Plot rectangle for this segment
                rect = Rectangle((start, speaker_id - 0.4), duration, 0.8, 
                                alpha=0.6, color=f'C{speaker_id % 10}')
                ax.add_patch(rect)
                
                # Add text
                if duration > 1.0:  # Only add text for segments long enough to be readable
                    # Use .get so segments without text do not raise a KeyError
                    label = segment.get('text', '').strip()
                    if len(label) > 20:
                        label = label[:20] + '...'
                    ax.text(start + duration/2, speaker_id, label,
                           ha='center', va='center', fontsize=8)
        
        # Set labels and title
        ax.set_yticks(list(speaker_ids.values()))
        ax.set_yticklabels(list(speaker_ids.keys()))
        ax.set_xlabel('Time (seconds)')
        ax.set_ylabel('Speaker')
        ax.set_title('Speaker Diarization Results')
        
        # Set x-axis limit to match the audio duration
        max_time = max([segment.get('end', segment.get('t1', 0)) for segment in result['segments']])
        ax.set_xlim(0, max_time)
        
        # Set y-axis limit
        ax.set_ylim(-1, len(speakers))
        
        plt.tight_layout()
        plt.show()
        
        # Create a text visualization too
        print("\n=== Timeline Visualization ===")
        print("Time (s) | Speaker | Text")
        print("-" * 80)
        
        for segment in sorted(result['segments'], key=lambda s: s.get('start', s.get('t0', 0))):
            start = segment.get('start', segment.get('t0', 0))
            end = segment.get('end', segment.get('t1', 0))
            speaker = segment.get('speaker', 'Unknown')
            text = segment.get('text', '')
            print(f"{start:7.2f} - {end:5.2f} | {speaker:7} | {text}")
            
    except Exception as e:
        print(f"Error creating visualization: {e}")

## Conclusion

This notebook demonstrates how to use whisper.cpp for offline transcription combined with pyannote.audio for speaker diarization. By downloading audio samples from a Kaggle dataset, we can test the full pipeline without needing to upload our own files.

Key features implemented:

1. **whisper.cpp integration**: Fast, offline speech recognition
2. **Speaker diarization**: Speaker segmentation with pyannote.audio's pretrained `speaker-diarization-3.1` pipeline
3. **Kaggle dataset integration**: Easy access to test audio files
4. **Visualization**: Both text and graphical representation of speaker turns

To adapt this for your own use:

- Try different whisper.cpp models (for example tiny, base, small, medium, or large-v3) based on your accuracy and speed needs
- Experiment with speaker count constraints if you know how many speakers to expect
- Use your own audio files by replacing the sample file paths
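
As a small sketch of the speaker-count point above, the hypothetical helper below builds the keyword arguments for `diarize()` and rejects inconsistent values before the pipeline runs. It follows the same precedence as the service in this notebook, where an exact `num_speakers` overrides the min/max bounds; the validation rules are an added assumption.

```python
from typing import Dict, Optional


def speaker_constraints(
    num_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
    max_speakers: Optional[int] = None,
) -> Dict[str, int]:
    """Build speaker-count keyword arguments, validating them first."""
    for name, value in (
        ("num_speakers", num_speakers),
        ("min_speakers", min_speakers),
        ("max_speakers", max_speakers),
    ):
        if value is not None and value < 1:
            raise ValueError(f"{name} must be at least 1, got {value}")

    # An exact count takes precedence over a range
    if num_speakers is not None:
        return {"num_speakers": num_speakers}

    if min_speakers is not None and max_speakers is not None and min_speakers > max_speakers:
        raise ValueError("min_speakers cannot exceed max_speakers")

    params: Dict[str, int] = {}
    if min_speakers is not None:
        params["min_speakers"] = min_speakers
    if max_speakers is not None:
        params["max_speakers"] = max_speakers
    return params
```

For example, `transcription_service.transcribe(audio_path=sample_file, enable_diarization=True, **speaker_constraints(min_speakers=2, max_speakers=4))` would constrain the search to between two and four speakers.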