<a href="https://colab.research.google.com/github/manilka12/whisper-transcriber/blob/main/whisper_transcriber.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


## **How to use**
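1. Click **Runtime → Run all** and wait for every cell to finish. Approve the Google Drive access prompt, and upload a media file if the notebook asks for one.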


In [None]:
# @markdown # **[Optional]** Access data in Google Drive 💾
# @markdown Enter a Google Drive path and run this cell to store the results inside Google Drive.

# Mount Google Drive and make sure the folder chosen in the form below exists.
from google.colab import drive
from pathlib import Path

drive_mount_path = Path("/") / "content" / "drive"
drive.mount(str(drive_mount_path))
# Use the "MyDrive" root so this cell agrees with MediaProcessor.process_drive,
# which reads from /content/drive/MyDrive.
drive_mount_path /= "MyDrive"
#@markdown ---
drive_path = "Colab Notebooks/Faster Whisper" #@param {type:"string"}
#@markdown ---
#@markdown **Run this cell again if you change your Google Drive path.**

# Leading slashes are stripped so the form value is always resolved inside the Drive
# root instead of replacing it (joining an absolute path would discard the root).
drive_whisper_path = drive_mount_path / Path(drive_path.lstrip("/"))
drive_whisper_path.mkdir(parents=True, exist_ok=True)

In [None]:
# Whisper Audio Transcription Tool
# Author: Manilka Chamuditha
# GitHub: https://github.com/manilka12/whisper-transcriber

# @title # 🎙️ Whisper Audio Transcription Tool {"display-mode":"form"}
#@markdown This notebook helps you transcribe audio from various sources using OpenAI's Whisper model (via faster-whisper).

#@markdown ---
#@markdown ## 📦 Setup Dependencies
#@markdown Run this cell to install required packages

# Install dependencies with progress indicators
import sys
from IPython.display import HTML, display, clear_output

def install_with_progress(package):
    """Install a pip requirement into the kernel's environment and report the outcome.

    Args:
        package: Requirement specifier handed to ``pip install`` (e.g. ``"yt-dlp"``).
            Whitespace-separated specifiers are passed as separate arguments.

    Returns:
        bool: True when pip exited with status 0, False otherwise. A failure is
        displayed rather than raised.
    """
    # Local imports: this helper runs before the notebook's main import block.
    import html
    import shlex
    import subprocess

    label = html.escape(str(package))
    display(HTML(f"<p>Installing {label}...</p>"))
    # Invoke pip through the running interpreter so the package is installed where the
    # later imports look for it; an argument list avoids shell interpolation.
    result = subprocess.run(
        [sys.executable, "-m", "pip", "install", "-q", *shlex.split(str(package))],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    clear_output(wait=True)
    if result.returncode != 0:
        # Show the tail of pip's error output instead of claiming success.
        details = html.escape(result.stderr[-2000:])
        display(HTML(f"<p>❌ {label} failed to install</p><pre>{details}</pre>"))
        return False
    display(HTML(f"<p>✅ {label} installed</p>"))
    return True

# Install each third-party package the notebook needs, in a fixed order.
for _required_package in ("faster-whisper", "yt-dlp", "tqdm"):
    install_with_progress(_required_package)

# Import required libraries
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import warnings
import zipfile
from datetime import datetime
from pathlib import Path
from urllib.parse import urlsplit

import numpy as np
import requests
import torch
from google.colab import files
from IPython.display import display, Markdown, YouTubeVideo, HTML
from tqdm.notebook import tqdm

# Setup basic logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%H:%M:%S')
logger = logging.getLogger("WhisperTranscriber")

# Check for CUDA availability and setup device
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")
if device == "cuda":
    !sudo apt-get update -qq > /dev/null
    !sudo apt install -qq nvidia-cuda-toolkit > /dev/null
    logger.info(f"CUDA installed. GPU: {torch.cuda.get_device_name(0)}")

#@title # 🧠 Model Selection
#@markdown Select the Whisper model variant to use for transcription

#@markdown ---
model_size = 'large-v3' #@param ['tiny', 'tiny.en', 'base', 'base.en', 'small', 'small.en', 'medium', 'medium.en', 'large-v1', 'large-v2', 'large-v3']
compute_type = "float16" #@param ['float16', 'int8_float16', 'int8']
#@markdown ---
#@markdown **Memory usage**: Larger models provide better accuracy but require more GPU memory
#@markdown **Run this cell again if you change the model.**

# The float16-based compute types need a GPU backend. When the notebook runs on CPU,
# fall back to int8 so the default form value still loads instead of failing inside
# WhisperModel(). The form variable itself is left untouched.
effective_compute_type = compute_type
if device == "cpu" and "float16" in compute_type:
    effective_compute_type = "int8"
    logger.warning(
        f"compute_type '{compute_type}' is not usable on CPU; using '{effective_compute_type}' instead"
    )

try:
    # Imported here because faster-whisper is installed earlier in this notebook run.
    from faster_whisper import WhisperModel
    model = WhisperModel(model_size, device=device, compute_type=effective_compute_type)
    logger.info(f"✅ Model '{model_size}' loaded successfully")
except Exception as e:
    # Log a readable message, then re-raise so the notebook stops without a model.
    logger.error(f"❌ Failed to load model: {str(e)}")
    raise

#@title # 📺 Media Source Selection
#@markdown Choose the source of your video/audio file for transcription

#@markdown ---
# Pre-check if there are media files already in the working directory
def check_for_media_files(content_dir="/content"):
    """List the supported audio/video files that sit directly inside a directory.

    Args:
        content_dir: Directory to scan (not recursive). Defaults to the ``/content``
            workspace used by the rest of this notebook.

    Returns:
        list[Path]: Matching files in sorted order, so processing order is stable
        between runs. Empty when the directory does not exist or has no media files.
    """
    content_dir = Path(content_dir)
    audio_extensions = [".wav", ".mp3", ".ogg", ".opus", ".aac", ".flac", ".m4a"]
    video_extensions = [".mp4", ".mkv", ".mov", ".avi", ".wmv", ".flv", ".webm", ".3gp", ".mpeg"]
    supported_extensions = set(audio_extensions + video_extensions)

    # Outside the expected workspace the directory may be missing; report "nothing
    # found" instead of raising from iterdir().
    if not content_dir.is_dir():
        return []

    # Suffixes are compared case-insensitively so "CLIP.MP4" is accepted too.
    return sorted(
        entry
        for entry in content_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() in supported_extensions
    )

# Tell the user up front which workspace files the Auto-detect source would pick up.
existing_media = check_for_media_files()
if len(existing_media) > 0:
    existing_files_str = "\n".join(map(lambda media: f"- {media.name}", existing_media))
    notice = f"📁 **Media files already found in workspace:**\n{existing_files_str}"
    display(Markdown(notice))

# Selects which branch of the source dispatch further down in this cell runs.
source_type = "Auto-detect" #@param ['Auto-detect', 'YouTube','Google Drive','Direct Download URL', 'Manual Upload']
#@markdown **Auto-detect** will use any media files already in the workspace.
#@markdown If none found, it will prompt for upload.
#@markdown ---

#@markdown ### 🎬 YouTube Options
# Read only when source_type == "YouTube"; both values are passed to
# MediaProcessor.process_youtube(url, audio_only).
youtube_url = "" #@param {type:"string"}
download_audio_only = True #@param {type:"boolean"}
#@markdown ---

#@markdown ### 📁 Google Drive Options
# Read only when source_type == "Google Drive". MediaProcessor.process_drive resolves
# this value under /content/drive/MyDrive.
# NOTE: this reuses the name `drive_path` from the optional Drive cell at the top of
# the notebook, where it means the results folder; this assignment overwrites it.
drive_path = "my_video.mp4" #@param {type:"string"}
#@markdown If using a folder, all media files will be processed
#@markdown ---

#@markdown ### 🌐 Direct Download Options
# Read only when source_type == "Direct Download URL".
download_url = "" #@param {type:"string"}
#@markdown ---

#@markdown ### 🚨 No media files detected?
#@markdown If needed, upload files in the **Manual Upload** option or choose another source.

class MediaProcessor:
    """Collect media from the supported sources into a private temp directory.

    Each ``process_*`` method and ``use_existing_files`` appends the local paths it
    produced to ``self.media_files``. ``convert_to_audio`` then replaces video entries
    with extracted WAV files, and ``cleanup`` removes the temp directory.

    Attributes are documented inline in ``__init__``.
    """

    # Seconds to wait for a direct-download server to connect or send data.
    DOWNLOAD_TIMEOUT = 30
    # Bytes requested per read while streaming a direct download to disk.
    DOWNLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Create the output directory and a fresh temp directory for this run."""
        # Local paths (inside temp_dir) of the media queued for transcription.
        self.media_files = []
        self.audio_extensions = [".wav", ".mp3", ".ogg", ".opus", ".aac", ".flac", ".m4a"]
        self.video_extensions = [".mp4", ".mkv", ".mov", ".avi", ".wmv", ".flv", ".webm", ".3gp", ".mpeg"]
        self.supported_extensions = self.audio_extensions + self.video_extensions

        # Create output directory (relative to the current working directory)
        self.output_dir = Path("transcribed_texts")
        self.output_dir.mkdir(exist_ok=True)

        # Create temp directory for processing
        self.temp_dir = Path(tempfile.mkdtemp())

    @staticmethod
    def _unique_path(path):
        """Return ``path`` itself, or a ``<stem>_<n><suffix>`` sibling that does not exist yet."""
        path = Path(path)
        candidate = path
        counter = 1
        while candidate.exists():
            candidate = path.with_name(f"{path.stem}_{counter}{path.suffix}")
            counter += 1
        return candidate

    def _copy_into_temp(self, source):
        """Copy ``source`` into the temp directory, queue the copy and return its path."""
        # A unique target keeps same-named files from different folders from
        # overwriting each other.
        local_path = self._unique_path(self.temp_dir / source.name)
        shutil.copy(source, local_path)
        self.media_files.append(local_path)
        return local_path

    def process_youtube(self, url, audio_only=True):
        """Download a YouTube video or playlist into the temp directory.

        Args:
            url: Video or playlist URL handed to yt-dlp.
            audio_only: When True, download audio and let yt-dlp's FFmpegExtractAudio
                postprocessor produce WAV files; otherwise download video plus audio.

        Raises:
            Exception: Any download error is logged and re-raised.
        """
        try:
            display(Markdown(f"⬇️ Downloading from YouTube: `{url}`"))

            # Imported lazily: yt-dlp is installed earlier in this notebook run.
            import yt_dlp
            ydl_opts = {
                'format': 'm4a/bestaudio/best' if audio_only else 'bestvideo+bestaudio',
                'outtmpl': f'{self.temp_dir}/%(id)s.%(ext)s',
                'quiet': True,
                'no_warnings': True,
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'wav',
                    'preferredquality': '192',
                }] if audio_only else []
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                # A playlist result lists its videos under 'entries'; a single video
                # is described by the info dict itself.
                entries = info['entries'] if 'entries' in info else [info]
                for entry in entries:
                    if not entry:
                        # Presumably an unavailable playlist item — nothing was downloaded for it.
                        continue
                    # Audio-only downloads are always converted to .wav by the postprocessor.
                    extension = "wav" if audio_only else entry['ext']
                    file_path = self.temp_dir / f"{entry['id']}.{extension}"
                    self.media_files.append(file_path)
                    display(Markdown(f"✅ Downloaded: **{entry['title']}**"))

        except Exception as e:
            logger.error(f"❌ YouTube download failed: {str(e)}")
            raise

    def process_drive(self, path, drive_already_mounted=True):
        """Copy one media file, or every media file under a folder, from Google Drive.

        Args:
            path: File or folder path relative to the Drive root (leading "/" ignored).
            drive_already_mounted: Hint that Drive is mounted at ``/content/drive``.
                Drive is also mounted when the hint is True but the root is missing.

        Missing paths and unsupported file types are logged, not raised.
        """
        drive_root = Path('/content/drive/MyDrive')
        # Mount when asked to, and also when the "already mounted" hint turns out to be
        # stale (e.g. the optional Drive cell was skipped).
        if not drive_already_mounted or not drive_root.exists():
            from google.colab import drive
            drive.mount('/content/drive')

        drive_path = drive_root / Path(path.lstrip("/"))

        if not drive_path.exists():
            logger.error(f"❌ Path does not exist: {drive_path}")
            return

        if drive_path.is_dir():
            display(Markdown(f"📁 Processing directory: `{drive_path}`"))
            # Recursive scan; sorted so the processing order is stable between runs.
            matches = sorted(
                candidate
                for candidate in drive_path.glob("**/*")
                if candidate.is_file() and candidate.suffix.lower() in self.supported_extensions
            )
            for file_path in matches:
                self._copy_into_temp(file_path)
                display(Markdown(f"✅ Added: **{file_path.name}**"))
        else:
            if drive_path.suffix.lower() in self.supported_extensions:
                self._copy_into_temp(drive_path)
                display(Markdown(f"✅ Added: **{drive_path.name}**"))
            else:
                logger.error(f"❌ Unsupported file type: {drive_path}")

    def process_direct_download(self, url):
        """Stream a file from ``url`` into the temp directory with a progress bar.

        Non-200 responses and download errors are logged, not raised. A file whose
        name does not end in a supported extension is still downloaded after a warning.
        """
        try:
            display(Markdown(f"⬇️ Downloading from URL: `{url}`"))
            # The timeout stops an unresponsive server from hanging the cell; the
            # context manager releases the connection on every exit path.
            with requests.get(url, stream=True, timeout=self.DOWNLOAD_TIMEOUT) as response:
                if response.status_code != 200:
                    logger.error(f"❌ Download failed with status code: {response.status_code}")
                    return

                filename = urlsplit(url).path.split("/")[-1]
                if not filename:
                    # URLs ending in "/" (or with no path) carry no file name; without
                    # a fallback the target would be the temp directory itself.
                    filename = "downloaded_media"
                if not any(filename.lower().endswith(ext) for ext in self.supported_extensions):
                    logger.warning(f"⚠️ File may not be a supported media type: {filename}")

                file_path = self._unique_path(self.temp_dir / filename)

                # Download with progress bar (total is 0 when the server sends no length)
                total_size = int(response.headers.get('content-length', 0))
                with open(file_path, 'wb') as f, tqdm(total=total_size, unit='iB', unit_scale=True) as t:
                    for data in response.iter_content(self.DOWNLOAD_CHUNK_SIZE):
                        t.update(len(data))
                        f.write(data)

            self.media_files.append(file_path)
            display(Markdown(f"✅ Downloaded: **{file_path.name}**"))

        except Exception as e:
            logger.error(f"❌ Direct download failed: {str(e)}")

    def process_manual_upload(self):
        """Prompt for an upload and keep the files that have a supported extension.

        Errors are logged, not raised.
        """
        try:
            display(Markdown(f"📤 Upload files using the button below:"))

            # Ask user to upload files
            uploaded = files.upload()

            for filename, content in uploaded.items():
                if any(filename.lower().endswith(ext) for ext in self.supported_extensions):
                    file_path = self._unique_path(self.temp_dir / filename)
                    with open(file_path, 'wb') as f:
                        f.write(content)
                    self.media_files.append(file_path)
                    display(Markdown(f"✅ Uploaded: **{filename}**"))
                else:
                    logger.warning(f"⚠️ Skipping unsupported file: {filename}")
        except Exception as e:
            logger.error(f"❌ Manual upload processing failed: {str(e)}")

    def use_existing_files(self, file_list):
        """Copy already-present workspace files into the temp directory and queue them.

        Args:
            file_list: Iterable of ``Path`` objects to copy.

        Errors are logged, not raised; files copied before the error stay queued.
        """
        try:
            for file_path in file_list:
                self._copy_into_temp(file_path)
                display(Markdown(f"✅ Using existing file: **{file_path.name}**"))
        except Exception as e:
            logger.error(f"❌ Error processing existing files: {str(e)}")

    def convert_to_audio(self):
        """Replace each queued video with a 16 kHz mono WAV extracted by ffmpeg.

        Entries whose suffix is not a video extension are left untouched. When ffmpeg
        fails the error is logged and the original video stays in the queue.
        """
        # Iterate over a snapshot because entries are replaced in place by index.
        for i, file_path in enumerate(self.media_files[:]):
            if file_path.suffix.lower() not in self.video_extensions:
                continue

            display(Markdown(f"🔄 Converting video to audio: **{file_path.name}**"))

            # Define output audio path; pick a free name so an existing "<stem>.wav"
            # is never overwritten by the extracted track.
            audio_path = self._unique_path(file_path.with_suffix(".wav"))

            # Convert video to audio using ffmpeg
            try:
                subprocess.run(
                    ["ffmpeg", "-y", "-loglevel", "error", "-i", str(file_path), "-vn",
                     "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", str(audio_path)],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    check=True
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"❌ FFmpeg conversion failed: {e.stderr.decode('utf-8', errors='replace')}")
                # Keep the original file in the list
                continue

            # Replace video with audio in the list
            self.media_files[i] = audio_path
            display(Markdown(f"✅ Converted to audio: **{audio_path.name}**"))

            # Remove the original video file to save space; a failed removal must not
            # abort the remaining conversions.
            try:
                os.remove(file_path)
            except OSError as e:
                logger.warning(f"⚠️ Could not remove {file_path.name}: {str(e)}")

    def cleanup(self):
        """Delete the temp directory and everything in it; failures are logged."""
        try:
            shutil.rmtree(self.temp_dir)
            logger.info("🧹 Temporary files cleaned up")
        except Exception as e:
            logger.error(f"❌ Cleanup failed: {str(e)}")

# Process media based on selected source
# Re-running this code builds a fresh MediaProcessor. Release the temp directory still
# held by a previous one first, otherwise every re-run leaks a directory of copies.
_previous_processor = globals().get("processor")
_previous_temp_dir = getattr(_previous_processor, "temp_dir", None)
if _previous_temp_dir is not None and Path(_previous_temp_dir).exists():
    _previous_processor.cleanup()

processor = MediaProcessor()

try:
    if source_type == "Auto-detect":
        # Scan again so files added since the earlier notice are picked up.
        existing_media = check_for_media_files()
        if existing_media:
            processor.use_existing_files(existing_media)
        else:
            display(Markdown("⚠️ **No media files found in workspace. Please upload:**"))
            processor.process_manual_upload()

    elif source_type == "YouTube":
        if not youtube_url:
            display(Markdown("⚠️ **Please provide a YouTube URL**"))
        else:
            processor.process_youtube(youtube_url, download_audio_only)

    elif source_type == "Google Drive":
        if not drive_path:
            display(Markdown("⚠️ **Please provide a Google Drive path**"))
        else:
            processor.process_drive(drive_path)

    elif source_type == "Direct Download URL":
        if not download_url:
            display(Markdown("⚠️ **Please provide a download URL**"))
        else:
            processor.process_direct_download(download_url)

    elif source_type == "Manual Upload":
        processor.process_manual_upload()

    else:
        # A value outside the dropdown used to fall through silently.
        display(Markdown(f"⚠️ **Unknown source type:** `{source_type}`"))

    # Convert videos to audio for processing
    processor.convert_to_audio()

    # Summary of files to be processed
    if processor.media_files:
        display(Markdown(f"## 📋 Files to be processed ({len(processor.media_files)}):"))
        for file in processor.media_files:
            display(Markdown(f"- **{file.name}**"))
    else:
        display(Markdown("⚠️ **No media files found to process**"))

except Exception as e:
    display(Markdown(f"❌ **Error:** {str(e)}"))
    # Keep the traceback in the log, matching how transcription errors are reported.
    logger.error(f"Media processing error: {str(e)}", exc_info=True)

#@title # 🚀 Transcription Settings
#@markdown Configure the transcription process

#@markdown ---
#@markdown ## 🌍 Language Settings
# "auto" is translated to language=None in the model.transcribe() call below.
language = "en" #@param ["auto", "en", "zh", "de", "es", "ru", "ko", "fr", "ja", "pt", "tr", "pl", "ca", "nl", "ar", "sv", "it", "id", "hi", "fi", "vi", "he", "uk", "el", "ms", "cs", "ro", "da", "hu", "ta", "no", "th", "ur", "hr", "bg", "lt", "la", "mi", "ml", "cy", "sk", "te", "fa", "lv", "bn", "sr", "az", "sl", "kn", "et", "mk", "br", "eu", "is", "hy", "ne", "mn", "bs", "kk", "sq", "sw", "gl", "mr", "pa", "si", "km", "sn", "yo", "so", "af", "oc", "ka", "be", "tg", "sd", "gu", "am", "yi", "lo", "uz", "fo", "ht", "ps", "tk", "nn", "mt", "sa", "lb", "my", "bo", "tl", "mg", "as", "tt", "haw", "ln", "ha", "ba", "jw", "su"] {allow-input: true}
#@markdown Choose language or set to "auto" for automatic detection

# An empty string is passed to model.transcribe() as initial_prompt=None.
initial_prompt = "" #@param {type:"string"}
#@markdown Optional: Guide the model with an initial prompt (e.g., "This is a medical lecture about cardiology.")

#@markdown ---
#@markdown ## ⚙️ Advanced Settings
beam_size = 10 #@param {type:"slider", min:1, max:10, step:1}
#@markdown Higher values = better quality but slower processing

# Passed as word_timestamps; also switches both writers to one entry per word.
word_level_timestamps = False #@param {type:"boolean"}
#@markdown Generate timestamps for each word instead of sentences

vad_filter = True #@param {type:"boolean"}
#@markdown Voice Activity Detection: filter out non-speech parts

# Forwarded to model.transcribe() as vad_parameters["min_silence_duration_ms"].
vad_filter_min_silence_ms = 50 #@param {type:"integer"}
#@markdown Minimum silence duration in milliseconds for VAD filtering

#@markdown ---
#@markdown ## 📄 Output Settings
# "Both" writes a .txt and a .srt file for every media file.
output_format = "Text (.txt)" #@param ["Text (.txt)", "Subtitles (.srt)", "Both"]
#@markdown Choose output format for transcriptions

# Only affects the .txt writer; .srt output always carries cue times.
include_timestamps = False #@param {type:"boolean"}
#@markdown Include timestamps in text output

# Appends word.probability (word-level) or segment.avg_logprob (segment-level) in
# parentheses after the text.
include_confidence = False #@param {type:"boolean"}
#@markdown Include model confidence scores in the output

show_live_transcription = True #@param {type:"boolean"}
#@markdown Show transcription in real-time as it happens

# Also sets the height of the live window (24px per line).
live_display_lines = 5 #@param {type:"slider", min:1, max:10, step:1}
#@markdown Number of lines to show in live transcription window

def seconds_to_timecode(seconds, format="srt"):
    """Convert a duration in seconds to a formatted timecode.

    Args:
        seconds: Non-negative duration in seconds (int or float). Negative values
            are clamped to zero.
        format: ``"srt"`` gives ``HH:MM:SS,mmm``, ``"vtt"`` gives ``HH:MM:SS.mmm``;
            any other value gives ``HH:MM:SS`` without milliseconds.

    Returns:
        str: The formatted timecode.
    """
    # Round to whole milliseconds first and split with integer arithmetic. Truncating
    # the float remainder instead loses a millisecond for values such as 1.001, whose
    # fractional part is stored as 0.000999...
    total_ms = max(0, int(round(float(seconds) * 1000)))
    hours, remainder_ms = divmod(total_ms, 3_600_000)
    minutes, remainder_ms = divmod(remainder_ms, 60_000)
    whole_seconds, milliseconds = divmod(remainder_ms, 1000)

    if format == "srt":
        return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"
    elif format == "vtt":
        return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d}.{milliseconds:03d}"
    else:
        return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d}"

#@title # 🎬 Run Transcription
#@markdown Start the transcription process for all processed files


def _js_string(value):
    """Return ``value`` encoded as a JavaScript string literal safe inside <script>.

    json.dumps escapes quotes, backslashes, newlines and non-ASCII characters; "</" is
    additionally split so the text cannot terminate the enclosing <script> element.
    """
    return json.dumps(str(value)).replace("</", "<\\/")


def update_progress(progress, message, file_idx=0):
    """Set the bar width (percent) and status text of the progress bar for ``file_idx``."""
    display(HTML(
        f"""
        <script>
            document.getElementById('transcription-progress-{int(file_idx)}').style.width = '{int(progress)}%';
            document.getElementById('transcription-status-{int(file_idx)}').textContent = {_js_string(message)};
        </script>
        """
    ))


def update_live_transcription(text):
    """Replace the contents of the live transcription window with ``text``."""
    if not show_live_transcription:
        return
    # textContent (not innerHTML) so transcript text is never interpreted as markup;
    # the window's pre-wrap style still renders the newlines as line breaks.
    display(HTML(
        f"""
        <script>
            var liveDiv = document.getElementById('live-transcription');
            liveDiv.textContent = {_js_string(text)};
            liveDiv.parentElement.scrollTop = liveDiv.parentElement.scrollHeight;
            liveDiv.style.color = "#ffffff"; // Ensure text remains white
        </script>
        """
    ))


def process_segment(segment, recent_segments):
    """Add ``segment`` to the rolling window ``recent_segments`` and refresh the live view."""
    recent_segments.append(segment.text.strip())
    # Keep only the most recent lines (trimmed in place so the caller's list shrinks).
    del recent_segments[:-live_display_lines]
    update_live_transcription("\n".join(recent_segments))


def _write_txt(handle, segment_list):
    """Write the plain-text transcript to ``handle``, one line per segment."""
    for segment in segment_list:
        if word_level_timestamps:
            pieces = []
            for word in (segment.words or []):
                # Stripped because the words are joined with single spaces below and
                # word strings may carry their own leading space.
                piece = word.word.strip()
                if include_confidence:
                    piece += f" ({word.probability:.2f})"
                if include_timestamps:
                    piece = f"[{seconds_to_timecode(word.start, 'txt')}] {piece}"
                pieces.append(piece)
            handle.write(" ".join(pieces) + "\n")
        else:
            text = segment.text.strip()
            if include_confidence:
                # NOTE(review): avg_logprob is presumably a log-probability rather
                # than a 0-1 confidence — confirm this is the intended score.
                text += f" ({segment.avg_logprob:.2f})"
            if include_timestamps:
                text = (f"[{seconds_to_timecode(segment.start, 'txt')}-"
                        f"{seconds_to_timecode(segment.end, 'txt')}] {text}")
            handle.write(f"{text}\n")


def _write_srt(handle, segment_list):
    """Write numbered SRT cues to ``handle``: one per word or one per segment."""
    if word_level_timestamps:
        cues = [
            (word.start, word.end, word.word.strip(), word.probability)
            for segment in segment_list
            for word in (segment.words or [])
        ]
    else:
        cues = [
            (segment.start, segment.end, segment.text.strip(), segment.avg_logprob)
            for segment in segment_list
        ]

    for index, (start, end, text_line, score) in enumerate(cues, start=1):
        if include_confidence:
            text_line = f"{text_line} ({score:.2f})"
        handle.write(f"{index}\n")
        handle.write(f"{seconds_to_timecode(start)} --> {seconds_to_timecode(end)}\n")
        handle.write(f"{text_line}\n\n")


# Create a live transcription display area
if show_live_transcription:
    live_display = HTML(
        f"""
        <div style="border:1px solid #ddd; padding:10px; height:{24*live_display_lines}px; overflow-y:auto; margin-bottom:5px; background-color:#2d2d2d; font-family:monospace; white-space:pre-wrap; color:#ffffff;">
        <div id="live-transcription"></div>
        </div>
        """
    )
    display(live_display)

# Check if we have media files to process
if not processor.media_files:
    display(Markdown("⚠️ **No media files found to process. Please run the Media Source Selection cell first.**"))
else:
    # Create timestamp for this batch
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    batch_dir = processor.output_dir / f"batch_{timestamp}"
    batch_dir.mkdir(parents=True, exist_ok=True)

    # Determine output formats (the same for every file, so computed once)
    formats = []
    if output_format == "Text (.txt)" or output_format == "Both":
        formats.append("txt")
    if output_format == "Subtitles (.srt)" or output_format == "Both":
        formats.append("srt")
    writers = {"txt": _write_txt, "srt": _write_srt}

    # Process each file
    for file_idx, audio_path in enumerate(processor.media_files):
        display(Markdown(f"## 🔄 Processing file {file_idx+1}/{len(processor.media_files)}: **{audio_path.name}**"))

        try:
            # Set up progress display. The element ids carry the file index: with
            # shared ids, getElementById would keep updating the first file's bar.
            progress_html = HTML(
                f"""
                <div style="width:100%; background-color:#f0f0f0; border-radius:5px;">
                    <div id="transcription-progress-{file_idx}" style="background-color:#4CAF50; width:0%; height:20px; border-radius:5px;"></div>
                </div>
                <p id="transcription-status-{file_idx}">Starting transcription...</p>
                """
            )
            display(progress_html)

            # Start transcription
            update_progress(10, "Loading model and analyzing audio...", file_idx)

            # Run transcription
            segments, info = model.transcribe(
                str(audio_path),
                beam_size=beam_size,
                language=None if language == "auto" else language,
                initial_prompt=initial_prompt if initial_prompt else None,
                word_timestamps=word_level_timestamps,
                vad_filter=vad_filter,
                vad_parameters=dict(min_silence_duration_ms=vad_filter_min_silence_ms)
            )

            # Total audio length, used to turn segment end times into a percentage.
            total_duration = getattr(info, "duration", None) or 0

            # Collect the lazily produced segments so both writers can iterate them.
            segment_list = []
            recent_segments = []
            for segment in segments:
                segment_list.append(segment)
                if show_live_transcription:
                    process_segment(segment, recent_segments)
                if total_duration > 0:
                    # Map the position in the audio onto the 10-90% range of the bar.
                    fraction = min(max(segment.end / total_duration, 0.0), 1.0)
                    percent = 10 + int(80 * fraction)
                else:
                    percent = 50
                update_progress(percent, "Transcription in progress...", file_idx)

            update_progress(90, "Finalizing transcription...", file_idx)

            # Display detected language info
            display(Markdown(f"🌍 Detected language: **{info.language}** (probability: {info.language_probability:.2f})"))

            # Write one file per requested format. The loop variable is not named
            # `format`, which would shadow the builtin for the rest of the session.
            for fmt in formats:
                output_file_name = f"{audio_path.stem}.{fmt}"
                output_file_path = batch_dir / output_file_name

                update_progress(95, f"Writing {fmt.upper()} output...", file_idx)

                with open(output_file_path, 'w', encoding='utf-8') as f:
                    writers[fmt](f, segment_list)

                display(Markdown(f"✅ Created {fmt.upper()} file: **{output_file_name}**"))

            update_progress(100, "Transcription completed successfully!", file_idx)

        except Exception as e:
            # One failing file must not stop the batch: report it and continue.
            display(Markdown(f"❌ **Error transcribing {audio_path.name}**: {str(e)}"))
            logger.error(f"Transcription error: {str(e)}", exc_info=True)

    produced_files = sorted(path for path in batch_dir.glob("*") if path.is_file())
    zip_file_path = batch_dir.parent / f"transcribed_texts_{timestamp}.zip"

    if produced_files:
        # Create a zip file of all outputs
        with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in produced_files:
                zipf.write(file_path, arcname=file_path.name)

        # Provide download links
        display(Markdown(f"## 📥 Download Results"))
        display(Markdown(f"* [Download ZIP of all transcriptions]({zip_file_path})"))

        # When the optional Drive cell was run it defines drive_whisper_path; store a
        # copy of the results there, as that cell's description says.
        drive_target = globals().get("drive_whisper_path")
        if drive_target is not None and Path(drive_target).is_dir():
            try:
                drive_copy_path = Path(drive_target) / zip_file_path.name
                shutil.copy2(zip_file_path, drive_copy_path)
                display(Markdown(f"💾 **Saved to Google Drive:** `{drive_copy_path}`"))
            except OSError as e:
                logger.error(f"❌ Could not copy results to Google Drive: {str(e)}")
    else:
        display(Markdown("⚠️ **No transcription files were produced, so there is nothing to download.**"))

    # Clean up. The queue is emptied as well because its paths point into the temp
    # directory that was just deleted; a re-run then reports "no media files" instead
    # of failing on every missing file.
    processor.cleanup()
    processor.media_files.clear()

    if produced_files:
        # Offer to download directly through Colab
        display(Markdown(f"**Or download through Colab:**"))
        files.download(str(zip_file_path))