## Code that works

In [1]:
import warnings

# Suppress specific warnings by category
warnings.filterwarnings("ignore")

# speaker_diarization_transcription.py

import os
from dotenv import load_dotenv
import torch
from pyannote.audio import Pipeline
from pydub import AudioSegment
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from collections import defaultdict

class SpeakerDiarizationTranscription:
    def __init__(self):
        # Load environment variables from .env file
        load_dotenv()

        # Access the Hugging Face token
        self.huggingface_token = os.getenv("HUGGINGFACE_TOKEN")

        # Check if the token was loaded correctly
        if self.huggingface_token:
            print("Hugging Face token loaded successfully.")
        else:
            raise ValueError("Failed to load Hugging Face token. Check your .env file.")

        # Initialize the models
        self.pipeline = self.load_pyannote_model()
        self.pipe = self.load_whisper_model()

    def load_pyannote_model(self):
        # Initialize the pyannote pipeline
        pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1", use_auth_token=self.huggingface_token)

        # Check if CUDA is available and move the pipeline to GPU if it is
        if torch.cuda.is_available():
            pipeline.to(torch.device("cuda"))
            print("Pyannote pipeline moved to GPU.")
        else:
            print("CUDA is not available. The pyannote pipeline will run on the CPU.")

        return pipeline

    def convert_to_wav(self, audio_file):
        # Check if the file is mp3 and convert to wav if necessary
        if audio_file.lower().endswith('.mp3'):
            audio = AudioSegment.from_file(audio_file)
            wav_file = audio_file.rsplit('.', 1)[0] + '.wav'
            audio.export(wav_file, format="wav")
            return wav_file
        else:
            return audio_file
    
    def perform_diarization(self, wav_file, num_speakers=None):
        # Initialize an empty list to store the diarization entries
        diarization_list = []

        # Apply the pipeline to the wav file
        diarization = self.pipeline(wav_file, num_speakers=num_speakers)

        # Iterate over the diarization results and build the list of dictionaries
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            # Create a dictionary for each diarization segment
            diarization_entry = {
                'start': turn.start,
                'stop': turn.end,
                'speaker': speaker
            }
            # Add the dictionary to the list
            diarization_list.append(diarization_entry)

        return diarization_list

    def load_whisper_model(self):
        # Set device and data types
        device = "cuda" if torch.cuda.is_available() else "cpu"
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

        # Specify the model ID
        model_id = "openai/whisper-large-v2"  # Updated to a valid model ID

        # Load the model
        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True
        )
        model.to(device)

        # Load the processor
        processor = AutoProcessor.from_pretrained(model_id)

        # Create the pipeline with additional parameters
        pipe = pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            device=device,
            return_timestamps="word",  # Ensure word-level timestamps are returned
            chunk_length_s=30,         # Adjust for long audio files
            stride_length_s=(5, 5),    # Overlapping chunks to improve accuracy
        )

        print("Whisper model loaded successfully.")

        return pipe

    def transcribe_audio(self, wav_file, language):
        # Transcribe the audio file and get detailed results
        result = self.pipe(wav_file, return_timestamps=True, generate_kwargs={"language": language})

        transcription = []

        # Check if the result contains 'chunks' directly
        if 'chunks' in result:
            # Process the chunks directly
            transcription = result['chunks']
        elif 'segments' in result:
            # The 'segments' key exists, process each segment
            for segment in result['segments']:
                segment_start = segment['start']
                # Each segment may have multiple chunks
                for chunk in segment['chunks']:
                    # Adjust the chunk's timestamps to be absolute
                    t_start, t_end = chunk['timestamp']
                    chunk['timestamp'] = (t_start + segment_start, t_end + segment_start)
                    transcription.append(chunk)
        else:
            # Fallback to processing the entire result
            transcription = [{'timestamp': (result['start'], result['end']), 'text': result['text']}]

        return transcription

    def assign_speakers_to_transcription(self, diarization, transcription):
        # List to store results with assigned speakers
        assigned_transcription = []
        
        # Loop over each transcription segment to find the primary speaker
        for segment in transcription:
            segment_start, segment_end = segment['timestamp']
            segment_duration = segment_end - segment_start

            # Track overlapping time for each speaker in the current segment
            speaker_overlap = {}

            # Loop over each diarization entry to calculate overlap with the transcription segment
            for entry in diarization:
                entry_start, entry_end = entry['start'], entry['stop']
                
                # Calculate the overlap duration
                overlap_start = max(segment_start, entry_start)
                overlap_end = min(segment_end, entry_end)
                overlap_duration = max(0, overlap_end - overlap_start)

                # If there is overlap, accumulate it by speaker
                if overlap_duration > 0:
                    speaker = entry['speaker']
                    if speaker not in speaker_overlap:
                        speaker_overlap[speaker] = 0
                    speaker_overlap[speaker] += overlap_duration

            # Determine the primary speaker based on the highest overlap percentage
            if speaker_overlap:
                main_speaker = max(speaker_overlap, key=speaker_overlap.get)
                max_percentage = (speaker_overlap[main_speaker] / segment_duration) * 100
            else:
                main_speaker = None  # No clear speaker found
                max_percentage = 0

            # Append the transcription segment with identified speaker
            assigned_transcription.append({
                'timestamp': segment['timestamp'],
                'text': segment['text'],
                'main_speaker': main_speaker,
                'main_speaker_percentage': max_percentage
            })

        return assigned_transcription


    def get_diarization_and_transcription(self, audio_file, num_speakers, language):
        # Convert to wav if necessary
        wav_file = self.convert_to_wav(audio_file)

        # Perform diarization
        print("Starting speaker diarization...")
        diarization_list = self.perform_diarization(wav_file, num_speakers)
        print("Speaker diarization completed.")

        # Transcribe audio
        print("Starting transcription...")
        transcription = self.transcribe_audio(wav_file, language)
        print("Transcription completed.")

        # Clean up the temporary wav file if it was created
        if wav_file != audio_file:
            os.remove(wav_file)

        return diarization_list, transcription

    def process_audio(self, audio_file, num_speakers, language):
        # Convert to wav if necessary
        wav_file = self.convert_to_wav(audio_file)

        # Perform diarization
        print("Starting speaker diarization...")
        diarization_list = self.perform_diarization(wav_file, num_speakers)
        print("Speaker diarization completed.")

        # Transcribe audio
        print("Starting transcription...")
        transcription = self.transcribe_audio(wav_file, language)
        print("Transcription completed.")

        # Assign speakers to transcription
        print("Assigning speakers to transcription...")
        combined_result = self.assign_speakers_to_transcription(diarization_list, transcription)
        print("Speaker assignment completed.")

        # Clean up the temporary wav file if it was created
        if wav_file != audio_file:
            os.remove(wav_file)

        return combined_result

In [2]:
import sys
import logging
from contextlib import redirect_stdout

logging.getLogger("speechbrain").setLevel(logging.WARNING)

sdt = SpeakerDiarizationTranscription()

Hugging Face token loaded successfully.


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.4.0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../../../../../../../home/luis/.cache/torch/pyannote/models--pyannote--segmentation/snapshots/c4c8ceafcbb3a7a280c2d357aee9fbc9b0be7f9b/pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.5.1+cu124. Bad things might happen unless you revert torch to 1.x.
Pyannote pipeline moved to GPU.
Whisper model loaded successfully.


In [3]:
# Select filename
filename = "conversacion_lidix"

# Define number of speakers
num_speakers = 2

# Set lanaguage
language = "spanish"

In [4]:
# Path to your audio file
audio_file = f"/mnt/c/Users/luisg/Desktop/STAR/STAR/audio_samples/{filename}.mp3"  # Replace with your audio file path

# Process the audio file
combined_result = sdt.process_audio(audio_file, num_speakers, language)

# Print the results
for chunk in combined_result:
    print(f"Speaker {chunk['main_speaker']}: {chunk['text']}")

Starting speaker diarization...
Speaker diarization completed.
Starting transcription...


You have passed language=spanish, but also have set `forced_decoder_ids` to [[1, None], [2, 50359]] which creates a conflict. `forced_decoder_ids` will be ignored in favor of language=spanish.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Transcription completed.
Assigning speakers to transcription...
Speaker assignment completed.
Speaker SPEAKER_01:  Venga. Vale, se plantea la cuestión de por qué hay ciertas carreras, sobre todo las
Speaker SPEAKER_01:  tecnológicas, las STEM, ahora han cobrado más importancia, dejando a otras en declive
Speaker SPEAKER_01:  en estos últimos años. Vale. No lo tienes por qué hacer bien, es una conversación. Venga, cuéntamelo.
Speaker SPEAKER_01:  Vale, bueno, hemos estructurado las carreras en cuatro grupos, o sea, tenemos cuatro grupos
Speaker SPEAKER_01:  claramente…
Speaker SPEAKER_00:  De estudiantes, no de carreras.
Speaker SPEAKER_01:  Estudiantes o trabajadores. En todas las organizaciones, vamos a decir, tanto universidades como empresas,
Speaker SPEAKER_01:  diferenciamos cuatro grupos que son los que vienen diferenciados principalmente por la motivación
Speaker SPEAKER_01:  y por los conocimientos o competencias que tiene cada uno.
Speaker SPEAKER_01:  Tenemos los primeros que

In [5]:
# Initialize an empty list to store the merged result
merged_result = []

# Keep track of the last speaker to detect changes
last_speaker = None
current_text = ""
current_timestamp = None

for chunk in combined_result:
    speaker = chunk['main_speaker']
    text = chunk['text']
    timestamp = chunk['timestamp']

    # If this speaker is the same as the last one, concatenate the text
    if speaker == last_speaker:
        current_text += " " + text  # Add space between concatenated text segments
    else:
        # If a new speaker starts, save the previous speaker's concatenated text
        if last_speaker is not None:
            merged_result.append({
                'speaker': last_speaker,
                'text': current_text,
                'timestamp': current_timestamp
            })
        
        # Start a new text segment for the current speaker
        last_speaker = speaker
        current_text = text
        current_timestamp = timestamp  # Capture timestamp of the first chunk for this speaker

# Append the final speaker's text
if last_speaker is not None:
    merged_result.append({
        'speaker': last_speaker,
        'text': current_text,
        'timestamp': current_timestamp
    })

# Display the merged result
for entry in merged_result:
    print(f"{entry['speaker']}: {entry['text']}")

SPEAKER_01:  Venga. Vale, se plantea la cuestión de por qué hay ciertas carreras, sobre todo las  tecnológicas, las STEM, ahora han cobrado más importancia, dejando a otras en declive  en estos últimos años. Vale. No lo tienes por qué hacer bien, es una conversación. Venga, cuéntamelo.  Vale, bueno, hemos estructurado las carreras en cuatro grupos, o sea, tenemos cuatro grupos  claramente…
SPEAKER_00:  De estudiantes, no de carreras.
SPEAKER_01:  Estudiantes o trabajadores. En todas las organizaciones, vamos a decir, tanto universidades como empresas,  diferenciamos cuatro grupos que son los que vienen diferenciados principalmente por la motivación  y por los conocimientos o competencias que tiene cada uno.  Tenemos los primeros que son los que dicen o piensan, bueno, los que tienen las competencias  y les gusta lo que hacen, es decir, se me da bien y me gusta.
SPEAKER_00:  O sea, tenemos un grupo que sería el empleo perfecto, los que se te hagan bien y además les  gusta estar motivado

In [13]:
for entry in merged_result:
    # Format the timestamp into start and end time with a clearer structure
    start_time, end_time = entry['timestamp']
    start_time_formatted = f"{int(start_time // 60)}:{int(start_time % 60):02}.{int((start_time % 1) * 100):02}"
    end_time_formatted = f"{int(end_time // 60)}:{int(end_time % 60):02}.{int((end_time % 1) * 100):02}"
    
    # Displaying the speaker, timestamp, and text in a more readable way
    print(f"{entry['speaker']} ({start_time_formatted} - {end_time_formatted}): {entry['text']}")

LIDIA (0:00.00 - 0:08.43):  Venga. Vale, se plantea la cuestión de por qué hay ciertas carreras, sobre todo las  tecnológicas, las STEM, ahora han cobrado más importancia, dejando a otras en declive  en estos últimos años. Vale. No lo tienes por qué hacer bien, es una conversación. Venga, cuéntamelo.  Vale, bueno, hemos estructurado las carreras en cuatro grupos, o sea, tenemos cuatro grupos  claramente…
LUIS (0:37.11 - 0:39.11):  De estudiantes, no de carreras.
LIDIA (0:39.11 - 0:46.00):  Estudiantes o trabajadores. En todas las organizaciones, vamos a decir, tanto universidades como empresas,  diferenciamos cuatro grupos que son los que vienen diferenciados principalmente por la motivación  y por los conocimientos o competencias que tiene cada uno.  Tenemos los primeros que son los que dicen o piensan, bueno, los que tienen las competencias  y les gusta lo que hacen, es decir, se me da bien y me gusta.
LUIS (1:06.51 - 1:12.15):  O sea, tenemos un grupo que sería el empleo perfecto, l

## Choose speakers

In [6]:
# Select the speaker names (if unknow, speaker_names = None)
speaker_names = ["LUIS", "LIDIA"]

In [7]:
def replace_speaker_names(merged_result, speaker_names=None):

    if speaker_names == None:
        return merged_result
    # Create a dictionary to map SPEAKER_XX to provided names
    speaker_map = {f"SPEAKER_{str(i).zfill(2)}": name for i, name in enumerate(speaker_names)}
    
    # Replace speaker names in merged_result
    for entry in merged_result:
        if entry['speaker'] in speaker_map:
            entry['speaker'] = speaker_map[entry['speaker']]
    
    return merged_result

# Example usage:
# Assuming merged_result is the result from the code you shared

# Call the function
updated_result = replace_speaker_names(merged_result, speaker_names)

# Display the updated merged result
for entry in updated_result:
    print(f"{entry['speaker']}: {entry['text']}")

LIDIA:  Venga. Vale, se plantea la cuestión de por qué hay ciertas carreras, sobre todo las  tecnológicas, las STEM, ahora han cobrado más importancia, dejando a otras en declive  en estos últimos años. Vale. No lo tienes por qué hacer bien, es una conversación. Venga, cuéntamelo.  Vale, bueno, hemos estructurado las carreras en cuatro grupos, o sea, tenemos cuatro grupos  claramente…
LUIS:  De estudiantes, no de carreras.
LIDIA:  Estudiantes o trabajadores. En todas las organizaciones, vamos a decir, tanto universidades como empresas,  diferenciamos cuatro grupos que son los que vienen diferenciados principalmente por la motivación  y por los conocimientos o competencias que tiene cada uno.  Tenemos los primeros que son los que dicen o piensan, bueno, los que tienen las competencias  y les gusta lo que hacen, es decir, se me da bien y me gusta.
LUIS:  O sea, tenemos un grupo que sería el empleo perfecto, los que se te hagan bien y además les  gusta estar motivados con el tema, ¿vale? 

In [8]:
# Specify the file path
file_path = f"/mnt/c/Users/luisg/Desktop/STAR/STAR/transcriptions_diarized/{filename}.txt"

# Write the merged result to the file
with open(file_path, "w", encoding="utf-8") as file:
    for entry in merged_result:
        file.write(f"{entry['speaker']}: {entry['text']}\n")