# Audio/Video Transcription with OpenAI Whisper

In this Notebook on **Container Runtime**, we will **process audio and video files** to generate accurate transcriptions using OpenAI's Whisper model. This notebook leverages GPU acceleration for faster processing and stores results in structured Snowflake tables for analysis and search.

## What we'll accomplish:
- **Audio/Video Processing**: Convert various media formats to audio for transcription
- **Speech-to-Text**: Use OpenAI Whisper for accurate transcription with language detection
- **Batch Processing**: Process multiple files efficiently with progress tracking
- **Data Storage**: Store transcriptions with metadata in Snowflake tables
- **JSON Output**: Structured transcripts with speaker segments and timestamps

**Why is Container Runtime needed?**
Transcribing audio and video requires OpenAI Whisper and FFmpeg. FFmpeg extracts the audio track from video files and cannot be installed on standard Warehouse compute. Container Runtime also gives us GPU compute, which makes transcription much faster.

**⚠️ IMPORTANT**: This notebook installs all packages via pip and doesn't require environment.yml. Make sure External Access Integrations are enabled!


## OpenAI Whisper

Whisper is a general-purpose speech recognition model from OpenAI. It's trained on a large dataset of diverse audio and is also a multitasking model that can perform multilingual speech recognition, speech translation, and language identification.

**Key Benefits:**
- **Accuracy**: State-of-the-art transcription quality
- **Multilingual**: Supports 99+ languages
- **Robust**: Handles various audio qualities and accents
- **GPU Accelerated**: Fast processing on Snowflake's GPU compute


## Let's get started!


## ⚙️ Configuration

Set your transcription options here before running the rest of the notebook.


In [None]:
####################################
# TRANSCRIPTION CONFIGURATION
####################################

# Whisper Model Selection
# Options: "tiny", "base", "small", "medium", "large"
# Speeds are approximate and relative to "large" (not to realtime); they vary with hardware and Whisper version
# - tiny: Fastest, least accurate (~32x)
# - base: Good balance (default, ~16x)
# - small: Better accuracy (~6x)
# - medium: High accuracy (~2x)
# - large: Best accuracy (1x, the baseline)
WHISPER_MODEL = "base"

# Speaker Diarization (identifies different speakers)
# Set to True to enable speaker identification (requires additional processing time)
# Set to False for faster transcription without speaker separation
ENABLE_SPEAKER_DIARIZATION = False

# Batch Processing
# Number of files to process before showing progress update
PROGRESS_UPDATE_INTERVAL = 5

# Output Options
# Include file path in results (useful for organizing large batches)
INCLUDE_FILE_PATH = True

print("✅ Configuration loaded:")
print(f"   📊 Whisper Model: {WHISPER_MODEL}")
print(f"   👥 Speaker Diarization: {'ENABLED' if ENABLE_SPEAKER_DIARIZATION else 'DISABLED'}")
print(f"   📈 Progress Updates: Every {PROGRESS_UPDATE_INTERVAL} files")
print(f"   📁 Include File Paths: {INCLUDE_FILE_PATH}")
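
A typo in `WHISPER_MODEL` only surfaces later, when the model is loaded, and a `PROGRESS_UPDATE_INTERVAL` of 0 would cause a division-by-zero error in the batch loop. A minimal sketch of an early check is shown below. `validate_config` and `VALID_MODELS` are hypothetical names introduced for illustration; they are not part of the notebook.

```python
# Hypothetical helper (not part of the notebook): validate settings up front.
VALID_MODELS = ("tiny", "base", "small", "medium", "large")

def validate_config(model_name, progress_interval):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if model_name not in VALID_MODELS:
        problems.append(
            f"Unknown Whisper model {model_name!r}; expected one of {VALID_MODELS}"
        )
    # bool is a subclass of int, so reject it explicitly
    if (
        isinstance(progress_interval, bool)
        or not isinstance(progress_interval, int)
        or progress_interval < 1
    ):
        problems.append("PROGRESS_UPDATE_INTERVAL must be a positive integer")
    return problems

print(validate_config("base", 5))   # no problems reported
print(validate_config("bse", 0))    # two problems reported
```

Calling such a helper right after the configuration cell would report mistakes before any packages are installed or files are downloaded.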


In [None]:
# Install required packages with pip (no environment.yml is needed)
print("🔧 Installing required packages...")
print("This may take several minutes...")

!pip install openai-whisper pandas

import warnings
warnings.filterwarnings("ignore")

from snowflake.core import Root
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import StructType, StructField, StringType, FloatType, IntegerType, TimestampType
import glob
import os
import time
from datetime import datetime
import pandas as pd

session = get_active_session()
root = Root(session)

# Add a query tag to the session for monitoring
session.query_tag = {"origin":"sf_se", 
                     "name":"audio_video_transcription", 
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":0, "source":"notebook"}}

# Set session context 
session.use_role("SYSADMIN")

# Print the current role, warehouse, and database/schema
print(f"Role: {session.get_current_role()}")
print(f"Warehouse: {session.get_current_warehouse()}")
print(f"Database.Schema: {session.get_fully_qualified_current_schema()}")


## 1. Prerequisites & Setup

**🚨 CRITICAL: External Access Integrations Must Be Enabled**

Before running any installation commands, you MUST enable external access integrations:

### **Steps to Enable External Access:**
1. Click the **⚙️ Settings** icon in the top-right of this notebook
2. In the settings panel, find **"External Access Integrations"**
3. **Toggle ON** these integrations:
   - ✅ `TRANSCRIPTION_PYPI_ACCESS_INTEGRATION`
   - ✅ `TRANSCRIPTION_ALLOW_ALL_INTEGRATION`
4. Click **"Save"** or close the settings panel

### **Other Prerequisites:**
- Upload media files to the `AUDIO_VIDEO_STAGE` in Snowflake
- Ensure you're using a GPU compute pool for faster processing

**⚠️ If you skip enabling external access integrations, package installation will fail!**


Verify FFmpeg is available (it is usually pre-installed in Container Runtime):


In [None]:
# Check if FFmpeg is available (usually pre-installed in Container Runtime)
import subprocess

try:
    result = subprocess.run(['ffmpeg', '-version'], capture_output=True, text=True, check=True)
    print("✅ FFmpeg is already installed!")
    print(f"   Version: {result.stdout.split()[2]}")
except (OSError, subprocess.CalledProcessError):
    print("⚠️  FFmpeg not found. Attempting installation...")
    try:
        # Install if not present
        subprocess.run(['apt-get', 'update'], check=True, capture_output=True)
        subprocess.run(['apt-get', 'install', '-y', 'ffmpeg'], check=True, capture_output=True)
        print("✅ FFmpeg installed successfully!")
    except Exception as e:
        print(f"❌ Failed to install FFmpeg: {e}")
        raise
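
The check above reads the version from the third whitespace-separated token of the `ffmpeg -version` banner. A slightly more defensive sketch is shown below. `tool_on_path` and `parse_ffmpeg_version` are hypothetical helpers, and the sample banner is illustrative only; real output depends on the installed build.

```python
import shutil

def tool_on_path(name):
    """True if an executable called `name` is found on PATH."""
    return shutil.which(name) is not None

def parse_ffmpeg_version(banner):
    """Extract the version token from the first line of `ffmpeg -version`.

    Returns None when the banner does not look like FFmpeg output.
    """
    lines = banner.splitlines()
    if not lines:
        return None
    tokens = lines[0].split()
    if len(tokens) >= 3 and tokens[0] == "ffmpeg" and tokens[1] == "version":
        return tokens[2]
    return None

# Sample banner for illustration only
sample = "ffmpeg version 4.4.2 Copyright (c) 2000-2021 the FFmpeg developers\nbuilt with gcc"
print(parse_ffmpeg_version(sample))
print(tool_on_path("ffmpeg"))
```

Using `shutil.which` avoids spawning a process just to learn whether the binary exists, and the parser returns `None` instead of raising when the banner has an unexpected shape.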


In [None]:
# Verify ffprobe is also available (needed for duration detection)
try:
    result = subprocess.run(['ffprobe', '-version'], capture_output=True, text=True, check=True)
    print("✅ ffprobe is available")
except (OSError, subprocess.CalledProcessError):
    print("⚠️ ffprobe not found (should be installed with ffmpeg)")


In [None]:
# Display FFmpeg capabilities
!ffmpeg -version | head -3


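Now install OpenAI Whisper, this time capturing pip's output so that failures are reported clearly. If the earlier install cell already succeeded, pip simply reports that the requirement is satisfied: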


In [None]:
# Install required packages for transcription
# CRITICAL: External Access Integrations must be enabled in notebook settings

print("📦 Installing required packages...")
print("🔄 Step 1: Installing OpenAI Whisper...")

try:
    import subprocess
    import sys
    
    # Install whisper first
    result = subprocess.run([sys.executable, "-m", "pip", "install", "openai-whisper"], 
                          capture_output=True, text=True, check=True)
    print("✅ OpenAI Whisper installed successfully")
    
except subprocess.CalledProcessError as e:
    print(f"❌ Failed to install OpenAI Whisper: {e}")
    print(f"Error output: {e.stderr}")
    print("🚨 Make sure External Access Integrations are enabled in notebook settings!")
    raise

print("✅ Package installation complete!")
print("ℹ️  Video processing uses FFmpeg CLI (more reliable in Container Runtime)")
print("ℹ️  Speaker diarization is available as an optional enhancement.")


**🔍 Troubleshooting Installation Issues:**

If the installation above failed:
1. **Check External Access Integrations** - Go to notebook settings and ensure they're enabled
2. **Restart the notebook** - Sometimes a fresh start helps
3. **Try manual installation** - Run individual pip commands in separate cells


## 2. Load Whisper Model

Load the Whisper model configured in the Configuration cell above (default: 'base'). You can change the model by updating `WHISPER_MODEL` in the configuration.
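
If you are unsure which model your GPU can hold, the sketch below picks the largest one that fits a given memory budget. The memory figures are the approximate values listed in the Whisper README and should be treated as rough guidance; `largest_model_that_fits` is a hypothetical helper, not part of Whisper or this notebook.

```python
# Approximate memory needs in GB, as listed in the Whisper README.
# Rough guidance only: actual usage depends on hardware and inputs.
APPROX_VRAM_GB = {"tiny": 1, "base": 1, "small": 2, "medium": 5, "large": 10}

def largest_model_that_fits(available_gb):
    """Pick the largest model whose approximate requirement fits in memory."""
    fitting = [name for name, need in APPROX_VRAM_GB.items() if need <= available_gb]
    # dicts preserve insertion order (smallest to largest), so take the last match
    return fitting[-1] if fitting else None

print(largest_model_that_fits(6))
print(largest_model_that_fits(0.5))
```

The result could be assigned to `WHISPER_MODEL` in the configuration cell, leaving some headroom for the diarization pipeline if it is enabled.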


In [None]:
# Import required libraries and test installations
print("📚 Testing library imports...")

# Test Whisper import
try:
    import whisper
    print("   ✅ OpenAI Whisper imported successfully")
    WHISPER_AVAILABLE = True
except ImportError as e:
    print(f"   ❌ Error importing Whisper: {e}")
    print("   🚨 SOLUTION: Enable External Access Integrations in notebook settings")
    print("   Then restart the notebook and try again")
    WHISPER_AVAILABLE = False

# Video processing uses FFmpeg CLI directly
print("   ✅ Video processing available via FFmpeg CLI")

# Check for optional speaker diarization packages
try:
    from pyannote.audio import Pipeline
    import torch
    print("   ✅ PyAnnote.audio imported")
    DIARIZATION_AVAILABLE = True
except ImportError as e:
    print(f"   ℹ️  PyAnnote.audio not available (this is optional)")
    print("   Speaker diarization will be skipped. Only basic transcription will be performed.")
    DIARIZATION_AVAILABLE = False

import json

# Check if we can proceed
if not WHISPER_AVAILABLE:
    print("\n❌ Cannot proceed without Whisper. Please:")
    print("   1. Go to notebook Settings (⚙️ icon)")
    print("   2. Enable External Access Integrations")
    print("   3. Restart this notebook")
    print("   4. Re-run the installation cell")
    raise ImportError("Whisper is required but not available")

# Load whisper model (the check above raises if Whisper is unavailable, so it is safe to load here)
print(f"\n🤖 Loading Whisper model: '{WHISPER_MODEL}'...")
try:
    model = whisper.load_model(WHISPER_MODEL)
    print(f"✅ Whisper model '{WHISPER_MODEL}' loaded successfully!")
    print(f"   Model: {model.dims}")
    print(f"   Device: {next(model.parameters()).device}")
except Exception as e:
    print(f"❌ Error loading Whisper model: {e}")
    print("   This might be a GPU/memory issue. Try restarting the notebook.")
    raise

# Load speaker diarization pipeline (optional, based on configuration)
diarization_pipeline = None
if ENABLE_SPEAKER_DIARIZATION and DIARIZATION_AVAILABLE:
    print("\n👥 Loading speaker diarization pipeline...")
    try:
        # Note: This requires a HuggingFace token for some models
        # If you get authentication errors, you'll need to provide a token
        diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
        print("✅ Speaker diarization pipeline loaded!")
        print("   This will add speaker identification to transcriptions")
    except Exception as e:
        print(f"⚠️  Could not load diarization pipeline: {e}")
        print("   Continuing without speaker diarization...")
        DIARIZATION_AVAILABLE = False
elif ENABLE_SPEAKER_DIARIZATION and not DIARIZATION_AVAILABLE:
    print("\n⚠️  Speaker diarization is ENABLED in config but PyAnnote is not available")
    print("   Install PyAnnote to enable speaker diarization:")
    print("   !pip install pyannote.audio torch torchaudio")
else:
    print("\nℹ️  Speaker diarization is DISABLED in configuration")
    print("   Set ENABLE_SPEAKER_DIARIZATION = True to enable it")
    DIARIZATION_AVAILABLE = False


## 3. Download Media Files

Download all audio and video files from the Snowflake stage to process locally.


In [None]:
# Create local directory for media files
os.makedirs('media_files', exist_ok=True)

# Download all files from the audio/video stage
print("Downloading files from stage...")
try:
    files = session.file.get('@TRANSCRIPTION_DB.TRANSCRIPTION_SCHEMA.AUDIO_VIDEO_STAGE/', 'media_files/')
    print(f"✅ Downloaded {len(files)} files")
except Exception as e:
    print(f"⚠️  Error downloading files: {e}")
    print("Make sure you have uploaded files to the AUDIO_VIDEO_STAGE first!")
    files = []
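
The download cell only prints how many results came back. The sketch below summarizes them in more detail. It assumes each result exposes `file`, `size` and `status` attributes and that a successful download reports the status `DOWNLOADED`; `FakeGetResult` is a stand-in used so the example runs without a Snowflake session, and `summarize_downloads` is a hypothetical helper.

```python
from collections import namedtuple

# Stand-in for the rows returned by session.file.get (field names are assumed)
FakeGetResult = namedtuple("FakeGetResult", ["file", "size", "status", "message"])

def summarize_downloads(results):
    """Count successful downloads, list failures, and total the bytes received."""
    ok = [r for r in results if str(r.status).upper() == "DOWNLOADED"]
    failed = [r.file for r in results if str(r.status).upper() != "DOWNLOADED"]
    total_bytes = sum(int(r.size) for r in ok)
    return {"downloaded": len(ok), "failed": failed, "total_bytes": total_bytes}

demo_results = [
    FakeGetResult("call.mp3", 1200, "DOWNLOADED", ""),
    FakeGetResult("demo.mp4", 5000, "DOWNLOADED", ""),
    FakeGetResult("broken.wav", 0, "ERROR", "checksum mismatch"),
]
print(summarize_downloads(demo_results))
```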


## 4. Define Helper Functions

Create functions to handle audio extraction from video files and transcription processing.
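
Before the full helpers, here is a minimal sketch of how the FFmpeg extraction command and the temporary WAV path can be built. `temp_wav_path` and `build_extract_command` are hypothetical names; the flags are the same ones the notebook's helper passes to FFmpeg (16 kHz, mono, 16-bit PCM).

```python
from pathlib import Path

def temp_wav_path(video_path):
    """Place the temporary WAV next to the video, keeping dots in the file name."""
    p = Path(video_path)
    return str(p.with_name(p.stem + "_temp_audio.wav"))

def build_extract_command(video_path, audio_path, sample_rate=16000):
    """Build the FFmpeg argument list for a mono 16-bit PCM WAV extraction."""
    return [
        "ffmpeg",
        "-y",                     # overwrite the output file if it exists
        "-i", str(video_path),    # input file
        "-vn",                    # drop the video stream
        "-acodec", "pcm_s16le",   # 16-bit PCM audio
        "-ar", str(sample_rate),  # sample rate in Hz
        "-ac", "1",               # mono
        str(audio_path),
    ]

wav = temp_wav_path("media_files/town hall.2024.mp4")
print(wav)
print(build_extract_command("media_files/town hall.2024.mp4", wav))
```

Passing the command as a list (rather than a single shell string) means file names containing spaces or quotes need no extra escaping when handed to `subprocess.run`.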


In [None]:
def extract_audio_from_video(video_path, audio_path):
    """
    Extract audio from video file using FFmpeg CLI (more reliable in Container Runtime)
    """
    try:
        import subprocess
        
        # Get video duration using ffprobe
        duration_cmd = [
            'ffprobe',
            '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            video_path
        ]
        
        try:
            duration_result = subprocess.run(duration_cmd, capture_output=True, text=True, check=True)
            duration = float(duration_result.stdout.strip())
        except (OSError, subprocess.CalledProcessError, ValueError):
            duration = 0
        
        # Extract audio using FFmpeg
        extract_cmd = [
            'ffmpeg',
            '-y',  # Overwrite output file
            '-i', video_path,  # Input file
            '-vn',  # No video
            '-acodec', 'pcm_s16le',  # Audio codec for WAV
            '-ar', '16000',  # Sample rate (Whisper compatible)
            '-ac', '1',  # Mono audio
            audio_path
        ]
        
        result = subprocess.run(extract_cmd, capture_output=True, text=True, check=False)
        
        if result.returncode == 0 and os.path.exists(audio_path):
            return True, duration
        else:
            print(f"FFmpeg extraction failed: {result.stderr[:200]}")
            return False, 0
            
    except Exception as e:
        print(f"Error extracting audio from {video_path}: {e}")
        return False, 0

def get_file_size(file_path):
    """Get file size in bytes"""
    try:
        return os.path.getsize(file_path)
    except OSError:
        return 0

def transcribe_media_file(file_path):
    """
    Transcribe audio or video files using Whisper with optional speaker diarization
    Returns: (transcript, language, processing_time, audio_duration, transcript_with_speakers, speaker_count)
    """
    start_time = time.time()
    
    try:
        # Determine file type
        file_ext = os.path.splitext(file_path)[1].lower()
        video_formats = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv']
        audio_formats = ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg']
        
        audio_path = file_path
        audio_duration = 0
        
        # If it's a video file, extract audio first
        if file_ext in video_formats:
            print(f"📹 Extracting audio from video: {os.path.basename(file_path)}")
            audio_path = file_path.rsplit('.', 1)[0] + '_temp_audio.wav'
            success, duration = extract_audio_from_video(file_path, audio_path)
            if not success:
                return None, None, 0, 0, None, 0
            audio_duration = duration
        elif file_ext in audio_formats:
            print(f"🎵 Processing audio file: {os.path.basename(file_path)}")
            # Get audio duration using whisper
            try:
                audio = whisper.load_audio(file_path)
                audio_duration = len(audio) / whisper.audio.SAMPLE_RATE
            except Exception:
                audio_duration = 0
        else:
            print(f"⚠️  Unsupported file format: {file_ext}")
            return None, None, 0, 0, None, 0
        
        # Transcribe using Whisper with word-level timestamps
        print(f"🔄 Transcribing: {os.path.basename(file_path)}")
        result = model.transcribe(audio_path, word_timestamps=True)
        
        # Initialize speaker diarization variables
        transcript_with_speakers = None
        speaker_count = 0
        
        # Perform speaker diarization if available
        if DIARIZATION_AVAILABLE and diarization_pipeline is not None:
            try:
                print(f"👥 Identifying speakers...")
                diarization = diarization_pipeline(audio_path)
                
                # Collect diarization turns once as (start, end, label) tuples
                turns = [
                    (turn.start, turn.end, f"Speaker_{speaker}")
                    for turn, _, speaker in diarization.itertracks(yield_label=True)
                ]

                # Walk the words in order; start a new segment whenever the speaker changes
                speaker_segments = []
                current_segment = None
                for segment in result["segments"]:
                    for word_info in segment.get("words", []):
                        word_start = word_info["start"]
                        word_end = word_info["end"]
                        word_text = word_info["word"]

                        # Find the turn that fully contains this word
                        speaker_label = "Unknown"
                        for turn_start, turn_end, label in turns:
                            if word_start >= turn_start and word_end <= turn_end:
                                speaker_label = label
                                break

                        if current_segment is not None and current_segment["speaker"] == speaker_label:
                            # Same speaker keeps talking: extend the current segment
                            current_segment["end_time"] = word_end
                            current_segment["text"] += word_text
                        else:
                            # Speaker changed: open a new segment
                            current_segment = {
                                "speaker": speaker_label,
                                "start_time": word_start,
                                "end_time": word_end,
                                "text": word_text
                            }
                            speaker_segments.append(current_segment)

                # Round times and add durations (segments are already in time order)
                speaker_segments = [
                    {
                        "speaker": seg["speaker"],
                        "start_time": round(seg["start_time"], 2),
                        "end_time": round(seg["end_time"], 2),
                        "duration": round(seg["end_time"] - seg["start_time"], 2),
                        "text": seg["text"].strip()
                    }
                    for seg in speaker_segments
                ]
                
                # Create JSON structure
                transcript_with_speakers = {
                    "file_info": {
                        "filename": os.path.basename(file_path),
                        "duration": round(audio_duration, 2),
                        "language": result.get('language', 'unknown')
                    },
                    "speakers": speaker_segments,
                    "full_transcript": result['text'].strip()
                }
                
                speaker_count = len(set(seg["speaker"] for seg in speaker_segments))
                print(f"   🎯 Identified {speaker_count} speakers")
                
            except Exception as e:
                print(f"   ⚠️  Speaker diarization failed: {e}")
                print(f"   Continuing with basic transcription...")
        else:
            # Fallback: time-based segments without speaker identification
            if result.get("segments"):
                segments = []
                for segment in result["segments"]:
                    segments.append({
                        "speaker": "Unknown",  # No diarization was run, so no speaker is assigned
                        "start_time": round(segment["start"], 2),
                        "end_time": round(segment["end"], 2),
                        "duration": round(segment["end"] - segment["start"], 2),
                        "text": segment["text"].strip()
                    })

                transcript_with_speakers = {
                    "file_info": {
                        "filename": os.path.basename(file_path),
                        "duration": round(audio_duration, 2),
                        "language": result.get('language', 'unknown')
                    },
                    "speakers": segments,
                    "full_transcript": result['text'].strip()
                }

                speaker_count = 0  # Speakers were not identified
                print(f"   📝 Created time-based segments (no speaker labels)")
        
        # Clean up temporary audio file if created
        if audio_path != file_path and os.path.exists(audio_path):
            os.remove(audio_path)
        
        processing_time = time.time() - start_time
        
        print(f"✅ Completed: {os.path.basename(file_path)} ({processing_time:.2f}s)")
        print(f"   Language: {result.get('language', 'unknown')}")
        print(f"   Duration: {audio_duration:.1f}s")
        if speaker_count > 0:
            print(f"   Speakers: {speaker_count}")
        
        return (
            result['text'].strip(),
            result.get('language', 'unknown'),
            processing_time,
            audio_duration,
            transcript_with_speakers,
            speaker_count
        )
        
    except Exception as e:
        processing_time = time.time() - start_time
        print(f"❌ Error transcribing {file_path}: {e}")
        return None, None, processing_time, 0, None, 0

print("✅ Helper functions defined!")
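
The speaker-attribution step in `transcribe_media_file` pairs Whisper's word timestamps with diarization turns. A minimal stand-alone sketch of that idea is shown below, using plain tuples and dicts instead of Whisper or pyannote objects. It assigns each word to the turn it overlaps most (an assumption that differs from a strict "fully contained" test) and starts a new segment whenever the speaker changes. All function names here are hypothetical.

```python
def overlap(a_start, a_end, b_start, b_end):
    """Length in seconds of the intersection of two time intervals."""
    return max(0.0, min(a_end, b_end) - max(a_start, b_start))

def assign_speaker(word_start, word_end, turns):
    """Return the label of the turn that overlaps the word the most."""
    best_label, best_overlap = "Unknown", 0.0
    for turn_start, turn_end, label in turns:
        ov = overlap(word_start, word_end, turn_start, turn_end)
        if ov > best_overlap:
            best_label, best_overlap = label, ov
    return best_label

def group_words(words, turns):
    """Merge consecutive words from the same speaker into segments."""
    segments = []
    for w in words:
        label = assign_speaker(w["start"], w["end"], turns)
        if segments and segments[-1]["speaker"] == label:
            segments[-1]["end_time"] = w["end"]
            segments[-1]["text"] += w["word"]
        else:
            segments.append({
                "speaker": label,
                "start_time": w["start"],
                "end_time": w["end"],
                "text": w["word"],
            })
    for seg in segments:
        seg["text"] = seg["text"].strip()
    return segments

# Illustrative data: speaker A, then B, then A again
turns = [(0.0, 2.0, "A"), (2.0, 4.0, "B"), (4.0, 6.0, "A")]
words = [
    {"word": " Hello", "start": 0.2, "end": 0.8},
    {"word": " there", "start": 0.9, "end": 1.5},
    {"word": " Hi", "start": 1.9, "end": 2.6},      # straddles the A/B boundary
    {"word": " again", "start": 4.2, "end": 4.9},
]
for seg in group_words(words, turns):
    print(seg)
```

Because segments are opened on every speaker change, a speaker who talks twice produces two separate segments rather than one segment spanning the whole conversation.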


## 5. Process Media Files

Find all media files and process them for transcription.


In [None]:
# Find all media files
audio_formats = ['*.mp3', '*.wav', '*.m4a', '*.flac', '*.aac', '*.ogg']
video_formats = ['*.mp4', '*.avi', '*.mov', '*.mkv', '*.webm', '*.flv']

# Video processing available via FFmpeg CLI
supported_formats = audio_formats + video_formats
print("📁 Scanning for audio and video files...")

media_files = []
for pattern in supported_formats:
    media_files.extend(glob.glob(f'media_files/{pattern}'))
    # Also check for uppercase extensions
    media_files.extend(glob.glob(f'media_files/{pattern.upper()}'))

print(f"📁 Found {len(media_files)} media files to process:")
for i, file in enumerate(media_files[:10], 1):  # Show first 10
    file_ext = os.path.splitext(file)[1].lower()
    file_type = "🎵" if file_ext in ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg'] else "📹"
    print(f"   {i}. {file_type} {os.path.basename(file)}")
if len(media_files) > 10:
    print(f"   ... and {len(media_files) - 10} more files")

if len(media_files) == 0:
    print("⚠️  No media files found! Please upload files to the AUDIO_VIDEO_STAGE first.")
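
The glob patterns above match all-lowercase and all-uppercase extensions, so a file such as `call.Mp3` would be missed on a case-sensitive file system. A sketch of a case-insensitive scan is shown below; `find_media_files` is a hypothetical helper, and the demo runs against a throwaway temporary directory rather than `media_files`.

```python
import tempfile
from pathlib import Path

AUDIO_EXTS = {".mp3", ".wav", ".m4a", ".flac", ".aac", ".ogg"}
VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv"}

def find_media_files(directory):
    """Return (file name, kind) pairs for supported media, ignoring extension case."""
    found = []
    for path in sorted(Path(directory).iterdir()):
        ext = path.suffix.lower()
        if path.is_file() and ext in AUDIO_EXTS | VIDEO_EXTS:
            kind = "audio" if ext in AUDIO_EXTS else "video"
            found.append((path.name, kind))
    return found

# Demo on empty placeholder files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    for name in ("call.Mp3", "demo.MP4", "notes.txt"):
        (Path(tmp) / name).touch()
    demo_files = find_media_files(tmp)

print(demo_files)
```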


## 6. Batch Transcription Processing

Process all media files and collect results. This may take some time depending on the number and size of files.


In [None]:
# Process all media files
all_transcriptions = []
total_files = len(media_files)
failed_files = []

if total_files > 0:
    print(f"🚀 Starting transcription of {total_files} files...")
    print("=" * 60)
    
    for i, file_path in enumerate(media_files, 1):
        print(f"\n[{i}/{total_files}] Processing: {os.path.basename(file_path)}")
        
        # Get file info
        file_name = os.path.basename(file_path)
        file_type = os.path.splitext(file_name)[1][1:].upper()  # Remove dot and uppercase
        file_size = get_file_size(file_path)
        
        # Transcribe the file (speaker diarization runs only if enabled and available)
        transcript, language, processing_time, audio_duration, transcript_with_speakers, speaker_count = transcribe_media_file(file_path)
        
        if transcript is not None:
            # Store successful transcription
            record = {
                'FILE_PATH': file_path if INCLUDE_FILE_PATH else '',
                'FILE_NAME': file_name,
                'FILE_TYPE': file_type,
                'DETECTED_LANGUAGE': language,
                'TRANSCRIPT': transcript,
                'TRANSCRIPT_WITH_SPEAKERS': transcript_with_speakers,
                'PROCESSING_TIME_SECONDS': processing_time,
                'FILE_SIZE_BYTES': file_size,
                'AUDIO_DURATION_SECONDS': audio_duration,
                'SPEAKER_COUNT': speaker_count,
                'TRANSCRIPTION_TIMESTAMP': datetime.now()
            }
            all_transcriptions.append(record)
        else:
            failed_files.append(file_name)
        
        # Progress update (using configured interval)
        if i % PROGRESS_UPDATE_INTERVAL == 0 or i == total_files:
            success_count = len(all_transcriptions)
            print(f"\n📊 Progress: {i}/{total_files} processed, {success_count} successful")
    
    print("\n" + "=" * 60)
    print(f"✅ Transcription complete!")
    print(f"   Successfully processed: {len(all_transcriptions)} files")
    print(f"   Failed: {len(failed_files)} files")
    
    if failed_files:
        print(f"   Failed files: {', '.join(failed_files)}")
        
else:
    print("⚠️  No files to process.")
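
The batch loop's control flow (collect successes, record failures, report progress at a fixed interval) can be tried without Whisper by injecting a stand-in transcriber. The sketch below is illustrative only: `run_batch` and `fake_transcribe` are hypothetical names, and the record it builds is a reduced version of the notebook's.

```python
def run_batch(paths, transcribe, progress_interval=5):
    """Run `transcribe` over paths, separating successes from failures."""
    results, failed = [], []
    total = len(paths)
    for i, path in enumerate(paths, 1):
        try:
            text = transcribe(path)
        except Exception as exc:  # keep the batch going if one file blows up
            print(f"error on {path}: {exc}")
            text = None
        if text is None:
            failed.append(path)
        else:
            results.append({"FILE_NAME": path, "TRANSCRIPT": text})
        # Report at the configured interval and once more at the very end
        if i % progress_interval == 0 or i == total:
            print(f"{i}/{total} processed, {len(results)} successful")
    return results, failed

def fake_transcribe(path):
    """Stand-in for the real transcriber: fails on '.bad' files."""
    if path.endswith(".bad"):
        return None
    return f"transcript of {path}"

batch_results, batch_failed = run_batch(
    ["a.mp3", "b.bad", "c.wav"], fake_transcribe, progress_interval=2
)
```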


## 7. Preview Results

Let's take a look at some of the transcription results before storing them in Snowflake.


In [None]:
if all_transcriptions:
    print("📋 Sample transcription results:")
    print("=" * 80)
    
    for i, result in enumerate(all_transcriptions[:3], 1):  # Show first 3 results
        print(f"\n🎵 File {i}: {result['FILE_NAME']}")
        print(f"   Type: {result['FILE_TYPE']} | Language: {result['DETECTED_LANGUAGE']}")
        print(f"   Duration: {result['AUDIO_DURATION_SECONDS']:.1f}s | Processing: {result['PROCESSING_TIME_SECONDS']:.1f}s")
        print(f"   Size: {result['FILE_SIZE_BYTES']:,} bytes")
        
        # Show speaker information if available
        if result['SPEAKER_COUNT'] > 0:
            print(f"   👥 Speakers identified: {result['SPEAKER_COUNT']}")
            
            # Show first few speaker segments
            if result['TRANSCRIPT_WITH_SPEAKERS']:
                speakers_data = result['TRANSCRIPT_WITH_SPEAKERS']
                print(f"   📝 Speaker segments preview:")
                for j, segment in enumerate(speakers_data.get('speakers', [])[:3]):
                    print(f"      {segment['speaker']} ({segment['start_time']:.1f}s-{segment['end_time']:.1f}s): {segment['text'][:100]}...")
                if len(speakers_data.get('speakers', [])) > 3:
                    print(f"      ... and {len(speakers_data.get('speakers', [])) - 3} more segments")
        else:
            print(f"   📝 Transcript preview: {result['TRANSCRIPT'][:200]}...")
        
        print("-" * 80)
    
    if len(all_transcriptions) > 3:
        print(f"\n... and {len(all_transcriptions) - 3} more transcriptions")
        
    # Summary statistics
    total_duration = sum(r['AUDIO_DURATION_SECONDS'] for r in all_transcriptions)
    total_processing = sum(r['PROCESSING_TIME_SECONDS'] for r in all_transcriptions)
    avg_speed_ratio = total_processing / total_duration if total_duration > 0 else 0
    
    languages = {}
    file_types = {}
    for r in all_transcriptions:
        languages[r['DETECTED_LANGUAGE']] = languages.get(r['DETECTED_LANGUAGE'], 0) + 1
        file_types[r['FILE_TYPE']] = file_types.get(r['FILE_TYPE'], 0) + 1
    
    print(f"\n📊 Summary Statistics:")
    print(f"   Total audio duration: {total_duration/60:.1f} minutes")
    print(f"   Total processing time: {total_processing/60:.1f} minutes")
    print(f"   Average speed ratio: {avg_speed_ratio:.2f}x (processing time / audio duration; lower is faster)")
    print(f"   Languages detected: {', '.join(languages.keys())}")
    print(f"   File types: {', '.join(file_types.keys())}")
else:
    print("No transcription results to display.")
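
The speed ratio in the summary is processing time divided by audio duration, often called the real-time factor. Its inverse tells you how many seconds of audio are handled per second of compute. A small sketch, with `speed_metrics` as a hypothetical helper name:

```python
def speed_metrics(processing_seconds, audio_seconds):
    """Return the real-time factor and its inverse, or None if inputs are unusable."""
    if processing_seconds <= 0 or audio_seconds <= 0:
        return None
    rtf = processing_seconds / audio_seconds
    return {"real_time_factor": rtf, "speedup": 1.0 / rtf}

# 120 seconds of audio transcribed in 30 seconds
metrics = speed_metrics(30.0, 120.0)
print(metrics)
```

A real-time factor below 1 means transcription runs faster than playback; in this example the factor is 0.25, which corresponds to a 4x speedup.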


## 8. Store Results in Snowflake

Now we'll store all transcription results in the Snowflake table for analysis and search.


In [None]:
if all_transcriptions:
    print("💾 Storing transcription results in Snowflake...")
    
    # Convert to Pandas DataFrame and fix timestamp formatting
    df_results = pd.DataFrame(all_transcriptions)
    
    # Ensure timestamp is properly formatted as string for Snowflake
    df_results['TRANSCRIPTION_TIMESTAMP'] = df_results['TRANSCRIPTION_TIMESTAMP'].dt.strftime('%Y-%m-%d %H:%M:%S.%f')
    
    # Convert speaker data to JSON strings for storage
    df_results['TRANSCRIPT_WITH_SPEAKERS'] = df_results['TRANSCRIPT_WITH_SPEAKERS'].apply(
        lambda x: json.dumps(x) if x is not None else None
    )
    
    # Convert to Snowpark DataFrame with proper schema
    schema = StructType([
        StructField("FILE_PATH", StringType()),
        StructField("FILE_NAME", StringType()),
        StructField("FILE_TYPE", StringType()),
        StructField("DETECTED_LANGUAGE", StringType()),
        StructField("TRANSCRIPT", StringType()),
        StructField("TRANSCRIPT_WITH_SPEAKERS", StringType()),  # JSON as string
        StructField("PROCESSING_TIME_SECONDS", FloatType()),
        StructField("FILE_SIZE_BYTES", IntegerType()),
        StructField("AUDIO_DURATION_SECONDS", FloatType()),
        StructField("SPEAKER_COUNT", IntegerType()),
        StructField("TRANSCRIPTION_TIMESTAMP", StringType())  # Use StringType for timestamp
    ])
    
    snowpark_df = session.create_dataframe(df_results, schema=schema)
    
    # Write to Snowflake table using SQL INSERT with proper timestamp conversion
    try:
        # Create a temporary view from the DataFrame
        snowpark_df.create_or_replace_temp_view("temp_transcription_data")
        
        # Insert data with proper timestamp and JSON conversion
        insert_sql = """
        INSERT INTO TRANSCRIPTION_RESULTS (
            FILE_PATH, FILE_NAME, FILE_TYPE, DETECTED_LANGUAGE, TRANSCRIPT, TRANSCRIPT_WITH_SPEAKERS,
            PROCESSING_TIME_SECONDS, FILE_SIZE_BYTES, AUDIO_DURATION_SECONDS, SPEAKER_COUNT, TRANSCRIPTION_TIMESTAMP
        )
        SELECT 
            FILE_PATH, FILE_NAME, FILE_TYPE, DETECTED_LANGUAGE, TRANSCRIPT,
            CASE 
                WHEN TRANSCRIPT_WITH_SPEAKERS IS NOT NULL 
                THEN PARSE_JSON(TRANSCRIPT_WITH_SPEAKERS)
                ELSE NULL 
            END as TRANSCRIPT_WITH_SPEAKERS,
            PROCESSING_TIME_SECONDS, FILE_SIZE_BYTES, AUDIO_DURATION_SECONDS, SPEAKER_COUNT,
            TO_TIMESTAMP_NTZ(TRANSCRIPTION_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.FF6')
        FROM temp_transcription_data
        """
        
        result = session.sql(insert_sql)
        result.collect()  # Execute the query
        
        print(f"✅ Successfully stored {len(all_transcriptions)} transcription records!")
        
        # Verify the data was written
        count_query = session.sql("SELECT COUNT(*) as TOTAL FROM TRANSCRIPTION_RESULTS")
        total_records = count_query.collect()[0]['TOTAL']
        print(f"   Total records in TRANSCRIPTION_RESULTS table: {total_records}")
        
    except Exception as e:
        print(f"❌ Error storing results: {e}")
        print("   Trying alternative method...")
        
        # Alternative method: insert records one by one with bind parameters,
        # so quotes, backslashes, or "$$" in the text cannot break the statement.
        # INSERT ... SELECT is used because PARSE_JSON is not allowed in a VALUES clause.
        try:
            fallback_sql = """
            INSERT INTO TRANSCRIPTION_RESULTS (
                FILE_PATH, FILE_NAME, FILE_TYPE, DETECTED_LANGUAGE, TRANSCRIPT, TRANSCRIPT_WITH_SPEAKERS,
                PROCESSING_TIME_SECONDS, FILE_SIZE_BYTES, AUDIO_DURATION_SECONDS, SPEAKER_COUNT, TRANSCRIPTION_TIMESTAMP
            )
            SELECT ?, ?, ?, ?, ?, PARSE_JSON(?), ?, ?, ?, ?, CURRENT_TIMESTAMP()
            """
            for record in all_transcriptions:
                speakers = record['TRANSCRIPT_WITH_SPEAKERS']
                params = [
                    record['FILE_PATH'],
                    record['FILE_NAME'],
                    record['FILE_TYPE'],
                    record['DETECTED_LANGUAGE'],
                    record['TRANSCRIPT'],
                    json.dumps(speakers) if speakers is not None else None,
                    float(record['PROCESSING_TIME_SECONDS']),
                    int(record['FILE_SIZE_BYTES']),
                    float(record['AUDIO_DURATION_SECONDS']),
                    int(record['SPEAKER_COUNT']),
                ]
                session.sql(fallback_sql, params=params).collect()
            
            print(f"✅ Successfully stored {len(all_transcriptions)} records using alternative method!")
            
        except Exception as e2:
            print(f"❌ Alternative method also failed: {e2}")
        
else:
    print("⚠️  No transcription results to store.")
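
Transcripts routinely contain apostrophes, quotes and other characters that are awkward to escape by hand in SQL text. One option is to pass values as bind parameters, for example through the `params` argument of Snowpark's `session.sql`. The sketch below only prepares the parameter list and needs no Snowflake session; `COLUMNS` and `record_to_params` are hypothetical names, and the column order is an assumption that mirrors the record built earlier in this notebook.

```python
import json

# Assumed column order, mirroring the record dictionaries built in the batch loop
COLUMNS = (
    "FILE_PATH", "FILE_NAME", "FILE_TYPE", "DETECTED_LANGUAGE", "TRANSCRIPT",
    "TRANSCRIPT_WITH_SPEAKERS", "PROCESSING_TIME_SECONDS", "FILE_SIZE_BYTES",
    "AUDIO_DURATION_SECONDS", "SPEAKER_COUNT",
)

def record_to_params(record):
    """Flatten one record into an ordered list suitable for `?` placeholders."""
    params = []
    for column in COLUMNS:
        value = record.get(column)
        if column == "TRANSCRIPT_WITH_SPEAKERS" and value is not None:
            # Serialize the nested structure; the SQL side can apply PARSE_JSON
            value = json.dumps(value, ensure_ascii=False)
        params.append(value)
    return params

demo_record = {
    "FILE_PATH": "media_files/o'brien.mp3",
    "FILE_NAME": "o'brien.mp3",
    "FILE_TYPE": "MP3",
    "DETECTED_LANGUAGE": "en",
    "TRANSCRIPT": 'She said "it costs $$5", didn\'t she?',
    "TRANSCRIPT_WITH_SPEAKERS": {"speakers": [], "full_transcript": "café"},
    "PROCESSING_TIME_SECONDS": 1.5,
    "FILE_SIZE_BYTES": 2048,
    "AUDIO_DURATION_SECONDS": 12.0,
    "SPEAKER_COUNT": 0,
}
demo_params = record_to_params(demo_record)
print(demo_params)
```

The values pass through unchanged, so no manual quoting is needed regardless of what the transcript contains.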


## 9. Query and Analyze Results

Let's run some sample queries to explore our transcription data.


In [None]:
# Query recent transcriptions
print("🔍 Recent transcriptions:")
recent_query = """
SELECT 
    FILE_NAME,
    FILE_TYPE,
    DETECTED_LANGUAGE,
    ROUND(AUDIO_DURATION_SECONDS, 1) as DURATION_SEC,
    ROUND(PROCESSING_TIME_SECONDS, 1) as PROCESSING_SEC,
    TRANSCRIPTION_TIMESTAMP
FROM TRANSCRIPTION_RESULTS 
ORDER BY TRANSCRIPTION_TIMESTAMP DESC 
LIMIT 5
"""

try:
    recent_df = session.sql(recent_query).to_pandas()
    if not recent_df.empty:
        print(recent_df.to_string(index=False))
    else:
        print("No results found.")
except Exception as e:
    print(f"Error querying data: {e}")

print("\n" + "="*60)


In [None]:
# Query summary statistics using the view
print("📊 Summary statistics from TRANSCRIPTION_SUMMARY view:")
summary_query = "SELECT * FROM TRANSCRIPTION_SUMMARY ORDER BY FILE_COUNT DESC"

try:
    summary_df = session.sql(summary_query).to_pandas()
    if not summary_df.empty:
        print(summary_df.to_string(index=False))
    else:
        print("No summary data available.")
except Exception as e:
    print(f"Error querying summary: {e}")

print("\n" + "="*60)


In [None]:
# Search transcriptions for specific content (example)
search_term = "meeting"  # Change this to search for different terms
print(f"🔍 Searching for '{search_term}' in transcriptions:")

search_query = """
SELECT 
    FILE_NAME,
    DETECTED_LANGUAGE,
    SPEAKER_COUNT,
    SUBSTR(TRANSCRIPT, 1, 100) as TRANSCRIPT_PREVIEW
FROM TRANSCRIPTION_RESULTS 
WHERE TRANSCRIPT ILIKE ?
LIMIT 3
"""

try:
    # Bind the pattern as a parameter rather than interpolating it into the SQL text
    search_df = session.sql(search_query, params=[f"%{search_term}%"]).to_pandas()
    if not search_df.empty:
        print(search_df.to_string(index=False))
    else:
        print(f"No transcriptions found containing '{search_term}'")
except Exception as e:
    print(f"Error searching transcriptions: {e}")

print("\n" + "="*60)

# Example: Query speaker segments from JSON data
print("👥 Example: Querying speaker segments from JSON data:")

speaker_query = """
SELECT 
    FILE_NAME,
    SPEAKER_COUNT,
    TRANSCRIPT_WITH_SPEAKERS:file_info:language::STRING as LANGUAGE,
    TRANSCRIPT_WITH_SPEAKERS:speakers[0]:speaker::STRING as FIRST_SPEAKER,
    TRANSCRIPT_WITH_SPEAKERS:speakers[0]:text::STRING as FIRST_SPEAKER_TEXT
FROM TRANSCRIPTION_RESULTS 
WHERE TRANSCRIPT_WITH_SPEAKERS IS NOT NULL
LIMIT 2
"""

try:
    speaker_df = session.sql(speaker_query).to_pandas()
    if not speaker_df.empty:
        print(speaker_df.to_string(index=False))
    else:
        print("No files with speaker data found")
except Exception as e:
    print(f"Error querying speaker data: {e}")
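
In an `ILIKE` pattern, `%` and `_` are wildcards, so a search term such as `50%_off` matches more than intended. The sketch below escapes those characters so the term is matched literally. It assumes the query adds an `ESCAPE '!'` clause and that the escape character follows standard SQL semantics; `ilike_contains_pattern` is a hypothetical helper.

```python
def ilike_contains_pattern(term, escape_char="!"):
    """Build a 'contains' pattern in which %, _ and the escape char are literal."""
    # Escape the escape character first so later replacements are not doubled
    escaped = term.replace(escape_char, escape_char * 2)
    escaped = escaped.replace("%", escape_char + "%")
    escaped = escaped.replace("_", escape_char + "_")
    return f"%{escaped}%"

print(ilike_contains_pattern("meeting"))
print(ilike_contains_pattern("50%_off"))
```

The resulting string would be passed as a bound value to a clause such as `WHERE TRANSCRIPT ILIKE ? ESCAPE '!'`.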


## 10. Cleanup and Next Steps

Clean up temporary files and provide information about next steps.


In [None]:
# Cleanup temporary files
import shutil

try:
    if os.path.exists('media_files'):
        shutil.rmtree('media_files')
        print("🧹 Cleaned up temporary media files")
except Exception as e:
    print(f"Note: Could not clean up temporary files: {e}")

print("\n🎉 Transcription process complete!")
print("\n📋 What you can do next:")
print("   1. Use the Streamlit dashboard to explore and search your transcriptions")
print("   2. Run SQL queries against the TRANSCRIPTION_RESULTS table")
print("   3. Use the TRANSCRIPTION_SUMMARY view for analytics")
print("   4. Set up automated processing with Snowflake Tasks (see README)")
print("\n💡 Tips:")
print("   • Upload more files to AUDIO_VIDEO_STAGE and re-run this notebook")
print("   • Experiment with different Whisper models (tiny, base, small, medium, large)")
print("   • Use the search functionality to find specific content in transcriptions")
print("\n📊 Dashboard: Run 'streamlit run streamlit/transcription_dashboard.py'")
