# 0.0.0 WhisperX/Pyannote Transcription+Diarization Pipeline 

This Jupyter notebook is designed to test and evaluate a new Transcription and Diarization Pipeline with the following objectives:
1. Achieving word-level transcription accuracy to ensure detailed and precise text representation of the audio input.
2. Assessing diarization confidence levels to accurately attribute spoken segments to different speakers and measure the reliability of speaker identification.
3. Enhancing the alignment of transcriptions to be closer to natural sentence segments, thereby improving the readability and usability of the transcribed data.

The notebook uses the transcription and diarization capabilities of the Whisper, WhisperX, and pyannote libraries. With GPU acceleration, it transcribes each audio file, aligns the words, and assigns speakers, then saves the structured output as CSV, TXT, JSON, and VTT files for further analysis. Resources and installation instructions are included below to help with setup and execution of the pipeline.
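
The pipeline in this notebook does not itself write out a diarization confidence value, so objective 2 needs some measure built on top of the saved segments. One rough proxy, which is an assumption of this sketch and not something the notebook computes, is the share of words in a segment whose speaker label agrees with the segment's own label. The function name `speaker_agreement` and the example data are hypothetical; only the `speaker` and `words` keys mirror the segment structure saved by the pipeline.

```python
def speaker_agreement(segment):
    """Share of a segment's speaker-labelled words that match the segment's speaker label."""
    labelled = [w["speaker"] for w in segment.get("words", []) if "speaker" in w]
    if not labelled or "speaker" not in segment:
        return None  # nothing to compare
    matches = sum(1 for speaker in labelled if speaker == segment["speaker"])
    return matches / len(labelled)


# Hand-written segment shaped like the pipeline's JSON output (values are made up)
example_segment = {
    "text": "Hello there how are you",
    "speaker": "SPEAKER_00",
    "words": [
        {"word": "Hello", "speaker": "SPEAKER_00"},
        {"word": "there", "speaker": "SPEAKER_00"},
        {"word": "how", "speaker": "SPEAKER_01"},
        {"word": "are", "speaker": "SPEAKER_00"},
        {"word": "you"},  # a word may carry no speaker label
    ],
}
print(speaker_agreement(example_segment))  # 0.75
```

A low value flags segments where the speaker probably changed mid-sentence, which are worth checking by hand.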

Resources:
https://towardsdatascience.com/unlock-the-power-of-audio-data-advanced-transcription-and-diarization-with-whisper-whisperx-and-ed9424307281 

# 0.1 Setup
WhisperX documentation found here: https://github.com/m-bain/whisperX

1. Create Python environment
conda create -n whisperx-env python=3.9
conda activate whisperx-env

2. Install PyTorch https://pytorch.org/get-started/locally/ 
conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia

3. Install WhisperX repository
pip install git+https://github.com/m-bain/whisperx.git

4. Additional useful packages to install
pip install charset-normalizer
pip install pandas
pip install nltk
pip install numpy
pip install plotly
pip install matplotlib
pip install jupyter ipywidgets
pip install webvtt-py
pip install pypi-json
pip install srt

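5. Create a .env file in the same folder as this notebook file containing the following line. The configuration cell reads the token with os.getenv, which only sees variables already present in the kernel's environment. Some notebook front ends load .env automatically; otherwise the file has to be loaded (or HF_TOKEN set) before that cell runs.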
HF_TOKEN="REPLACEWITHHUGGINGFACETOKENHERE"
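
If `os.getenv('HF_TOKEN')` returns `None`, the .env file was probably not loaded into the kernel. The sketch below is one way to load it using only the standard library. The helper name `load_env_file` is hypothetical, and it handles only simple `KEY=VALUE` lines such as the one above.

```python
import os
from pathlib import Path


def load_env_file(path=".env"):
    """Copy KEY=VALUE pairs from a .env file into os.environ (existing values win)."""
    env_path = Path(path)
    if not env_path.is_file():
        return {}
    loaded = {}
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments, and anything that is not KEY=VALUE
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        loaded[key] = value
        os.environ.setdefault(key, value)
    return loaded


load_env_file()  # looks for .env in the working directory
hf_token = os.getenv("HF_TOKEN")
print("HF_TOKEN found" if hf_token else "HF_TOKEN missing")
```

Run this before the configuration cell so that `hf_token` is set when the diarization model is created.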

# 0.2 Check once to see if CUDA GPU is available and PyTorch is working properly

In [1]:
# Check if CUDA GPU is available to PyTorch
import torch                                                # PyTorch
print(torch.__version__)
if torch.cuda.is_available():
    torch.cuda.set_device(0)                                # Set the main GPU as the device to use
torch.cuda.is_available(), (torch.cuda.get_device_name() if torch.cuda.is_available() else None)  # GPU availability and name

2.5.1+cu121


(True, 'NVIDIA GeForce RTX 4060 Laptop GPU')

# 2.0 Start here by adjusting variables
1. Choose the batch size, compute type, Whisper model, and the file extensions to transcribe.
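
As a rough guide to how these variables interact, the sketch below picks a device, compute type, and batch size together. The function name `choose_settings` and the batch sizes are illustrative assumptions and not tuned values. The rule it encodes is that float16 generally needs a GPU, while int8 also runs on the CPU and uses less memory at some cost in accuracy.

```python
def choose_settings(cuda_available, low_gpu_memory=False):
    """Return device, compute_type and batch_size for the configuration cell.

    The batch sizes are illustrative starting points, not tuned values.
    """
    if not cuda_available:
        # float16 is generally not supported for CPU inference, so fall back to int8
        return {"device": "cpu", "compute_type": "int8", "batch_size": 4}
    if low_gpu_memory:
        # Smaller batches and int8 weights reduce memory use at some cost in accuracy
        return {"device": "cuda", "compute_type": "int8", "batch_size": 4}
    return {"device": "cuda", "compute_type": "float16", "batch_size": 16}


settings = choose_settings(cuda_available=False)
print(settings)
```

In the notebook, the first argument would be `torch.cuda.is_available()`.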

In [4]:
import os
from tkinter import Tk, filedialog
import pandas as pd
import warnings
import torch
import whisperx
import gc
import datetime
import json
import webvtt

# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)

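# Configuration
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
    torch.cuda.set_device(0)  # Use the first GPU
batch_size = 16  # Reduce if the GPU runs out of memory
compute_type = "float16" if device == "cuda" else "int8"  # float16 needs a GPU; int8 also works on CPU
hf_token = os.getenv('HF_TOKEN')  # Hugging Face token, needed for the pyannote diarization model
whisperx_model = "small.en"
extensions = ['.ogg', '.m4a', '.mp3']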


# 3.0 Run after adjusting variables first

Just run this cell. You shouldn't need to change anything unless you want to output more or fewer file types. The cell mostly defines functions, which are then called at the end of the cell.

1. A popup asks you to choose the folder containing the audio files (subfolders are searched too).

2. A second popup asks where the transcription files should be saved (the folder structure in which the audio files were found is replicated there).

3. A third popup asks whether you want to anonymize the transcripts with a pseudonyms.csv file and, if so, where it is located. The file needs a name column and a pseudonym column.

4. You should then see output similar to the following (the warnings can be ignored):

Model was trained with pyannote.audio 0.0.1, yours is 3.1.1. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.3.0+cu121. Bad things might happen unless you revert torch to 1.x.

5. When processing is complete, a "Processed ..." line is printed for each audio file, followed by "All files processed." The transcripts are in the output folder you selected.
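
The anonymization in step 3 replaces plain substrings, so a short name such as "Al" would also be replaced inside "Also". The sketch below shows the expected two-column layout of pseudonyms.csv and a whole-word alternative. The helper names `load_pseudonyms` and `anonymize_whole_words` and the example rows are hypothetical, and ignoring case is a design choice of this sketch, not the notebook's behavior.

```python
import csv
import io
import re


def load_pseudonyms(csv_file):
    """Read name -> pseudonym pairs from a file object with 'name' and 'pseudonym' columns."""
    return {row["name"]: row["pseudonym"] for row in csv.DictReader(csv_file)}


def anonymize_whole_words(text, pseudonym_dict):
    """Replace each name only where it appears as a whole word, ignoring case."""
    # Longest names first so "Ann Lee" is handled before a shorter name such as "Ann"
    for real_name in sorted(pseudonym_dict, key=len, reverse=True):
        pattern = r"\b" + re.escape(real_name) + r"\b"
        replacement = pseudonym_dict[real_name]
        # A function replacement avoids backslash escapes being interpreted in the pseudonym
        text = re.sub(pattern, lambda _match: replacement, text, flags=re.IGNORECASE)
    return text


# Made-up example rows; a real pseudonyms.csv would have the same two headers
example_csv = io.StringIO("name,pseudonym\nAl,Speaker_A\nAnn Lee,Speaker_B\n")
pseudonyms = load_pseudonyms(example_csv)
print(anonymize_whole_words("Also, Al met Ann Lee.", pseudonyms))
# Also, Speaker_A met Speaker_B.
```

Names that the transcription model spells differently from the CSV will still slip through, so a manual check of the anonymized transcripts remains advisable.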


In [None]:
from tkinter import Tk, filedialog, messagebox

# Functions
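def find_audio_files(base_dir, extensions):
    # Walk base_dir and its subfolders, matching extensions case-insensitively (e.g. .MP3)
    suffixes = tuple(ext.lower() for ext in extensions)
    audio_files = []
    for root, _, files in os.walk(base_dir):
        for file in files:
            if file.lower().endswith(suffixes):
                audio_files.append(os.path.join(root, file))
    return audio_files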

def anonymize_text(text, pseudonym_dict):
    for real_name, pseudonym in pseudonym_dict.items():
        text = text.replace(real_name, pseudonym)
    return text

def save_transcripts(segments, output_dir, relative_path, pseudonym_dict=None):
    if pseudonym_dict:
        for segment in segments:
            segment['text'] = anonymize_text(segment['text'], pseudonym_dict)
            # The word-level entries are written to the CSV and JSON too, so anonymize them as well
            for word in segment.get('words', []):
                word['word'] = anonymize_text(word['word'], pseudonym_dict)
    for i, segment in enumerate(segments):
        segment['sentence_number'] = i + 1
    df = pd.DataFrame(segments)
    df['text'] = df['text'].str.lstrip()
    # Put sentence_number first
    cols = ['sentence_number'] + [col for col in df.columns if col != 'sentence_number']
    df = df[cols]

    os.makedirs(output_dir, exist_ok=True)
    base_filename = os.path.splitext(os.path.basename(relative_path))[0]
    csv_path = os.path.join(output_dir, f"{base_filename}_transcription.csv")
    df.to_csv(csv_path, index=False)

    with open(os.path.join(output_dir, f"{base_filename}_transcription.txt"), 'w', encoding='utf-8') as f:
        for segment in segments:
            f.write(f"{segment['text'].strip()}\n")

    json_path = os.path.join(output_dir, f"{base_filename}_transcription.json")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(segments, f, ensure_ascii=False, indent=4)

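    def to_vtt_time(seconds):
        # WebVTT timestamps need the form HH:MM:SS.mmm; str(timedelta) drops the fraction on whole seconds
        total_ms = int(round(float(seconds) * 1000))
        hours, rest = divmod(total_ms, 3_600_000)
        minutes, rest = divmod(rest, 60_000)
        secs, millis = divmod(rest, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

    vtt = webvtt.WebVTT()
    for segment in segments:
        caption = webvtt.Caption()
        caption.start = to_vtt_time(segment['start'])
        caption.end = to_vtt_time(segment['end'])
        caption.lines = [f"{segment['sentence_number']}: {segment['text'].strip()}"]
        vtt.captions.append(caption)
    vtt.save(os.path.join(output_dir, f"{base_filename}_transcription.vtt"))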

def process_audio_file(audio_file, base_output_dir, relative_path, pseudonym_dict=None):
    try:
        print(f"Processing {audio_file}...")
        audio = whisperx.load_audio(audio_file)
        model = whisperx.load_model(whisperx_model, device, compute_type=compute_type)
        result = model.transcribe(audio, batch_size=batch_size)
        del model
        gc.collect()
        torch.cuda.empty_cache()

        model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
        result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
        del model_a
        gc.collect()
        torch.cuda.empty_cache()

        diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_token, device=device)
        diarize_segments = diarize_model(audio)
        result = whisperx.assign_word_speakers(diarize_segments, result)

        output_dir = os.path.join(base_output_dir, os.path.dirname(relative_path))
        save_transcripts(result["segments"], output_dir, relative_path, pseudonym_dict)
    except Exception as e:
        print(f"Error processing {audio_file}: {e}")

def main():
    # Initialize Tkinter
    root = Tk()
    root.withdraw()  # Hide the main window
    
    # Bring the root window to the front
    root.attributes('-topmost', True)

    # Popup for input folder
    input_folder = filedialog.askdirectory(title="Select Folder Containing Audio/Video Files")
    if not input_folder:
        print("No folder selected. Exiting.")
        return

    # Popup for output folder
    output_folder = filedialog.askdirectory(title="Select Folder to Save Transcriptions")
    if not output_folder:
        print("No output folder selected. Exiting.")
        return

    # Ask if a pseudonyms.csv file will be used
    use_pseudonyms = messagebox.askyesno("Pseudonyms", "Will you use a pseudonyms.csv file to anonymize the transcripts?")
    pseudonym_dict = None

    if use_pseudonyms:
        pseudonyms_file = filedialog.askopenfilename(
            title="Select Pseudonyms CSV File",
            filetypes=[("CSV files", "*.csv")]
        )
        if not pseudonyms_file:
            print("No pseudonyms file selected. Continuing without pseudonymization.")
        else:
            # Load the pseudonyms file
            pseudonyms_df = pd.read_csv(pseudonyms_file)
            pseudonym_dict = dict(zip(pseudonyms_df['name'], pseudonyms_df['pseudonym']))
            print(f"Pseudonyms loaded from {pseudonyms_file}.")

    # Find and process audio files
    audio_files = find_audio_files(input_folder, extensions)
    print(f"Found {len(audio_files)} files to process.")

    for audio_file in audio_files:
        relative_path = os.path.relpath(audio_file, input_folder)
        process_audio_file(audio_file, output_folder, relative_path, pseudonym_dict)
        print(f"Processed {audio_file}")

    print("All files processed.")

if __name__ == "__main__":
    main()


## Complete code

In [3]:
import os
from tkinter import Tk, filedialog, messagebox
import pandas as pd
import warnings
import torch
import whisperx
import gc
import datetime
import json
import webvtt

# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)

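# Configuration
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
    torch.cuda.set_device(0)  # Use the first GPU
batch_size = 16  # Reduce if the GPU runs out of memory
compute_type = "float16" if device == "cuda" else "int8"  # float16 needs a GPU; int8 also works on CPU
hf_token = os.getenv('HF_TOKEN')  # Hugging Face token, needed for the pyannote diarization model
whisperx_model = "small.en"
extensions = ['.ogg', '.m4a', '.mp3']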

# Functions
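def find_audio_files(base_dir, extensions):
    # Walk base_dir and its subfolders, matching extensions case-insensitively (e.g. .MP3)
    suffixes = tuple(ext.lower() for ext in extensions)
    audio_files = []
    for root, _, files in os.walk(base_dir):
        for file in files:
            if file.lower().endswith(suffixes):
                audio_files.append(os.path.join(root, file))
    return audio_files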

def anonymize_text(text, pseudonym_dict):
    for real_name, pseudonym in pseudonym_dict.items():
        text = text.replace(real_name, pseudonym)
    return text

def save_transcripts(segments, output_dir, relative_path, pseudonym_dict=None):
    if pseudonym_dict:
        for segment in segments:
            segment['text'] = anonymize_text(segment['text'], pseudonym_dict)
            # The word-level entries are written to the CSV and JSON too, so anonymize them as well
            for word in segment.get('words', []):
                word['word'] = anonymize_text(word['word'], pseudonym_dict)
    for i, segment in enumerate(segments):
        segment['sentence_number'] = i + 1
    df = pd.DataFrame(segments)
    df['text'] = df['text'].str.lstrip()
    # Put sentence_number first
    cols = ['sentence_number'] + [col for col in df.columns if col != 'sentence_number']
    df = df[cols]

    os.makedirs(output_dir, exist_ok=True)
    base_filename = os.path.splitext(os.path.basename(relative_path))[0]
    csv_path = os.path.join(output_dir, f"{base_filename}_transcription.csv")
    df.to_csv(csv_path, index=False)

    with open(os.path.join(output_dir, f"{base_filename}_transcription.txt"), 'w', encoding='utf-8') as f:
        for segment in segments:
            f.write(f"{segment['text'].strip()}\n")

    json_path = os.path.join(output_dir, f"{base_filename}_transcription.json")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(segments, f, ensure_ascii=False, indent=4)

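    def to_vtt_time(seconds):
        # WebVTT timestamps need the form HH:MM:SS.mmm; str(timedelta) drops the fraction on whole seconds
        total_ms = int(round(float(seconds) * 1000))
        hours, rest = divmod(total_ms, 3_600_000)
        minutes, rest = divmod(rest, 60_000)
        secs, millis = divmod(rest, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

    vtt = webvtt.WebVTT()
    for segment in segments:
        caption = webvtt.Caption()
        caption.start = to_vtt_time(segment['start'])
        caption.end = to_vtt_time(segment['end'])
        caption.lines = [f"{segment['sentence_number']}: {segment['text'].strip()}"]
        vtt.captions.append(caption)
    vtt.save(os.path.join(output_dir, f"{base_filename}_transcription.vtt"))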

def process_audio_file(audio_file, base_output_dir, relative_path, pseudonym_dict=None):
    try:
        print(f"Processing {audio_file}...")
        audio = whisperx.load_audio(audio_file)
        model = whisperx.load_model(whisperx_model, device, compute_type=compute_type)
        result = model.transcribe(audio, batch_size=batch_size)
        del model
        gc.collect()
        torch.cuda.empty_cache()

        model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
        result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
        del model_a
        gc.collect()
        torch.cuda.empty_cache()

        diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_token, device=device)
        diarize_segments = diarize_model(audio)
        result = whisperx.assign_word_speakers(diarize_segments, result)

        output_dir = os.path.join(base_output_dir, os.path.dirname(relative_path))
        save_transcripts(result["segments"], output_dir, relative_path, pseudonym_dict)
    except Exception as e:
        print(f"Error processing {audio_file}: {e}")

def main():
    # Initialize Tkinter
    root = Tk()
    root.withdraw()  # Hide the main window

    # Popup for input folder
    input_folder = filedialog.askdirectory(title="Select Folder Containing Audio/Video Files")
    if not input_folder:
        print("No folder selected. Exiting.")
        return

    # Popup for output folder
    output_folder = filedialog.askdirectory(title="Select Folder to Save Transcriptions")
    if not output_folder:
        print("No output folder selected. Exiting.")
        return

    # Ask if a pseudonyms.csv file will be used
    use_pseudonyms = messagebox.askyesno("Pseudonyms", "Will you use a pseudonyms.csv file?")
    pseudonym_dict = None

    if use_pseudonyms:
        pseudonyms_file = filedialog.askopenfilename(
            title="Select Pseudonyms CSV File",
            filetypes=[("CSV files", "*.csv")]
        )
        if not pseudonyms_file:
            print("No pseudonyms file selected. Continuing without pseudonymization.")
        else:
            # Load the pseudonyms file
            pseudonyms_df = pd.read_csv(pseudonyms_file)
            pseudonym_dict = dict(zip(pseudonyms_df['name'], pseudonyms_df['pseudonym']))
            print(f"Pseudonyms loaded from {pseudonyms_file}.")

    # Find and process audio files
    audio_files = find_audio_files(input_folder, extensions)
    print(f"Found {len(audio_files)} files to process.")

    for audio_file in audio_files:
        relative_path = os.path.relpath(audio_file, input_folder)
        process_audio_file(audio_file, output_folder, relative_path, pseudonym_dict)
        print(f"Processed {audio_file}")

    print("All files processed.")

if __name__ == "__main__":
    main()


Found 2 files to process.
Processing D:/NLPWork/WhisperXTranscription4Researchers/data/rawAudioFiles\monos\Mono1\Monologue1.ogg...


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.2.4. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint c:\Users\mrhal\anaconda3\envs\whisperX-env\lib\site-packages\whisperx\assets\pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.5.1+cu121. Bad things might happen unless you revert torch to 1.x.
Processed D:/NLPWork/WhisperXTranscription4Researchers/data/rawAudioFiles\monos\Mono1\Monologue1.ogg
Processing D:/NLPWork/WhisperXTranscription4Researchers/data/rawAudioFiles\monos\Mono2\Monologue2.ogg...


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.2.4. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint c:\Users\mrhal\anaconda3\envs\whisperX-env\lib\site-packages\whisperx\assets\pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.5.1+cu121. Bad things might happen unless you revert torch to 1.x.
Processed D:/NLPWork/WhisperXTranscription4Researchers/data/rawAudioFiles\monos\Mono2\Monologue2.ogg
All files processed.
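
Once a run like the one above has finished, the saved JSON files can be loaded for further analysis. As a minimal sketch, the function below sums the speaking time per speaker from a list of segments. The function name `talk_time_by_speaker` and the example values are made up; only the `start`, `end`, and `speaker` keys mirror the saved segment structure.

```python
import json
from collections import defaultdict


def talk_time_by_speaker(segments):
    """Sum segment durations (in seconds) per speaker label."""
    totals = defaultdict(float)
    for segment in segments:
        # Segments without a speaker label are grouped under UNKNOWN
        speaker = segment.get("speaker", "UNKNOWN")
        totals[speaker] += segment["end"] - segment["start"]
    return dict(totals)


# Made-up segments in the same shape as a *_transcription.json file
example_json = """[
    {"start": 0.5, "end": 2.5, "text": "Hi.", "speaker": "SPEAKER_00"},
    {"start": 3.0, "end": 4.0, "text": "Hello.", "speaker": "SPEAKER_01"},
    {"start": 4.5, "end": 6.0, "text": "How are you?", "speaker": "SPEAKER_00"}
]"""
print(talk_time_by_speaker(json.loads(example_json)))
```

For a real transcript, replace `json.loads(example_json)` with the result of `json.load` on an opened `*_transcription.json` file.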
