In [5]:
# Check if running in Kaggle environment and install packages accordingly
import sys
if 'kaggle' in sys.modules:
    !pip install openai-whisper pyannote.audio gdown

In [None]:
import whisper
import os
import time
from pathlib import Path
from pyannote.audio.pipelines import SpeakerDiarization
from pyannote.core import Segment
from huggingface_hub import HfFolder

# Choose the audio folder and the log file location for the current runtime.
# Defines two notebook-level names used by later cells:
#   audio_folder_path - directory scanned for audio files
#   log_file_path     - text file that transcription log lines are appended to
import sys  # imported here as well so this cell does not rely on the install cell having run

if 'google.colab' in sys.modules:
    # Colab: the audio lives on the user's Google Drive, which must be mounted first.
    from google.colab import drive
    drive.mount('/content/drive')
    audio_folder_path = "/content/drive/MyDrive/Transcribe"  # Update this path as per your folder structure
    log_file_path = "/content/drive/MyDrive/Transcribe/transcription_log.txt"
elif 'kaggle' in sys.modules or os.environ.get('KAGGLE_KERNEL_RUN_TYPE'):
    # Kaggle: a module called `kaggle` is not necessarily imported in a kernel, so the
    # KAGGLE_KERNEL_RUN_TYPE environment variable is checked too
    # (NOTE(review): confirm the variable name against the current Kaggle image).
    audio_folder_path = "/kaggle/input/your-audio-files-folder"  # Replace with your folder name in Kaggle
    log_file_path = "/kaggle/working/transcription_log.txt"
else:
    # Local environments: paths relative to the notebook's working directory.
    audio_folder_path = "./Transcribe"  # Local directory
    log_file_path = "./transcription_log.txt"


KeyboardInterrupt: 

In [None]:
# Configuration settings
# Name of the Whisper model to load; this value is passed to whisper.load_model()
# in the model-loading cell.
model_size = "small"  # Adjust based on your requirement


In [None]:
# Load the Whisper model selected by `model_size` once; this single instance is
# the one handed to process_files_in_directory() at the end of the notebook.
model = whisper.load_model(model_size)

In [None]:
def log_message(message, log_file=None):
    """Append ``message`` as a single line to a log file.

    Args:
        message: Text to record; a newline is appended after it.
        log_file: Path of the log file. When omitted (``None``), the
            notebook-level ``log_file_path`` is looked up at call time, so
            re-running the configuration cell takes effect without having to
            re-run this definition.
    """
    if log_file is None:
        log_file = log_file_path
    # Explicit UTF-8 so non-ASCII messages do not depend on the platform's locale encoding.
    with open(log_file, "a", encoding="utf-8") as file:
        file.write(f"{message}\n")

In [None]:
def transcribe_and_log_full_transcript(audio_path, model, log_file=None):
    """Transcribe one audio file, save a timestamped transcript and log the outcome.

    The transcript is written next to the audio file, with the audio file's
    suffix replaced by ``.txt``, one ``start-end: text`` line per segment.

    Args:
        audio_path: Path of the audio file; passed unchanged to ``model.transcribe``.
        model: Object whose ``transcribe(path)`` returns a mapping with a
            ``'segments'`` list; each segment is read for ``'start'``,
            ``'end'`` and ``'text'``.
        log_file: Path of the log file. When omitted (``None``), the
            notebook-level ``log_file_path`` is looked up at call time.

    Returns:
        The ``'segments'`` list taken from the transcription result.

    Raises:
        Exception: Any error raised while transcribing or writing the
            transcript is logged and then re-raised unchanged.
    """
    if log_file is None:
        log_file = log_file_path
    audio_name = Path(audio_path).name
    start_time = time.time()
    try:
        result = model.transcribe(audio_path)
        # Only the model call is timed; writing the transcript is not included.
        execution_time = time.time() - start_time
        log_message(
            f"Transcription successful for {audio_name}! Execution time: {execution_time:.2f} seconds",
            log_file,
        )

        output_file_path = Path(audio_path).with_suffix('.txt')
        # Explicit UTF-8: transcripts routinely contain non-ASCII text, which would
        # otherwise fail to encode on platforms with a non-UTF-8 locale.
        with open(output_file_path, "w", encoding="utf-8") as file:
            file.writelines(
                f"{segment['start']}-{segment['end']}: {segment['text']}\n"
                for segment in result['segments']
            )

        log_message(f"Full transcription saved to {output_file_path}", log_file)
        return result['segments']
    except Exception as e:
        log_message(f"Error during transcription of {audio_name}: {str(e)}", log_file)
        raise


In [None]:
# Diarization pipeline shared by all calls; built lazily on first use.
# Re-running this cell resets it, which forces a rebuild (e.g. after changing the token).
_diarization_pipeline = None


def _get_diarization_pipeline():
    """Return the speaker-diarization pipeline, constructing it only once."""
    global _diarization_pipeline
    if _diarization_pipeline is None:
        _diarization_pipeline = SpeakerDiarization(
            segmentation="pyannote/segmentation",
            use_auth_token=HfFolder.get_token(),
        )
    return _diarization_pipeline


def perform_diarization_pyannote(audio_path, output_log="diarization_pyannote_log.txt"):
    """Run speaker diarization on one audio file and log the speaker turns.

    Args:
        audio_path: Path of the audio file given to the pipeline.
        output_log: File that receives one ``Speaker: ..., Start: ..., End: ...``
            line per turn. It is opened in write mode, so each call replaces
            whatever a previous call wrote there.

    Returns:
        A list of dicts with keys ``'speaker'``, ``'start'`` and ``'end'``, in the
        order yielded by ``diarization.itertracks(yield_label=True)``.
    """
    start_time = time.time()
    # Reuse the pipeline across files instead of rebuilding it on every call.
    pipeline = _get_diarization_pipeline()
    diarization = pipeline({'uri': 'SpeakerDiarization', 'audio': audio_path})

    # Walk the tracks once; the same records feed both the log and the return value.
    speaker_turns = [
        {'speaker': speaker, 'start': turn.start, 'end': turn.end}
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    with open(output_log, "w") as log_file:
        for entry in speaker_turns:
            log_file.write(
                f"Speaker: {entry['speaker']}, Start: {entry['start']}, End: {entry['end']}\n"
            )

    execution_time = time.time() - start_time
    print(f"Diarization completed successfully in {execution_time:.2f} seconds.")
    return speaker_turns


In [None]:
def integrate_diarization_with_transcript(diarization_results, transcription_segments):
    """Prefix every transcript segment with the speaker talking during it.

    A segment that lies completely inside a speaker turn gets that turn's
    speaker (the first such turn in ``diarization_results`` wins). A segment
    that is not fully contained in any turn gets the speaker of the turn it
    overlaps the longest, so segments straddling a turn boundary are no longer
    labelled "Unknown". Only segments overlapping no turn at all stay "Unknown".

    Args:
        diarization_results: Iterable of dicts with ``'speaker'``, ``'start'``
            and ``'end'`` keys.
        transcription_segments: Iterable of dicts with ``'start'``, ``'end'``
            and ``'text'`` keys.

    Returns:
        A list of ``"<speaker>: <text>"`` strings, one per transcript segment,
        in input order.
    """
    integrated_output = []
    for segment in transcription_segments:
        seg_start, seg_end = segment['start'], segment['end']
        speaker_label = "Unknown"
        best_overlap = 0
        for speaker_segment in diarization_results:
            turn_start, turn_end = speaker_segment['start'], speaker_segment['end']
            if seg_start >= turn_start and seg_end <= turn_end:
                # Full containment is decisive and overrides any partial match seen so far.
                speaker_label = speaker_segment['speaker']
                break
            # Length of the shared interval; zero or negative when the two do not intersect.
            overlap = min(seg_end, turn_end) - max(seg_start, turn_start)
            if overlap > best_overlap:
                best_overlap = overlap
                speaker_label = speaker_segment['speaker']
        integrated_output.append(f"{speaker_label}: {segment['text']}")
    return integrated_output


In [None]:
def process_files_in_directory(directory_path, model):
    """Transcribe, diarize and speaker-label every audio file in a directory.

    For each file whose name ends in ``.mp3``, ``.mp4`` or ``.wav`` (in any
    letter case) the transcript and the diarization are produced, merged, and
    written next to the audio file with the suffix replaced by
    ``.integrated.txt``. A failure on one file is printed and the loop moves on
    to the next file.

    Args:
        directory_path: Directory whose direct entries are scanned.
        model: Transcription model forwarded to
            ``transcribe_and_log_full_transcript``.
    """
    audio_extensions = ('.mp3', '.mp4', '.wav')
    # sorted(): os.listdir() returns entries in arbitrary order; sorting makes runs repeatable.
    for file_name in sorted(os.listdir(directory_path)):
        # lower(): accept upper- or mixed-case extensions such as ".MP3" or ".Wav".
        if not file_name.lower().endswith(audio_extensions):
            continue
        file_path = os.path.join(directory_path, file_name)
        print(f"Processing file: {file_path}")
        try:
            transcription_segments = transcribe_and_log_full_transcript(file_path, model)
            diarization_results = perform_diarization_pyannote(file_path)
            integrated_transcript = integrate_diarization_with_transcript(diarization_results, transcription_segments)

            output_file_path = Path(file_path).with_suffix('.integrated.txt')
            # Explicit UTF-8 so non-ASCII transcript text does not depend on the locale encoding.
            with open(output_file_path, "w", encoding="utf-8") as file:
                file.writelines(f"{line}\n" for line in integrated_transcript)

            print(f"Integrated transcription saved to {output_file_path}")
        except Exception as e:
            # Best-effort batch: report the failure and continue with the remaining files.
            print(f"An error occurred while processing {file_path}: {e}")


In [None]:
# Run the whole pipeline (transcription, diarization, speaker-labelled transcript)
# on every audio file found in the configured audio folder.
process_files_in_directory(audio_folder_path, model)