In [2]:
## imports
import math
import os
import pickle
import shutil
import textwrap

import torch
from dotenv import load_dotenv
from openai import OpenAI
from pyannote.audio import Pipeline
from pydub import AudioSegment

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def convert_audio_to_wav(input_file, output_file=None):
    """
    Converts an audio file of any supported format to WAV format using pydub, with error checking.

    :param input_file: Path to the input audio file (e.g., m4a, mp3, wav, ogg).
    :param output_file: Optional, path to save the converted wav file.
                        If not provided, the output file will have the same name as the input but with a .wav extension.
    :return: Path to the converted WAV file or None if conversion fails.
    :raises EnvironmentError: If ffmpeg cannot be found on the system PATH.
    :raises ValueError: If the file extension is not one of the supported formats.
    """
    # Check if ffmpeg is installed and available
    if not shutil.which("ffmpeg"):
        raise EnvironmentError("ffmpeg is not installed or not found in system PATH. Please install ffmpeg.")

    # Detect file format based on extension. os.path.splitext only inspects the
    # final path component, so a dot inside a directory name (e.g. "rec.v2/audio")
    # is never mistaken for the file extension.
    base_path, extension = os.path.splitext(input_file)
    file_format = extension.lstrip('.').lower()

    # Supported formats by pydub + ffmpeg
    supported_formats = ['m4a', 'mp3', 'ogg', 'flv', 'aac', 'wma', 'flac', 'wav']

    # Check if the file format is supported
    if file_format not in supported_formats:
        raise ValueError(f"Unsupported file format: {file_format}. Supported formats are: {supported_formats}")

    try:
        # Try to load the audio file
        audio = AudioSegment.from_file(input_file, format=file_format)

        # If output file is not provided, reuse the input path with a .wav extension.
        # Note that for a .wav input this is the input path itself.
        if output_file is None:
            output_file = base_path + '.wav'

        # Export the audio file as wav
        audio.export(output_file, format="wav")
        return output_file

    except FileNotFoundError:
        print(f"Error: The input file '{input_file}' does not exist.")
        return None
    except Exception as e:
        # Best-effort contract: report the problem and signal failure with None.
        print(f"An error occurred while converting the audio file: {e}")
        return None

def perform_diarization(wav_file, hf_token, num_speakers=None):
    """
    Run pyannote.audio speaker diarization on a WAV file and list who spoke when.

    The pre-trained "pyannote/speaker-diarization-3.1" pipeline is loaded with the
    given Hugging Face token and moved to the MPS device when torch reports it as
    available. Each detected turn is also printed as it is collected.

    :param wav_file: Path to the WAV file.
    :param hf_token: Hugging Face token for accessing the pre-trained model.
    :param num_speakers: Optional speaker count passed to the pipeline; omitted when falsy.
    :return: List of dicts with "speaker", "start_time" and "end_time" keys,
             or None if any step raises.
    """
    try:
        diarizer = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)

        # Use the MPS backend when present; otherwise the pipeline stays where it was loaded.
        if torch.backends.mps.is_available():
            diarizer.to(torch.device("mps"))

        # Only forward num_speakers when the caller actually supplied one.
        extra_args = {"num_speakers": num_speakers} if num_speakers else {}
        annotation = diarizer(wav_file, **extra_args)

        # Flatten the annotation into plain dictionaries.
        turns = []
        for turn, _, label in annotation.itertracks(yield_label=True):
            turns.append({
                "speaker": label,
                "start_time": turn.start,
                "end_time": turn.end
            })
            print(f"Speaker {label} speaks from {turn.start:.1f}s to {turn.end:.1f}s")

        return turns

    except Exception as err:
        print(f"An error occurred during diarization: {err}")
        return None



# Function to split audio into chunks
def split_audio_by_size(input_file, max_size_mb=20, overlap_ms=1000):
    """
    Split an audio file into overlapping WAV chunks of at most roughly max_size_mb each.

    Chunks are written next to the input as "<input stem>_chunk_<i>.wav".

    :param input_file: Path to the audio file to split.
    :param max_size_mb: Target upper bound for the raw audio size of one chunk, in MiB.
    :param overlap_ms: How far each chunk reaches back into the previous one, in milliseconds.
    :return: List of (chunk_filename, chunk_start_time_ms) tuples in playback order.
    :raises ValueError: If one chunk would not be longer than the overlap, since the
                        split could then never advance through the audio.
    """
    audio = AudioSegment.from_file(input_file)

    # pydub's frame_width is the byte size of one frame across all channels
    # (channels * sample_width), so it must not be multiplied by channels again.
    bytes_per_second = audio.frame_rate * audio.frame_width
    max_size_bytes = max_size_mb * 1024 * 1024
    chunk_duration_ms = math.floor((max_size_bytes / bytes_per_second) * 1000)

    if chunk_duration_ms <= overlap_ms:
        raise ValueError(
            f"overlap_ms ({overlap_ms}) must be smaller than the chunk duration "
            f"({chunk_duration_ms} ms) implied by max_size_mb={max_size_mb}."
        )

    total_ms = len(audio)
    base_name = os.path.splitext(input_file)[0]

    chunk_files = []
    start = 0
    while start < total_ms:
        end = start + chunk_duration_ms
        chunk_filename = f"{base_name}_chunk_{len(chunk_files)}.wav"
        # Export right away so only one chunk is held in memory at a time.
        audio[start:end].export(chunk_filename, format="wav")
        chunk_files.append((chunk_filename, start))

        # This chunk already reached the end of the audio; stepping back by the
        # overlap would only produce a redundant tail chunk.
        if end >= total_ms:
            break
        start = end - overlap_ms

    return chunk_files

# Function to transcribe a single chunk and adjust timestamps
def transcribe_audio_chunk(chunk_file, whisper_api_key, chunk_start_time_ms, debug=False):
    """
    Transcribe one audio chunk with the "whisper-1" model and shift its word
    timestamps by the chunk's start offset.

    :param chunk_file: Path to the chunk audio file to upload.
    :param whisper_api_key: API key used to create the OpenAI client.
    :param chunk_start_time_ms: Start offset of the chunk in milliseconds; it is
                                added (as seconds) to every word's start and end.
    :param debug: When True, print the raw response and the adjusted words.
    :return: List of {'word', 'start', 'end'} dicts (empty when the response
             carries no words), or None if the transcription raised.
    """
    # Initialize the OpenAI client
    client = OpenAI(api_key=whisper_api_key)
    offset_s = chunk_start_time_ms / 1000

    try:
        with open(chunk_file, "rb") as audio_file:
            response = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json",
                timestamp_granularities=["word"]
            )

        # A response without words should yield an empty list rather than an
        # error from iterating over None.
        words = getattr(response, "words", None) or []
        if debug:
            print(response)
            print(words)

        adjusted_words = [
            {
                'word': word.word,
                'start': word.start + offset_s,
                # Use the word's own end time; the start time here would give
                # every word zero duration.
                'end': word.end + offset_s
            }
            for word in words
        ]

        if debug:
            print(adjusted_words)
        return adjusted_words
    except Exception as e:
        # Best-effort contract: report the problem and let the caller skip this chunk.
        print(f"An error occurred during transcription: {e}")
        return None

# Function to synchronize diarization results with transcriptions
def synchronize_diarization_with_whisper(diarization_results, transcribed_chunks):
    """
    Attach transcribed words to the diarization segments they fall into.

    A word belongs to a segment when the midpoint of its [start, end] interval
    lies within the segment (bounds inclusive). Requiring the whole word to lie
    inside the segment would silently drop every word that straddles a segment
    boundary; any word fully inside a segment still matches under the midpoint rule.

    :param diarization_results: Iterable of dicts with 'speaker', 'start_time'
                                and 'end_time' keys (times in seconds).
    :param transcribed_chunks: Iterable of word lists; each word is a dict with
                               'word', 'start' and 'end' keys (times in seconds).
    :return: One dict per diarization segment, in input order, with 'speaker',
             'text' (matched words joined by spaces; may be empty),
             'start_time' and 'end_time'.
    """
    # Flatten all chunks and compute each word's midpoint once, not once per segment.
    timed_words = [
        ((word['start'] + word['end']) / 2, word['word'])
        for chunk in transcribed_chunks
        for word in chunk
    ]

    synchronized_output = []
    for segment in diarization_results:
        segment_start = segment['start_time']
        segment_end = segment['end_time']

        segment_text = " ".join(
            text for midpoint, text in timed_words
            if segment_start <= midpoint <= segment_end
        )

        synchronized_output.append({
            'speaker': segment['speaker'],
            'text': segment_text,
            'start_time': segment_start,
            'end_time': segment_end
        })

    return synchronized_output

# Main function to split, transcribe, and synchronize
def transcribe_and_sync(input_file, whisper_api_key, diarization_results, max_size_mb=20, overlap_ms=1000, debug=False):
    """
    Split an audio file into chunks, transcribe each chunk, and align the words
    with the diarization segments.

    The temporary chunk files are removed even when transcription or
    synchronization raises.

    NOTE: consecutive chunks overlap by overlap_ms and their words are simply
    concatenated, so words inside an overlap may appear twice in the result.

    :param input_file: Path to the audio file to transcribe.
    :param whisper_api_key: API key forwarded to transcribe_audio_chunk.
    :param diarization_results: Speaker segments forwarded to
                                synchronize_diarization_with_whisper.
    :param max_size_mb: Maximum chunk size forwarded to split_audio_by_size.
    :param overlap_ms: Chunk overlap forwarded to split_audio_by_size.
    :param debug: Forwarded to transcribe_audio_chunk.
    :return: The list produced by synchronize_diarization_with_whisper.
    """
    chunk_files = split_audio_by_size(input_file, max_size_mb, overlap_ms)

    try:
        transcribed_chunks = []
        for chunk_file, chunk_start_time_ms in chunk_files:
            # Announce the chunk before the (slow) API call, not after it.
            print(f"Transcribing {chunk_file}")
            chunk_transcription = transcribe_audio_chunk(chunk_file, whisper_api_key, chunk_start_time_ms, debug=debug)
            # Failed (None) or empty chunks are skipped.
            if chunk_transcription:
                transcribed_chunks.append(chunk_transcription)

        return synchronize_diarization_with_whisper(diarization_results, transcribed_chunks)
    finally:
        # Always clean up the temporary chunk files, even on failure.
        for chunk_file, _ in chunk_files:
            try:
                os.remove(chunk_file)
            except FileNotFoundError:
                pass

def _iter_speaker_turns(synchronized_transcription, wrap_width=None):
    """
    Yield one "<speaker>: <text>" string per run of consecutive non-empty segments from the same speaker.

    :param synchronized_transcription: Iterable of dicts with 'speaker' and 'text' keys.
    :param wrap_width: Optional width for word wrapping the text. If falsy, no wrapping is applied.
    """
    def render(speaker, pieces):
        text = " ".join(pieces)
        if wrap_width:
            # Only the text is wrapped; the "<speaker>: " prefix is added afterwards.
            text = textwrap.fill(text, width=wrap_width, subsequent_indent='  ')
        return f"{speaker}: {text}"

    current_speaker = None
    pieces = []

    for segment in synchronized_transcription:
        text = segment['text'].strip()  # Strip leading/trailing whitespace
        if not text:
            # Empty or whitespace-only segments neither add text nor end a turn.
            continue

        speaker = segment['speaker']
        if pieces and speaker == current_speaker:
            # Same speaker keeps talking: extend the current turn.
            pieces.append(text)
            continue

        # Speaker changed: emit the finished turn, then start a new one.
        if pieces:
            yield render(current_speaker, pieces)
        current_speaker = speaker
        pieces = [text]

    # Emit the final turn once the input is exhausted.
    if pieces:
        yield render(current_speaker, pieces)


def print_clean_transcript(synchronized_transcription, wrap_width=None):
    """
    Prints a clean transcript with speakers and their corresponding text, combining consecutive speech from the same speaker.
    Optionally wraps text to a specified width.

    :param synchronized_transcription: List of synchronized speaker and text segments.
    :param wrap_width: Optional width for word wrapping the text. If None, no wrapping is applied.
    """
    for line in _iter_speaker_turns(synchronized_transcription, wrap_width):
        print(line)


def write_clean_transcript_to_file(synchronized_transcription, file_path, wrap_width=None):
    """
    Writes a clean transcript with speakers and their corresponding text, combining consecutive speech from the same speaker.
    Optionally wraps text to a specified width.

    :param synchronized_transcription: List of synchronized speaker and text segments.
    :param file_path: Path to the file where the cleaned transcript will be written.
    :param wrap_width: Optional width for word wrapping the text. If None, no wrapping is applied.
    """
    # Write UTF-8 explicitly so non-ASCII transcript text does not depend on the
    # platform's default encoding.
    with open(file_path, 'w', encoding='utf-8') as file:
        for line in _iter_speaker_turns(synchronized_transcription, wrap_width):
            file.write(f"{line}\n")



In [4]:
# Driver cell: convert the recording, diarize it, transcribe it, and save the results.
load_dotenv()
hf_token = os.getenv("HF_TOKEN")
whisper_api_key = os.getenv("OPEN_API_TAELGAR")

input_file = convert_audio_to_wav("/Users/tim/Downloads/Dunmar-CO-Rec1.m4a")
if input_file is None:
    # convert_audio_to_wav reports failure by returning None; stop before using it as a path.
    raise RuntimeError("Audio conversion failed; see the error printed above.")
prefix = "/Users/tim/Downloads/dunmar-co-segment1"
output_file = f"{prefix}.transcript.txt"
num_speakers = 5

diarization_results = perform_diarization(input_file, hf_token, num_speakers=num_speakers)
if diarization_results is None:
    # perform_diarization reports failure by returning None; stop before
    # calling the transcription API for results that cannot be synchronized.
    raise RuntimeError("Diarization failed; see the error printed above.")
print("Finished diarization.")
synchronized_transcription = transcribe_and_sync(input_file, whisper_api_key, diarization_results, debug=False)
write_clean_transcript_to_file(synchronized_transcription, output_file, wrap_width=None)

# Persist the intermediate results; the with-blocks make sure the files are flushed and closed.
with open(f"{prefix}.transcript.pkl", "wb") as transcript_pkl:
    pickle.dump(synchronized_transcription, transcript_pkl)
with open(f"{prefix}.diarization.pkl", "wb") as diarization_pkl:
    pickle.dump(diarization_results, diarization_pkl)

: 