# Whisper Speech Recognition on AI PCs: Local Audio Intelligence

## Introduction

This notebook demonstrates how to run OpenAI's Whisper speech recognition model locally on an AI PC. It is optimized for Intel® Core™ Ultra processors, utilizing the integrated GPU (Intel® Arc™ Graphics) for efficient audio transcription and translation workloads.

## What is an AI PC?

An AI PC is a next-generation computing platform equipped with a CPU, GPU, and NPU, each designed with specific AI acceleration capabilities.

**Fast Response (CPU)**  
The central processing unit (CPU) is optimized for smaller, low-latency workloads, making it ideal for quick responses and general-purpose tasks.

**High Throughput (GPU)**  
The graphics processing unit (GPU) excels at handling large-scale workloads that require high parallelism and throughput, making it suitable for tasks like speech recognition and audio processing.

**Power Efficiency (NPU)**  
The neural processing unit (NPU) is designed for sustained, heavily used AI workloads, delivering high efficiency and low power consumption for continuous inference tasks.

The AI PC represents a transformative shift in computing, enabling advanced AI applications like Whisper speech recognition to run seamlessly on local hardware. This innovation enhances privacy, reduces latency, and eliminates dependency on cloud services for audio processing.

## What is Whisper?

Whisper is a state-of-the-art automatic speech recognition (ASR) system developed by OpenAI. It was trained on 680,000 hours of multilingual and multitask supervised data, making it robust to accents, background noise, and technical language.

## Learning Objectives

By the end of this workshop, participants will be able to:

1. **Remember**: Recall the main components of a speech-to-text (STT) pipeline using Whisper models
2. **Understand**: Explain how Whisper models process audio data and generate transcriptions
3. **Apply**: Implement a speech transcription application using Whisper models and PyTorch
4. **Analyze**: Examine the performance characteristics of Whisper models and identify optimization strategies
5. **Evaluate**: Compare different Whisper model configurations and their impact on transcription quality and speed
6. **Create**: Develop a custom speech recognition application with streaming output optimized for Intel XPU hardware

## Key Features of This Implementation

- **Local Processing**: All audio data stays on your device for privacy
- **GPU Acceleration**: Utilizes Intel® Arc™ Graphics for fast inference
- **Multiple Languages**: Support for English, Spanish, French, German, and Chinese
- **Real-time Performance**: Optimized for responsive audio processing
- **Memory Efficiency**: Smart caching and memory management for sustained usage

## 1. Setting Up the Environment

### Code Walkthrough:

• **Import Essential Libraries**: PyTorch for deep learning, Transformers for Whisper models, datasets for audio data loading

• **Global State Management**: MODEL_CACHE stores loaded models to prevent redundant loading, GENERATION_COUNT tracks inference operations

• **Device Detection**: Automatically detects Intel XPU (GPU) availability and sets appropriate data types (FP16 for XPU, FP32 for CPU)

• **Memory Management Functions**: 
  - `clear_gpu_memory()`: Clears XPU cache, synchronizes device, and performs garbage collection
  - `reset_model_state()`: Resets model's internal caches and decoder states to prevent memory accumulation

• **XPU Optimizations**: When Intel Arc Graphics is detected, uses FP16 precision, which can give roughly a 2-3x speedup over FP32 with minimal accuracy loss; the actual gain depends on the model and hardware
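
To make "minimal accuracy loss" more concrete, the sketch below rounds a value to half precision and measures the relative error. It relies only on the standard library (`struct` with the `e` half-precision format); the helper names `to_fp16` and `relative_error` are illustrative and are not part of this notebook's code.

```python
import struct


def to_fp16(x: float) -> float:
    """Round x to the nearest IEEE 754 half-precision value and convert back."""
    return struct.unpack("<e", struct.pack("<e", x))[0]


def relative_error(x: float) -> float:
    """Relative error introduced by storing a non-zero x in FP16."""
    return abs(to_fp16(x) - x) / abs(x)


for value in (0.1, 1.0, 3.14159, 1234.5):
    print(f"{value!r:>10} -> {to_fp16(value)!r:<22} rel. error {relative_error(value):.2e}")
```

For values in FP16's normal range, round-to-nearest keeps the relative error at or below 2^-11 (about 0.05%), which is usually tolerable for inference. Values above 65504 overflow in FP16, which is one reason precision-sensitive operations are often kept in FP32.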

In [None]:
import time
import torch
import numpy as np
import gc
from datasets import load_dataset
from transformers import AutoModelForSpeechSeq2Seq, pipeline, AutoProcessor
from transformers.models.whisper import WhisperFeatureExtractor, WhisperTokenizer
from IPython.display import display, Audio, clear_output
import warnings
warnings.filterwarnings('ignore')

# Global model cache to prevent reloading
MODEL_CACHE = {}

# Track generation count for memory management
GENERATION_COUNT = 0
MAX_GENERATIONS_BEFORE_RESET = 50

# Check if we have access to Intel XPU hardware
if hasattr(torch, 'xpu') and torch.xpu.is_available():
    device = 'xpu'
    dtype = torch.float16
    print(f"Using Intel XPU device: {torch.xpu.get_device_name()}")
else:
    device = 'cpu'
    dtype = torch.float32
    print("Using CPU for inference")

def clear_gpu_memory():
    """
    Clear GPU/XPU memory and reset cache.
    
    This function performs garbage collection and clears GPU memory cache
    to free up resources for subsequent operations.
    """
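    # Collect unreachable Python objects first so their device memory can be released
    gc.collect()
    if device == 'xpu':
        torch.xpu.synchronize()  # wait for pending kernels before releasing cached blocks
        torch.xpu.empty_cache()
        if hasattr(torch.xpu, 'reset_peak_memory_stats'):
            torch.xpu.reset_peak_memory_stats()
        print("XPU memory cleared")
    else:
        print("Garbage collection done (no XPU in use)")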

def reset_model_state(model):
    """
    Reset model's internal state and caches.
    
    Args:
        model: The Whisper model to reset
        
    Returns:
        The reset model or None if model is invalid
    """
    if model is None:
        return None
        
    if hasattr(model, 'model'):
        model = model.model
    
    # Clear any cached states in encoder/decoder
    if hasattr(model, 'encoder'):
        if hasattr(model.encoder, 'clear_cache'):
            model.encoder.clear_cache()
    
    if hasattr(model, 'decoder'):
        if hasattr(model.decoder, 'clear_cache'):
            model.decoder.clear_cache()
        model.decoder.past_key_values = None
    
    # Reset generation config safely
    if hasattr(model, 'generation_config') and model.generation_config is not None:
        try:
            model.generation_config.use_cache = True
        except AttributeError:
            pass
    
    return model

## 2. Loading a Sample Audio Dataset

### Code Walkthrough:

• **Dataset Loading**: Uses the Hugging Face `datasets` streaming mode to fetch one sample from the `distil-whisper/librispeech_long` dataset without downloading the entire dataset

• **Audio Properties**: Extracts the audio waveform array and sampling rate (typically 16 kHz for speech)

• **Error Handling**: Falls back to synthetic audio generation if dataset loading fails (useful for offline testing)

• **Audio Playback**: Displays an interactive audio player in the notebook for listening to the sample
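
The audio properties and the synthetic fallback described above can be sketched with the standard library alone. The names `describe_audio`, `synth_tone`, and `WHISPER_SAMPLE_RATE` are hypothetical helpers introduced for illustration; the 16 kHz target reflects the sampling rate the Whisper feature extractor expects.

```python
import math
from typing import Dict, List

WHISPER_SAMPLE_RATE = 16_000  # Hz; assumed target rate for the feature extractor


def describe_audio(num_samples: int, sampling_rate: int) -> Dict[str, object]:
    """Report duration and whether the clip would need resampling to 16 kHz."""
    return {
        "duration_s": num_samples / sampling_rate,
        "needs_resampling": sampling_rate != WHISPER_SAMPLE_RATE,
    }


def synth_tone(freq_hz: float = 440.0, duration_s: float = 5.0,
               sampling_rate: int = WHISPER_SAMPLE_RATE,
               amplitude: float = 0.5) -> List[float]:
    """Generate a sine tone as a plain list of floats (offline fallback audio)."""
    n = int(sampling_rate * duration_s)
    return [amplitude * math.sin(2 * math.pi * freq_hz * i / sampling_rate)
            for i in range(n)]


tone = synth_tone(duration_s=1.0)
print(describe_audio(len(tone), WHISPER_SAMPLE_RATE))
```

A pure tone contains no speech, so it only exercises the pipeline end to end; it should not be expected to produce a meaningful transcription.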

In [None]:
# Load LibriSpeech sample dataset
print("Loading sample audio from LibriSpeech dataset...")
try:
    dataset = load_dataset("distil-whisper/librispeech_long", "clean", split="validation", streaming=True)
    dataset_iter = iter(dataset)
    sample_data = next(dataset_iter)
    
    # Get a sample audio
    audio = sample_data["audio"]
    print(f"Audio sample rate: {audio['sampling_rate']} Hz")
    print(f"Audio duration: {len(audio['array']) / audio['sampling_rate']:.2f} seconds")
    
    # Show an interactive audio player for the sample
    display(Audio(audio['array'], rate=audio['sampling_rate']))
except Exception as e:
    print(f"Error loading dataset: {e}")
    # Create synthetic audio as fallback
    sr = 16000
    duration = 5
    t = np.linspace(0, duration, int(sr * duration), False)
    audio = {
        "array": 0.5 * np.sin(2*np.pi*440*t).astype(np.float32),
        "sampling_rate": sr
    }
    print("Using synthetic audio for demonstration")

## 3. Loading the Whisper Model

### Code Walkthrough:

• **Model Loading Function**: Comprehensive function that handles model initialization with caching and proper configuration

• **Key Components Loaded**:
  - **AutoModelForSpeechSeq2Seq**: The main Whisper model architecture for sequence-to-sequence speech recognition
  - **WhisperTokenizer**: Converts text to/from token IDs, handles special tokens
  - **WhisperFeatureExtractor**: Converts raw audio to mel-spectrogram features
  - **AutoProcessor**: Combines feature extractor and tokenizer for streamlined processing

• **Model Configuration**:
  - Detects English-only vs multilingual models automatically
  - Sets appropriate decoder start tokens and forced decoder IDs
  - Disables gradient computation for inference efficiency

• **Memory Optimization**:
  - Uses `low_cpu_mem_usage=True` to reduce RAM usage during loading
  - Loads with appropriate dtype (FP16 for XPU, FP32 for CPU)
  - Implements model caching to avoid redundant loading

• **XPU Optimizations**: When running on Intel Arc Graphics, models are automatically moved to XPU device for acceleration
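
The caching behaviour described above (reuse a loaded model, evict the previous one before loading another) can be sketched independently of any ML library. `SingleModelCache`, its `loader` callable, and the `on_evict` hook are hypothetical names for illustration; in this notebook the equivalent logic lives inside `load_whisper_model` and the `MODEL_CACHE` dictionary.

```python
from typing import Any, Callable, Dict, Optional


class SingleModelCache:
    """Keep at most one loaded entry; requesting another name evicts the old one."""

    def __init__(self, loader: Callable[[str], Any],
                 on_evict: Optional[Callable[[str, Any], None]] = None) -> None:
        self._loader = loader
        self._on_evict = on_evict
        self._entries: Dict[str, Any] = {}

    def get(self, name: str, force_reload: bool = False) -> Any:
        # Cache hit: return the already loaded entry
        if not force_reload and name in self._entries:
            return self._entries[name]
        # Evict whatever is cached so two models never share device memory
        for old_name in list(self._entries):
            old_entry = self._entries.pop(old_name)
            if self._on_evict is not None:
                self._on_evict(old_name, old_entry)
        self._entries[name] = self._loader(name)
        return self._entries[name]


# Stand-in loader: a real one would call from_pretrained and move the model to the device
cache = SingleModelCache(loader=lambda name: {"name": name},
                         on_evict=lambda name, entry: print(f"evicting {name}"))
first = cache.get("distil-whisper/distil-small.en")
again = cache.get("distil-whisper/distil-small.en")
print(first is again)
```

Keeping a single entry is a deliberate trade-off for memory-constrained integrated GPUs: switching models costs a reload, but peak memory stays close to the size of one model.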

In [None]:
def load_whisper_model(model_name="distil-whisper/distil-small.en", language="english", force_reload=False):
    """
    Load Whisper model with proper initialization.
    
    Args:
        model_name (str): HuggingFace model identifier
        language (str): Language for transcription
        force_reload (bool): Force reload even if cached
        
    Returns:
        dict: Dictionary containing model, tokenizer, processor, and metadata
    """
    global MODEL_CACHE, dtype, device, GENERATION_COUNT
    
    # Reset generation count when loading new model
    GENERATION_COUNT = 0
    
    # Check if model is already loaded
    if not force_reload and model_name in MODEL_CACHE:
        print(f"Using cached model: {model_name}")
        return MODEL_CACHE[model_name]
    
    print(f"Loading model: {model_name}")
    
    # Clear previous models
    if MODEL_CACHE:
        print("Clearing previous models...")
        for cached_model in list(MODEL_CACHE.keys()):
            try:
                if 'model' in MODEL_CACHE[cached_model]:
                    MODEL_CACHE[cached_model]['model'].cpu()
                    del MODEL_CACHE[cached_model]['model']
                del MODEL_CACHE[cached_model]
            except Exception:
                # Best-effort cleanup: ignore failures while evicting a cached model
                pass
        MODEL_CACHE.clear()
        clear_gpu_memory()
    
    try:
        # Determine if this is an English-only model
        is_english_only = any(x in model_name.lower() for x in ['.en', 'english-only'])
        print(f"Model type: {'English-only' if is_english_only else 'Multilingual'}")
        
        # Load tokenizer and processor
        if is_english_only:
            tokenizer = WhisperTokenizer.from_pretrained(model_name)
            processor = AutoProcessor.from_pretrained(model_name)
        else:
            tokenizer = WhisperTokenizer.from_pretrained(model_name, language=language, task="transcribe")
            processor = AutoProcessor.from_pretrained(model_name, language=language, task="transcribe")
        
        # Load model
        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_name,
            torch_dtype=dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True
        )
        
        # Move to device and set to eval mode
        model.to(device)
        model.eval()
        
        # Disable gradient computation
        for param in model.parameters():
            param.requires_grad = False
        
        # Configure model
        model.config.forced_decoder_ids = None
        model.config.suppress_tokens = []
        
        # Set up decoder start token (only needed if the checkpoint's config lacks one).
        # Look the IDs up by name: they differ between English-only and multilingual vocabularies.
        if model.config.decoder_start_token_id is None:
            model.config.decoder_start_token_id = tokenizer.convert_tokens_to_ids("<|startoftranscript|>")
            if is_english_only:
                # English-only checkpoints take no language or task tokens
                model.config.forced_decoder_ids = [
                    [1, tokenizer.convert_tokens_to_ids("<|notimestamps|>")]
                ]
        
        # Load feature extractor
        feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name)
        
        # Ensure tokenizer has proper tokens
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        
        # Cache model
        MODEL_CACHE[model_name] = {
            'model': model,
            'feature_extractor': feature_extractor,
            'tokenizer': tokenizer,
            'processor': processor,
            'dtype': dtype,
            'is_english_only': is_english_only,
            'generation_count': 0
        }
        
        print(f"✅ Model loaded successfully: {model_name}")
        print(f"   Decoder start token ID: {model.config.decoder_start_token_id}")
        print(f"   Pad token ID: {tokenizer.pad_token_id}")
        clear_gpu_memory()
        
        return MODEL_CACHE[model_name]
        
    except Exception as e:
        print(f"Error loading model: {e}")
        import traceback
        traceback.print_exc()
        clear_gpu_memory()
        raise

# Load the model
print("\n" + "="*50)
print("Loading Distil-Whisper Small English model...")
print("="*50)
model_components = load_whisper_model("distil-whisper/distil-small.en")

# Extract components
model = model_components['model']
feature_extractor = model_components['feature_extractor']
tokenizer = model_components['tokenizer']
processor = model_components['processor']

print("\nModel ready for transcription!")

## 4. Streaming Transcription with Intel XPU Optimizations

### Understanding Intel XPU Acceleration

In PyTorch, `xpu` is the device type used to access Intel GPUs. When running on Intel® Arc™ Graphics or Intel® Data Center GPUs, the XPU backend provides significant acceleration for deep learning workloads.

### Key Optimization Techniques

#### 1. **Automatic Mixed Precision (AMP) with torch.autocast**

Automatic Mixed Precision allows models to use both FP16 and FP32 computations automatically:
- **FP16 operations**: Used where precision loss is acceptable (most computations)  
- **FP32 operations**: Maintained for operations requiring high precision  
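- **Benefits**: Often a substantial speedup (roughly 2-3x in favorable cases) with minimal accuracy loss; the actual gain depends on the model and hardware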

Example:
```python
with torch.autocast(device_type="xpu", dtype=torch.float16, enabled=True):
    # Model computations automatically use optimal precision
    output = model(input)
```

#### 2. **Gradient Computation Control with torch.no_grad()**
During inference, we disable gradient computation to:

- Reduce memory usage, since intermediate activations are not kept for a backward pass
- Accelerate forward pass computation
- Prevent unnecessary gradient accumulation

Example:
```python
with torch.no_grad():
    # All operations here won't track gradients
    output = model(input)
```

#### 3. **Device Synchronization**
Intel XPU operations are asynchronous by default. Synchronization ensures:

- All GPU operations complete before CPU continues
- Accurate timing measurements
- Proper memory cleanup

Example:
```python
torch.xpu.synchronize()  # Wait for all XPU operations to complete
```
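
Because device work is queued asynchronously, a timer that is started or stopped without synchronizing may measure only the time needed to enqueue the work. A minimal timing sketch follows; `timed_call` and its `sync` parameter are hypothetical names, and on XPU one would pass `torch.xpu.synchronize` as `sync`.

```python
import time
from typing import Any, Callable, Optional, Tuple


def timed_call(fn: Callable[[], Any],
               sync: Optional[Callable[[], None]] = None) -> Tuple[Any, float]:
    """Run fn() and return (result, elapsed_seconds).

    If sync is given, it is called before starting and before stopping the
    timer so that pending device work is included in the measurement.
    """
    if sync is not None:
        sync()  # drain work queued before the measured region
    start = time.perf_counter()
    result = fn()
    if sync is not None:
        sync()  # wait for the measured work to actually finish
    return result, time.perf_counter() - start


# CPU-only demonstration; no synchronization is needed here
value, seconds = timed_call(lambda: sum(range(1_000_000)))
print(f"result={value}, elapsed={seconds:.4f}s")
```

`time.perf_counter()` is used instead of `time.time()` because it is monotonic and intended for measuring short intervals.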

### Code Walkthrough:

• **Streaming Architecture**: Processes audio in chunks (default 10s) for real-time feedback and memory efficiency

• **Chunk Processing Pipeline**:
  1. Split audio into consecutive, non-overlapping chunks (a trailing chunk shorter than 0.5 s is dropped)
  2. Process each chunk through feature extraction
  3. Generate transcription with greedy decoding (`num_beams=1`)
  4. Display intermediate results in real-time

• **XPU Optimizations Applied**:
  - **torch.autocast**: Automatic mixed precision for faster inference (roughly 2-3x in favorable cases)
  - **torch.no_grad()**: Disables gradient tracking for inference
  - **torch.xpu.synchronize()**: Ensures proper timing measurements
  - **Periodic memory cleanup**: Clears cache every 3 chunks

• **Generation Parameters**:
  - `max_new_tokens`: Dynamically set based on chunk duration
  - `num_beams=1`: Greedy decoding for speed
  - `use_cache=False`: Disables the decoder key/value cache, which lowers peak memory per chunk at the cost of slower decoding

• **Error Recovery**: Continues processing even if individual chunks fail, ensuring robustness
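
The chunking and token-budget arithmetic from the walkthrough can be sketched without any audio libraries. The helpers `chunk_ranges` and `token_budget` are hypothetical names; the constants (10 s chunks, a 0.5 s minimum tail, 20 tokens per second capped at 100) mirror the values used in the cell below.

```python
from typing import List, Tuple


def chunk_ranges(n_samples: int, sampling_rate: int,
                 chunk_length_s: float = 10.0,
                 min_chunk_s: float = 0.5) -> List[Tuple[int, int]]:
    """Return (start, end) sample indices of consecutive, non-overlapping chunks.

    A trailing chunk shorter than min_chunk_s is dropped.
    """
    size = int(chunk_length_s * sampling_rate)
    if size <= 0:
        raise ValueError("chunk_length_s * sampling_rate must be positive")
    ranges = []
    for start in range(0, n_samples, size):
        end = min(start + size, n_samples)
        if end - start >= sampling_rate * min_chunk_s:
            ranges.append((start, end))
    return ranges


def token_budget(chunk_samples: int, sampling_rate: int,
                 tokens_per_second: int = 20, cap: int = 100) -> int:
    """Upper bound on generated tokens for a chunk, scaled by its duration."""
    return min(cap, int(chunk_samples / sampling_rate * tokens_per_second))


# 25 s of 16 kHz audio: two full chunks plus a 5 s tail
for start, end in chunk_ranges(25 * 16_000, 16_000):
    print(start, end, token_budget(end - start, 16_000))
```

Because the chunks do not overlap, a word that straddles a boundary can be split between two chunks; overlapping windows would reduce that risk at the cost of extra compute and a merge step.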

In [None]:
def transcribe_audio_streaming(model_components, audio, chunk_length_s=10, max_chunk_length=20):
    """
    Perform streaming transcription with chunk processing and Intel XPU optimizations.
    
    Args:
        model_components (dict): Dictionary containing model and tokenizer
        audio (dict): Audio dictionary with 'array' and 'sampling_rate'
        chunk_length_s (int): Length of each chunk in seconds
        max_chunk_length (int): Maximum allowed chunk length
        
    Returns:
        tuple: (transcription_text, inference_time)
    """
    global GENERATION_COUNT, MAX_GENERATIONS_BEFORE_RESET
    
    if not model_components or 'model' not in model_components:
        print("Invalid model components. Please run Cell 3 to load the model first.")
        return "", 0
    
    model = model_components['model']
    processor = model_components['processor']
    tokenizer = model_components['tokenizer']
    model_dtype = model_components.get('dtype', torch.float32)
    is_english_only = model_components.get('is_english_only', False)
    
    # Get audio data
    audio_array = audio["array"]
    sampling_rate = audio["sampling_rate"]
    
    # Limit chunk length
    chunk_length_s = min(chunk_length_s, max_chunk_length)
    chunk_size_samples = int(chunk_length_s * sampling_rate)
    
    # Create chunks
    chunks = []
    for i in range(0, len(audio_array), chunk_size_samples):
        chunk = audio_array[i:i+chunk_size_samples]
        if len(chunk) >= sampling_rate * 0.5:
            chunks.append(chunk)
    
    print(f"Processing {len(chunks)} chunks of {chunk_length_s} seconds")
    print(f"Model: {'English-only' if is_english_only else 'Multilingual'}")
    print(f"Device: {device}, Dtype: {model_dtype}")
    
    full_transcription = ""
    start_time = time.time()
    successful_chunks = 0
    
    # Use autocast for XPU optimization
    autocast_dtype = model_dtype if device == 'xpu' else torch.float32
    use_autocast = device == 'xpu' and model_dtype in [torch.float16, torch.bfloat16]
    
    for i, chunk in enumerate(chunks):
        try:
            print(f"Processing chunk {i+1}/{len(chunks)}...")
            
            # Process chunk
            inputs = processor(chunk, sampling_rate=sampling_rate, return_tensors="pt")
            
            # Move input features to device with proper dtype
            input_features = inputs.input_features.to(device=device, dtype=model_dtype)
            
            # Generate with proper initialization and XPU optimizations
            with torch.no_grad():
                with torch.autocast(device_type=device, dtype=autocast_dtype, enabled=use_autocast):
                    chunk_duration = len(chunk) / sampling_rate
                    max_tokens = min(100, int(chunk_duration * 20))
                    
                    # Base generation kwargs
                    generate_kwargs = {
                        "inputs": input_features,
                        "max_new_tokens": max_tokens,
                        "num_beams": 1,
                        "do_sample": False,
                        "use_cache": False,
                        "return_dict_in_generate": False,
                    }
                    
                    # Pass decoder_start_token_id explicitly when the config defines one
                    if model.config.decoder_start_token_id is not None:
                        generate_kwargs["decoder_start_token_id"] = model.config.decoder_start_token_id
                    
                    # Handle forced_decoder_ids
                    if hasattr(model.config, 'forced_decoder_ids') and model.config.forced_decoder_ids:
                        generate_kwargs["forced_decoder_ids"] = model.config.forced_decoder_ids
                    
                    try:
                        # Try generation
                        generated_ids = model.generate(**generate_kwargs)
                    except Exception as e:
                        if "index" in str(e) and "out of bounds" in str(e):
                            print(f"Retrying with explicit decoder_input_ids...")
                            decoder_input_ids = torch.tensor(
                                [[model.config.decoder_start_token_id or tokenizer.pad_token_id]], 
                                device=device
                            )
                            generate_kwargs["decoder_input_ids"] = decoder_input_ids
                            generate_kwargs.pop("decoder_start_token_id", None)
                            generated_ids = model.generate(**generate_kwargs)
                        else:
                            raise
                
                # Increment generation count
                GENERATION_COUNT += 1
                
                # Decode - move to CPU for decoding
                generated_ids_cpu = generated_ids.cpu()
                chunk_text = tokenizer.decode(generated_ids_cpu[0], skip_special_tokens=True).strip()
                
                if chunk_text:
                    full_transcription += chunk_text + " "
                    successful_chunks += 1
            
            # Clear tensors
            del inputs, input_features, generated_ids
            if 'generated_ids_cpu' in locals():
                del generated_ids_cpu
            if 'decoder_input_ids' in locals():
                del decoder_input_ids
            
            # Update display
            clear_output(wait=True)
            print(f"Processed {i+1}/{len(chunks)} chunks ({successful_chunks} successful)")
            print(f"Generation count: {GENERATION_COUNT}")
            print("\nCurrent Transcription:")
            print("-" * 80)
            print(full_transcription.strip())
            print("-" * 80)
            
            # Memory cleanup with XPU-specific handling
            if (i + 1) % 3 == 0:
                if device == 'xpu':
                    torch.xpu.empty_cache()
                    torch.xpu.synchronize()  # Ensure all operations are complete
                gc.collect()
                
        except Exception as e:
            print(f"Error in chunk {i+1}: {str(e)}")
            import traceback
            traceback.print_exc()
            
            # Try to continue with next chunk
            if device == 'xpu':
                torch.xpu.empty_cache()
                torch.xpu.synchronize()
            gc.collect()
            continue
    
    end_time = time.time()
    inference_time = end_time - start_time
    
    print(f"\nCompleted in {inference_time:.2f} seconds")
    print(f"   Successful chunks: {successful_chunks}/{len(chunks)}")
    
    # Clear memory
    clear_gpu_memory()
    
    return full_transcription.strip(), inference_time

# Run transcription
if 'model_components' in globals() and 'audio' in globals():
    print("\n" + "="*50)
    print("Starting streaming transcription...")
    print("="*50)
    transcription, time_taken = transcribe_audio_streaming(model_components, audio)
    print("\nFinal Transcription:")
    print("-" * 80)
    print(transcription if transcription else "No transcription generated")
    print("-" * 80)
    print(f"Total time: {time_taken:.2f} seconds")

## 5. System Status and Memory Management

### Code Walkthrough:

• **Memory Monitoring**: Tracks XPU memory allocation and reservation in real-time

• **Model Cache Management**: Provides utilities to clear cached models and free GPU memory

• **System Status Display**: Shows current device, cached models, and memory usage

• **Generation Tracking**: Monitors the number of inference operations for debugging memory leaks
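
One possible way to put the generation counter and the `MAX_GENERATIONS_BEFORE_RESET` threshold from Section 1 to work is a small tracker that signals when a cleanup is due. The `GenerationTracker` class below is a hypothetical sketch, not code from this notebook; the caller decides what the cleanup actually does (for example, calling `clear_gpu_memory()`).

```python
class GenerationTracker:
    """Count inference calls and signal when a periodic cleanup is due."""

    def __init__(self, max_before_reset: int = 50) -> None:
        if max_before_reset < 1:
            raise ValueError("max_before_reset must be at least 1")
        self.max_before_reset = max_before_reset
        self.since_reset = 0  # calls since the last cleanup signal
        self.total = 0        # calls over the tracker's lifetime

    def tick(self) -> bool:
        """Record one generation; return True when the caller should clean up."""
        self.total += 1
        self.since_reset += 1
        if self.since_reset >= self.max_before_reset:
            self.since_reset = 0
            return True
        return False


tracker = GenerationTracker(max_before_reset=3)
for step in range(1, 8):
    if tracker.tick():
        print(f"cleanup due after generation {step}")
```

Tracking the lifetime total separately from the per-window count keeps the number useful for spotting leaks: memory that grows with `total` despite periodic cleanups points at references that are never released.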

In [None]:
def monitor_memory_usage():
    """
    Monitor GPU memory usage and system status.
    """
    if device == 'xpu' and hasattr(torch.xpu, 'memory_allocated'):
        allocated = torch.xpu.memory_allocated() / 1024**3  # GB
        reserved = torch.xpu.memory_reserved() / 1024**3   # GB
        print(f"XPU Memory: {allocated:.2f}GB allocated, {reserved:.2f}GB reserved")
        print(f"Generation count: {GENERATION_COUNT}")
    else:
        print("Memory monitoring not available for this device")

def clear_all_models():
    """
    Clear all cached models and free memory.
    """
    global MODEL_CACHE, GENERATION_COUNT
    
    print("Clearing all cached models...")
    
    for model_name in list(MODEL_CACHE.keys()):
        try:
            if 'model' in MODEL_CACHE[model_name]:
                MODEL_CACHE[model_name]['model'].cpu()
                del MODEL_CACHE[model_name]['model']
            del MODEL_CACHE[model_name]
        except Exception as e:
            print(f"Error clearing {model_name}: {e}")
    
    MODEL_CACHE.clear()
    GENERATION_COUNT = 0
    clear_gpu_memory()
    print("All models cleared from cache")

# Check current status
print("\n" + "="*50)
print("Current Status:")
print("="*50)
monitor_memory_usage()
print(f"Cached models: {list(MODEL_CACHE.keys())}")
print(f"Audio loaded: {'audio' in globals()}")
print(f"Model loaded: {'model_components' in globals()}")

## 6. Audio Generation Tool

### Code Walkthrough:

• **Text-to-Speech (TTS) Features**:
  - Uses Google TTS (gTTS) for converting text to speech
  - Supports 5 languages: English, Spanish, French, German, Chinese
  - Includes example prompts for quick testing
  - Auto-detects language from selected examples

• **Microphone Recording Features**:
  - Uses sounddevice library for real-time audio capture
  - Configurable recording duration (1-30 seconds)
  - Device selection for multi-microphone setups
  - Real-time progress indicator during recording

• **Audio Processing**:
  - Normalizes audio levels to prevent clipping
  - Saves in standard WAV format (16kHz, 16-bit)
  - Displays interactive audio player for immediate playback

• **User Interface**:
  - Tab-based interface for different audio sources
  - Interactive widgets for parameter control
  - Real-time status updates and error handling
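
The normalization and WAV export steps listed under Audio Processing can be sketched with the standard library (`array`, `wave`). The helpers `normalize_to_int16` and `write_wav`, and the 0.95 peak target, are assumptions made for illustration; the notebook's own implementation uses NumPy and `scipy.io.wavfile`.

```python
import array
import sys
import wave
from typing import Sequence


def normalize_to_int16(samples: Sequence[float], peak: float = 0.95) -> array.array:
    """Scale samples so the loudest one reaches `peak` of full scale, as 16-bit PCM."""
    max_abs = max((abs(s) for s in samples), default=0.0)
    scale = peak / max_abs if max_abs > 0 else 0.0  # silence stays silent
    return array.array("h", (int(round(s * scale * 32767)) for s in samples))


def write_wav(path: str, pcm: array.array, sampling_rate: int = 16_000) -> None:
    """Write mono 16-bit PCM to a WAV file."""
    frames = array.array("h", pcm)  # copy so the caller's buffer is untouched
    if sys.byteorder == "big":
        frames.byteswap()  # WAV stores samples little-endian
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 2 bytes = 16 bits per sample
        wf.setframerate(sampling_rate)
        wf.writeframes(frames.tobytes())


pcm = normalize_to_int16([0.0, 0.25, -0.5, 0.1])
print(list(pcm))
```

Scaling to slightly below full scale leaves headroom, so rounding to integers cannot push a sample past the 16-bit range.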

In [None]:
def audio_generation_tool():
    """
    Interactive tool for generating audio through TTS or microphone recording.
    
    Provides two methods:
    1. Text-to-Speech using gTTS library
    2. Microphone recording using sounddevice
    """
    import ipywidgets as widgets
    from IPython.display import display, Audio as IPythonAudio, clear_output
    import numpy as np
    import scipy.io.wavfile as wavfile
    import os
    import time
    
    # Try imports
    try:
        from gtts import gTTS
        gtts_available = True
    except ImportError:
        gtts_available = False
    
    try:
        import sounddevice as sd
        sd_available = True
    except ImportError:
        sd_available = False
    
    if not gtts_available and not sd_available:
        print("❌ Please install required packages:")
        print("   pip install gtts sounddevice")
        return
    
    # Create UI
    method_tabs = widgets.Tab()
    tabs_list = []
    
    # Text-to-Speech Tab
    if gtts_available:
        tts_container = widgets.VBox()
        
        # Example prompts dropdown
        example_prompts = widgets.Dropdown(
            options=[
                ('Custom Text', ''),
                ('-- English Examples --', ''),
                ('Technical Documentation', 'The machine learning model uses a transformer architecture with attention mechanisms.'),
                ('News Report', 'Breaking news: Scientists have discovered a new method for carbon capture.'),
                ('-- Multilingual Examples --', ''),
                ('Spanish', '¡Hola! Me llamo Ana y soy tu asistente virtual.'),
                ('French', 'Bonjour, je suis ravi de vous rencontrer.'),
                ('German', 'Bitte öffnen Sie die Datei und klicken Sie auf Speichern.'),
                ('Chinese', '欢迎使用语音识别系统。'),
            ],
            value='',
            description='Examples:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='600px')
        )
        
        text_input = widgets.Textarea(
            value='Hello, this is a test of the text to speech system.',
            placeholder='Enter text to convert to speech...',
            description='Text:',
            layout=widgets.Layout(width='100%', height='100px')
        )
        
        # Language dropdown - 5 main languages
        language_dropdown = widgets.Dropdown(
            options=[
                ('English', 'en'),
                ('Spanish', 'es'),
                ('French', 'fr'),
                ('German', 'de'),
                ('Chinese (Simplified)', 'zh-cn'),
            ],
            value='en',
            description='Language:',
            style={'description_width': 'initial'}
        )
        
        filename_tts = widgets.Text(
            value='generated_speech.mp3',
            placeholder='filename.mp3',
            description='Filename:',
            style={'description_width': 'initial'}
        )
        
        generate_button = widgets.Button(
            description='Generate Speech',
            button_style='primary',
            icon='volume-up'
        )
        
        tts_output = widgets.Output()
        
        tts_container.children = [
            widgets.HTML("<h4>Text-to-Speech Generator</h4>"),
            example_prompts,
            text_input,
            language_dropdown,
            filename_tts,
            generate_button,
            tts_output
        ]
        
        tabs_list.append(('Text-to-Speech', tts_container))
    
    # Microphone Recording Tab
    if sd_available:
        mic_container = widgets.VBox()
        
        # Device selection
        device_dropdown = widgets.Dropdown(
            description='Input Device:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='400px')
        )
        
        # Get input devices
        try:
            devices = sd.query_devices()
            input_devices = []
            
            # Named dev_info to avoid confusion with the module-level `device` string
            for idx, dev_info in enumerate(devices):
                if dev_info['max_input_channels'] > 0:
                    device_name = f"{idx}: {dev_info['name']} ({dev_info['max_input_channels']} ch)"
                    input_devices.append((device_name, idx))
            
            if not input_devices:
                input_devices.append(("Default Device", None))
            
            device_dropdown.options = input_devices
            if input_devices:
                device_dropdown.value = input_devices[0][1]
                
        except Exception as e:
            input_devices = [("Default Device", None)]
            device_dropdown.options = input_devices
        
        duration_slider = widgets.FloatSlider(
            value=5.0,
            min=1.0,
            max=30.0,
            step=0.5,
            description='Duration (s):',
            style={'description_width': 'initial'}
        )
        
        filename_mic = widgets.Text(
            value='recorded_audio.wav',
            placeholder='filename.wav',
            description='Filename:',
            style={'description_width': 'initial'}
        )
        
        record_button = widgets.Button(
            description='Start Recording',
            button_style='danger',
            icon='microphone'
        )
        
        mic_output = widgets.Output()
        
        mic_container.children = [
            widgets.HTML("<h4>Microphone Recorder</h4>"),
            device_dropdown,
            duration_slider,
            filename_mic,
            record_button,
            mic_output
        ]
        
        tabs_list.append(('Microphone', mic_container))
    
    # Set up tabs
    method_tabs.children = [tab[1] for tab in tabs_list]
    for i, (title, _) in enumerate(tabs_list):
        method_tabs.set_title(i, title)
    
    # Display
    display(widgets.VBox([
        widgets.HTML("<h3>Audio Generation Tool</h3>"),
        method_tabs
    ]))
    
    # Event handlers
    if gtts_available:
        def update_text_from_example(change):
            """Update text input when example is selected"""
            if change['new'] and not change['new'].startswith('--'):
                text_input.value = change['new']
                # Auto-select appropriate language
                if 'Spanish' in example_prompts.label:
                    language_dropdown.value = 'es'
                elif 'French' in example_prompts.label:
                    language_dropdown.value = 'fr'
                elif 'German' in example_prompts.label:
                    language_dropdown.value = 'de'
                elif 'Chinese' in example_prompts.label:
                    language_dropdown.value = 'zh-cn'
                else:
                    language_dropdown.value = 'en'
        
        example_prompts.observe(update_text_from_example, names='value')
        
        def on_generate_click(b):
            """Handle text-to-speech generation"""
            with tts_output:
                clear_output(wait=True)
                
                text = text_input.value.strip()
                if not text:
                    print("❌ Please enter some text")
                    return
                
                try:
                    # Get language name for display
                    lang_name = next((name for name, code in language_dropdown.options if code == language_dropdown.value), language_dropdown.value)
                    print(f"🔊 Generating {lang_name} speech...")
                    
                    # Create TTS object
                    tts = gTTS(text=text, lang=language_dropdown.value, slow=False, tld='com')
                    
                    # Save to file
                    filename = filename_tts.value
                    if not filename.endswith('.mp3'):
                        filename += '.mp3'
                    
                    filepath = os.path.join(os.getcwd(), filename)
                    tts.save(filepath)
                    
                    print(f"✅ Audio saved: {filename}")
                    print(f"   Size: {os.path.getsize(filepath) / 1024:.1f} KB")
                    print(f"   Language: {lang_name}")
                    
                    # Display audio player
                    display(IPythonAudio(filepath))
                    
                    print(f"\n📝 Generated text ({len(text)} chars):")
                    print(f'"{text[:200]}{"..." if len(text) > 200 else ""}"')
                    
                except Exception as e:
                    print(f"❌ Error: {str(e)}")
                    print("Note: gTTS requires internet connection")
        
        generate_button.on_click(on_generate_click)
    
    if sd_available:
        recording_state = {'is_recording': False}
        
        def on_record_click(b):
            """Handle microphone recording"""
            if recording_state['is_recording']:
                return
            
            with mic_output:
                clear_output(wait=True)
                
                duration = duration_slider.value
                sample_rate = 16000  # Fixed for speech
                device_id = device_dropdown.value
                
                try:
                    recording_state['is_recording'] = True
                    record_button.description = 'Recording...'
                    record_button.button_style = 'warning'
                    record_button.disabled = True
                    
                    print(f"🎤 Recording for {duration:.1f} seconds...")
                    print("\n⏱️  ", end='', flush=True)
                    
                    # Record audio (device=None falls back to the system default input)
                    audio_data = sd.rec(int(duration * sample_rate),
                                        samplerate=sample_rate,
                                        channels=1,
                                        device=device_id,
                                        dtype='float32')
                    
                    # Show progress
                    start_time = time.time()
                    while time.time() - start_time < duration:
                        elapsed = time.time() - start_time
                        progress = int((elapsed / duration) * 20)
                        print(f"\r⏱️  [{'█' * progress}{'░' * (20 - progress)}] {elapsed:.1f}s", 
                              end='', flush=True)
                        time.sleep(0.1)
                    
                    sd.wait()
                    print(f"\r⏱️  [{'█' * 20}] {duration:.1f}s")
                    
                    # Process and save
                    audio_data = audio_data.flatten()
                    max_val = np.max(np.abs(audio_data))
                    if max_val > 0:
                        audio_data = audio_data / max_val * 0.9
                    
                    filename = filename_mic.value
                    if not filename.endswith('.wav'):
                        filename += '.wav'
                    
                    filepath = os.path.join(os.getcwd(), filename)
                    audio_int16 = (audio_data * 32767).astype(np.int16)
                    wavfile.write(filepath, sample_rate, audio_int16)
                    
                    print(f"\n✅ Audio saved: {filename}")
                    print(f"   Duration: {duration}s")
                    print(f"   Sample rate: 16kHz")
                    
                    # Display audio player
                    display(IPythonAudio(audio_data, rate=sample_rate))
                    
                except Exception as e:
                    print(f"❌ Error: {str(e)}")
                
                finally:
                    recording_state['is_recording'] = False
                    record_button.description = 'Start Recording'
                    record_button.button_style = 'danger'
                    record_button.disabled = False
        
        record_button.on_click(on_record_click)
    
    # Initial message
    print("Ready to generate audio files!")
    if gtts_available:
        print("• Text-to-Speech: Select examples or enter custom text")
    if sd_available:
        print("• Microphone: Record your voice")

# Run the tool
audio_generation_tool()

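The microphone recorder above peak-normalizes the captured signal to 90% of full scale and then stores it as 16-bit PCM. A minimal standalone sketch of that post-processing step follows; the helper name `to_int16_pcm` and the synthetic test tone are illustrative assumptions, not part of the notebook.

```python
import numpy as np

def to_int16_pcm(audio, peak=0.9):
    """Peak-normalize float audio to `peak` of full scale and convert to int16.

    `to_int16_pcm` is a hypothetical helper that mirrors the recorder's steps:
    flatten, divide by the largest absolute sample, scale, then cast.
    """
    audio = np.asarray(audio, dtype=np.float32).flatten()
    max_val = float(np.max(np.abs(audio))) if audio.size else 0.0
    if max_val > 0:  # leave pure silence untouched to avoid dividing by zero
        audio = audio / max_val * peak
    return (audio * 32767).astype(np.int16)

# Illustrative input: a quiet 440 Hz tone, one second at 16 kHz
sample_rate = 16000
t = np.arange(sample_rate) / sample_rate
tone = (0.05 * np.sin(2 * np.pi * 440 * t)).astype(np.float32)
pcm = to_int16_pcm(tone)
```

Normalizing to 0.9 rather than 1.0 leaves a little headroom, so the loudest sample stays clear of the int16 limits after scaling.
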
## 7. Complete Streaming Transcription Application

### Code Walkthrough:

• **Model Selection Interface**:
  - Dropdown with English-only and multilingual Whisper variants
  - Automatic detection of model capabilities (English-only models disable the task and language controls)
  - Support for both Distil-Whisper (faster) and standard Whisper models

• **Task Configuration**:
  - **Transcribe**: Convert speech to text in original language
  - **Translate**: Convert foreign speech to English text
  - Language selection for multilingual models (5 main languages)

• **Audio Input Options**:
  - LibriSpeech dataset samples (streaming, no download required)
  - File upload support (MP3, WAV, M4A, FLAC, OGG)
  - Uses librosa for robust audio loading

• **Performance Features**:
  - Real-time progress tracking with chunk counter
  - Performance metrics (inference time and processing speed relative to real time)
  - Comparison with previous runs
  - Model caching to avoid reloading

• **XPU Optimizations**:
  - Precision selection (FP16/FP32)
  - Automatic mixed precision for XPU
  - Memory management with periodic cleanup

• **User Experience**:
  - Interactive progress bar during processing
  - Live transcription updates
  - Audio preview before processing
  - Clear error messages and recovery

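The chunked processing and the speed metric described above can be sketched without loading a model. The snippet below is a minimal illustration under stated assumptions: the helper names `split_into_chunks` and `speed_metrics` are hypothetical, and the 0.5-second minimum chunk length mirrors the threshold used in the application code. Note that the application's `rtf` value is computed as audio duration divided by inference time, which is a speed-up factor; the conventional real-time factor is the inverse.

```python
import numpy as np

def split_into_chunks(audio_array, sampling_rate, chunk_length_s=10, min_chunk_s=0.5):
    """Split audio into fixed-length chunks, dropping a trailing fragment
    shorter than `min_chunk_s` seconds (hypothetical helper)."""
    chunk_size = int(chunk_length_s * sampling_rate)
    min_samples = int(min_chunk_s * sampling_rate)
    chunks = []
    for start in range(0, len(audio_array), chunk_size):
        chunk = audio_array[start:start + chunk_size]
        if len(chunk) >= min_samples:
            chunks.append(chunk)
    return chunks

def speed_metrics(audio_duration_s, inference_time_s):
    """Return both the conventional RTF and its inverse (hypothetical helper).

    rtf     = inference_time / audio_duration  (below 1 is faster than real time)
    speedup = audio_duration / inference_time  (above 1 is faster than real time)
    """
    if audio_duration_s <= 0 or inference_time_s <= 0:
        return {"rtf": 0.0, "speedup": 0.0}
    return {
        "rtf": inference_time_s / audio_duration_s,
        "speedup": audio_duration_s / inference_time_s,
    }

# 25.2 seconds of silence at 16 kHz: two full 10 s chunks plus one 5.2 s chunk
audio = np.zeros(403_200, dtype=np.float32)
chunks = split_into_chunks(audio, 16000, chunk_length_s=10)
metrics = speed_metrics(audio_duration_s=30.0, inference_time_s=6.0)
```

Shorter chunks give more frequent progress updates and lower peak memory, at the cost of more cut points where a word may be split across a chunk boundary.
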
In [None]:
def streaming_transcription_app():
    """
    Interactive Whisper transcription application with streaming capabilities.
    
    This application provides a user-friendly interface for:
    - Loading various Whisper models (English-only and multilingual)
    - Transcribing audio from LibriSpeech datasets or uploaded files
    - Translating audio to English (multilingual models only)
    - Real-time progress tracking during transcription
    - GPU memory management for sustained usage
    
    The app supports Intel XPU acceleration when available and automatically
    handles model caching to improve performance.
    """
    import time
    import torch
    import numpy as np
    from datasets import load_dataset
    from IPython.display import display, clear_output
    import ipywidgets as widgets
    from IPython.display import Audio as IPythonAudio
    import os
    import tempfile
    import librosa
    import gc
    
    global MODEL_CACHE, device, dtype
    
    # Track inference history
    if not hasattr(streaming_transcription_app, 'inference_history'):
        streaming_transcription_app.inference_history = []
    
    if hasattr(streaming_transcription_app, "is_running") and streaming_transcription_app.is_running:
        print("Application is already running. Please wait or restart the kernel.")
        return
    
    streaming_transcription_app.is_running = True
    
    try:
        # Model dropdown
        model_dropdown = widgets.Dropdown(
            options=[
                ('Distil Whisper Small EN', 'distil-whisper/distil-small.en'),
                ('Whisper Base EN', 'openai/whisper-base.en'),
                ('Whisper Tiny EN', 'openai/whisper-tiny.en'),
                ('Distil Whisper Medium EN', 'distil-whisper/distil-medium.en'),
                ('--- Multilingual Models ---', None),
                ('Whisper Base Multi', 'openai/whisper-base'),
                ('Whisper Small Multi', 'openai/whisper-small'),
                ('Whisper Medium Multi', 'openai/whisper-medium'),
                ('Distil Whisper Large v2 Multi', 'distil-whisper/distil-large-v2'),
                ('Distil Whisper Large v3 Multi', 'distil-whisper/distil-large-v3'),
            ],
            value='distil-whisper/distil-small.en',
            description='Model:',
            style={'description_width': 'initial'}
        )
        
        # Task selector
        task_radio = widgets.RadioButtons(
            options=['Transcribe', 'Translate to English'],
            value='Transcribe',
            description='Task:',
            disabled=True
        )
        
        # Language selector - 5 main languages only
        target_lang_dropdown = widgets.Dropdown(
            options=[
                ('Auto-detect', 'auto'),
                ('English', 'en'),
                ('Spanish', 'es'),
                ('French', 'fr'),
                ('German', 'de'),
                ('Chinese', 'zh'),
            ],
            value='auto',
            description='Language:',
            style={'description_width': 'initial'},
            disabled=True
        )
        
        # Precision selector
        precision_radio = widgets.RadioButtons(
            options=['FP16 (Faster)', 'FP32 (More Stable)'],
            value='FP16 (Faster)' if device == 'xpu' else 'FP32 (More Stable)',
            description='Precision:',
            disabled=False
        )
        
        source_radio = widgets.RadioButtons(
            options=['Sample Dataset', 'Upload File'],
            value='Sample Dataset',
            description='Audio Source:'
        )
        
        # Simplified LibriSpeech datasets
        dataset_dropdown = widgets.Dropdown(
            options=[
                ('LibriSpeech Long - Clean', 'librispeech_long'),                
            ],
            value='librispeech_long',
            description='Dataset:',
            style={'description_width': 'initial'}
        )
        
        file_upload = widgets.FileUpload(
            accept='.mp3, .wav, .m4a, .flac, .ogg',
            multiple=False,
            description='Upload Audio:'
        )
        
        load_button = widgets.Button(
            description='Load Model & Audio',
            button_style='primary'
        )
        
        clear_memory_button = widgets.Button(
            description='Clear GPU Memory',
            button_style='warning'
        )
        
        model_info = widgets.HTML(value="")
        
        setup_output = widgets.Output()
        display(widgets.VBox([
            widgets.HTML(value="<h3>Whisper Transcription & Translation</h3>"),
            model_dropdown,
            widgets.HBox([task_radio, target_lang_dropdown]),
            precision_radio,
            model_info,
            source_radio,
            dataset_dropdown,
            file_upload,
            widgets.HBox([load_button, clear_memory_button]),
            setup_output
        ]))
        
        app_container = widgets.Output()
        display(app_container)
        
        # State variables
        current_model_name = None
        current_model_components = None
        audio_data = None
        
        def update_model_info(change):
            """
            Update the model information display based on selected model.
            
            Args:
                change: Widget change event (optional)
            """
            if model_dropdown.value is None:
                return
                
            model_name = model_dropdown.value
            info = "<b>Model:</b> "
            is_multilingual = not any(x in model_name for x in ['.en', 'EN'])
            
            task_radio.disabled = not is_multilingual
            target_lang_dropdown.disabled = not is_multilingual
            
            if not is_multilingual:
                task_radio.value = 'Transcribe'
                info += "English-only"
            else:
                info += "Multilingual (96+ languages)"
            
            model_info.value = info
        
        model_dropdown.observe(update_model_info, names='value')
        update_model_info(None)
        
        def on_source_change(change):
            """
            Handle audio source selection changes.
            
            Args:
                change: Widget change event
            """
            if change['new'] == 'Upload File':
                file_upload.layout.display = 'block'
                dataset_dropdown.layout.display = 'none'
            else:
                file_upload.layout.display = 'none'
                dataset_dropdown.layout.display = 'block'
        
        source_radio.observe(on_source_change, names='value')
        file_upload.layout.display = 'none'
        
        def clear_memory_handler(b):
            """
            Handle clear memory button click.
            
            Args:
                b: Button widget
            """
            with setup_output:
                clear_output()
                clear_gpu_memory()
                print("GPU memory cleared")
        
        clear_memory_button.on_click(clear_memory_handler)
        
        def load_audio_from_upload(uploaded_file):
            """
            Load audio from uploaded file.
            
            Args:
                uploaded_file: FileUpload widget data
                
            Returns:
                dict: Audio dictionary with 'array' and 'sampling_rate' keys, or None if failed
            """
            print(f"Loading: {uploaded_file.name}")
            
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{uploaded_file.name.split('.')[-1]}") as tmp_file:
                tmp_file.write(uploaded_file.content)
                tmp_path = tmp_file.name
            
            try:
                # Resample to 16 kHz mono, the input format Whisper expects
                y, sr = librosa.load(tmp_path, sr=16000, mono=True)
                audio = {
                    "array": y.astype(np.float32),
                    "sampling_rate": sr
                }
                print(f"Loaded: {len(y)/sr:.2f} seconds")
                return audio
            except Exception as e:
                print(f"Error: {e}")
                return None
            finally:
                # Always remove the temporary file, whether loading succeeded or not
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
        
        def load_sample_audio_from_dataset(dataset_name):
            """
            Load audio sample from LibriSpeech dataset.
            
            Args:
                dataset_name (str): Dataset identifier
                
            Returns:
                dict: Audio dictionary with 'array' and 'sampling_rate' keys
            """
            print(f"Loading {dataset_name}...")
            
            try:
                if dataset_name == 'librispeech_asr_test':
                    # Load short test samples
                    dataset = load_dataset("librispeech_asr", "clean", split="test", streaming=True)
                else:
                    # Load standard long samples
                    dataset = load_dataset("distil-whisper/librispeech_long", "clean", split="validation", streaming=True)
                
                sample = next(iter(dataset))
                audio = sample['audio']
                print(f"Loaded: {len(audio['array'])/audio['sampling_rate']:.2f} seconds")
                return audio
                
            except Exception as e:
                print(f"Error: {str(e)[:100]}...")
                print("Using fallback dataset...")
                dataset = load_dataset("distil-whisper/librispeech_long", "clean", split="validation", streaming=True)
                sample = next(iter(dataset))
                return sample["audio"]
        
        def load_whisper_model_with_lang(model_name, language="auto", task="transcribe"):
            """
            Load Whisper model with language and task configuration.
            
            Args:
                model_name (str): HuggingFace model identifier
                language (str): Target language code or "auto"
                task (str): "transcribe" or "translate"
                
            Returns:
                dict: Model components dictionary
            """
            global MODEL_CACHE, dtype, device
            
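            # Include dtype in the key so that switching precision reloads the model
            # instead of returning a cached copy in the previous precision
            cache_key = f"{model_name}_{language}_{task}_{dtype}"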
            if cache_key in MODEL_CACHE:
                print(f"Using cached model: {model_name}")
                return MODEL_CACHE[cache_key]
            
            if len(MODEL_CACHE) > 2:
                print("Clearing model cache...")
                MODEL_CACHE.clear()
                clear_gpu_memory()
            
            print(f"Loading: {model_name}")
            
            is_english_only = any(x in model_name.lower() for x in ['.en', 'english-only'])
            
            tokenizer = WhisperTokenizer.from_pretrained(model_name)
            processor = AutoProcessor.from_pretrained(model_name)
            
            model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_name,
                torch_dtype=dtype,
                low_cpu_mem_usage=True,
                use_safetensors=True
            )
            
            model.to(device)
            model.eval()
            
            model.config.forced_decoder_ids = None
            model.config.suppress_tokens = []
            
            if model.config.decoder_start_token_id is None:
                if is_english_only:
                    model.config.decoder_start_token_id = tokenizer.pad_token_id or 50257
                    model.config.forced_decoder_ids = [[1, 50259], [2, 50359], [3, 50363]]
                else:
                    model.config.decoder_start_token_id = tokenizer.pad_token_id
            
            feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name)
            
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token
            
            MODEL_CACHE[cache_key] = {
                'model': model,
                'feature_extractor': feature_extractor,
                'tokenizer': tokenizer,
                'processor': processor,
                'dtype': dtype,
                'is_english_only': is_english_only,
                'language': language,
                'task': task
            }
            
            print("Model loaded")
            return MODEL_CACHE[cache_key]
        
        def on_load_button_click(b):
            """
            Handle load button click to initialize model and audio.
            
            Args:
                b: Button widget
            """
            nonlocal current_model_name, current_model_components, audio_data
            
            if hasattr(on_load_button_click, "is_running") and on_load_button_click.is_running:
                return
            on_load_button_click.is_running = True
            
            setup_output.clear_output()
            app_container.clear_output()
            
            with setup_output:
                try:
                    load_button.disabled = True
                    
                    if model_dropdown.value is None:
                        print("Please select valid model")
                        load_button.disabled = False
                        on_load_button_click.is_running = False
                        return
                    
                    global dtype
                    dtype = torch.float16 if "FP16" in precision_radio.value else torch.float32
                    print(f"Precision: {dtype}")
                    
                    model_name = model_dropdown.value
                    task = "translate" if "Translate" in task_radio.value else "transcribe"
                    language = target_lang_dropdown.value if target_lang_dropdown.value != "auto" else None
                    
                    current_model_components = load_whisper_model_with_lang(
                        model_name, 
                        language=language or "auto", 
                        task=task
                    )
                    current_model_name = model_name
                    
                    print("\nLoading audio...")
                    if source_radio.value == 'Upload File':
                        if not file_upload.value:
                            print("Please upload an audio file.")
                            load_button.disabled = False
                            on_load_button_click.is_running = False
                            return
                        audio_data = load_audio_from_upload(file_upload.value[0])
                    else:
                        audio_data = load_sample_audio_from_dataset(dataset_dropdown.value)
                    
                    if audio_data is None:
                        print("Failed to load audio.")
                        load_button.disabled = False
                        on_load_button_click.is_running = False
                        return
                    
                    print("\n Ready!")
                    setup_transcription_interface(current_model_components, audio_data)
                    
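                except Exception as e:
                    print(f"Error: {e}")
                finally:
                    # Re-enable the button on every exit path, including success,
                    # so another model or audio source can be loaded afterwards
                    load_button.disabled = False
                    on_load_button_click.is_running = False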
        
        def transcribe_with_translation(model_components, audio, chunk_length_s=10):
            """
            Perform streaming transcription or translation with chunk processing and XPU optimizations.
            
            Args:
                model_components (dict): Model components dictionary
                audio (dict): Audio dictionary with 'array' and 'sampling_rate'
                chunk_length_s (int): Chunk length in seconds
                
            Returns:
                tuple: (transcription_text, inference_time, performance_metrics)
            """
            model = model_components['model']
            processor = model_components['processor']
            tokenizer = model_components['tokenizer']
            model_dtype = model_components.get('dtype', torch.float32)
            is_english_only = model_components.get('is_english_only', False)
            task = model_components.get('task', 'transcribe')
            language = model_components.get('language', 'auto')
            
            audio_array = audio["array"]
            sampling_rate = audio["sampling_rate"]
            chunk_size_samples = int(chunk_length_s * sampling_rate)
            
            chunks = []
            for i in range(0, len(audio_array), chunk_size_samples):
                chunk = audio_array[i:i+chunk_size_samples]
                if len(chunk) >= sampling_rate * 0.5:
                    chunks.append(chunk)
            
            print(f"Processing {len(chunks)} chunks - Task: {task}")
            print(f"Using {device} with {model_dtype}")
            
            full_transcription = ""
            start_time = time.time()
            successful_chunks = 0
            
            # Setup autocast for XPU
            autocast_dtype = model_dtype if device == 'xpu' else torch.float32
            use_autocast = device == 'xpu' and model_dtype in [torch.float16, torch.bfloat16]
            
            for i, chunk in enumerate(chunks):
                try:
                    print(f"Chunk {i+1}/{len(chunks)}...")
                    
                    inputs = processor(chunk, sampling_rate=sampling_rate, return_tensors="pt")
                    input_features = inputs.input_features.to(device=device, dtype=model_dtype)
                    
                    with torch.no_grad():
                        with torch.autocast(device_type=device, dtype=autocast_dtype, enabled=use_autocast):
                            chunk_duration = len(chunk) / sampling_rate
                            max_tokens = min(100, int(chunk_duration * 20))
                            
                            generate_kwargs = {
                                "inputs": input_features,
                                "max_new_tokens": max_tokens,
                                "num_beams": 1,
                                "do_sample": False,
                                "use_cache": False,
                                "return_dict_in_generate": False,
                            }
                            
                            if not is_english_only:
                                # Multilingual models take an explicit task;
                                # language=None lets Whisper auto-detect the language
                                generate_kwargs["language"] = language if language != "auto" else None
                                generate_kwargs["task"] = task
                            
                            if model.config.decoder_start_token_id is not None:
                                generate_kwargs["decoder_start_token_id"] = model.config.decoder_start_token_id
                            
                            if hasattr(model.config, 'forced_decoder_ids') and model.config.forced_decoder_ids:
                                generate_kwargs["forced_decoder_ids"] = model.config.forced_decoder_ids
                            
                            try:
                                generated_ids = model.generate(**generate_kwargs)
                            except Exception as e:
                                if "language" in str(e) or "task" in str(e):
                                    generate_kwargs.pop("language", None)
                                    generate_kwargs.pop("task", None)
                                    generated_ids = model.generate(**generate_kwargs)
                                else:
                                    raise
                            
                            # Move to CPU for decoding
                            generated_ids_cpu = generated_ids.cpu()
                            chunk_text = tokenizer.decode(generated_ids_cpu[0], skip_special_tokens=True).strip()
                            
                            if chunk_text:
                                full_transcription += chunk_text + " "
                                successful_chunks += 1
                    
                    del inputs, input_features, generated_ids, generated_ids_cpu
                    
                    clear_output(wait=True)
                    print(f"Progress: {i+1}/{len(chunks)} chunks")
                    print("\nCurrent output:")
                    print("-" * 80)
                    print(full_transcription.strip())
                    print("-" * 80)
                    
                    if (i + 1) % 3 == 0:
                        if device == 'xpu':
                            torch.xpu.empty_cache()
                            torch.xpu.synchronize()
                        gc.collect()
                        
                except Exception as e:
                    print(f"Error in chunk {i+1}: {str(e)}")
                    if device == 'xpu':
                        torch.xpu.empty_cache()
                        torch.xpu.synchronize()
                    continue
            
            end_time = time.time()
            inference_time = end_time - start_time
            audio_duration = len(audio_array) / sampling_rate
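            # Note: this is audio_duration / inference_time, a speed-up factor
            # (above 1 means faster than real time). The conventional real-time
            # factor is the inverse, inference_time / audio_duration.
            rtf = audio_duration / inference_time if inference_time > 0 else 0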
            
            # Performance metrics
            performance_metrics = {
                'audio_duration': audio_duration,
                'inference_time': inference_time,
                'rtf': rtf,
                'chunks_processed': successful_chunks,
                'total_chunks': len(chunks),
                'model': current_model_name,
                'device': device,
                'precision': str(model_dtype)
            }
            
            print(f"\nDone in {inference_time:.2f}s ({successful_chunks}/{len(chunks)} chunks)")
            print(f"   Audio duration: {audio_duration:.2f}s")
            print(f"   Speed: {rtf:.2f}x real time")
            
            clear_gpu_memory()
            return full_transcription.strip(), inference_time, performance_metrics
        
        def setup_transcription_interface(model_components, audio_data):
            """
            Set up the transcription interface with controls and output areas.
            
            Args:
                model_components (dict): Model components dictionary
                audio_data (dict): Audio data dictionary
            """
            with app_container:
                clear_output()
                
                print("Audio preview:")
                display(IPythonAudio(audio_data["array"], rate=audio_data["sampling_rate"]))
                
                duration = len(audio_data["array"]) / audio_data["sampling_rate"]
                print(f"Duration: {duration:.2f}s | Model: {current_model_name}")
                
                chunk_slider = widgets.IntSlider(
                    value=8 if model_components['dtype'] == torch.float16 else 10,
                    min=3, 
                    max=15,
                    step=1,
                    description='Chunk size (s):'
                )
                
                start_button = widgets.Button(
                    description='Start',
                    button_style='success'
                )
                
                progress_bar = widgets.IntProgress(
                    value=0, min=0, max=100,
                    description='Progress:'
                )
                
                status_text = widgets.HTML(value="Ready")
                
                # Performance comparison display
                performance_display = widgets.HTML(value="")
                
                output_label = "Translation:" if model_components.get('task') == 'translate' else "Transcription:"
                transcription_area = widgets.Textarea(
                    value='',
                    placeholder=f'{output_label} will appear here...',
                    description=output_label,
                    layout=widgets.Layout(width='100%', height='200px')
                )
                
                output = widgets.Output()
                
                display(widgets.VBox([
                    chunk_slider,
                    start_button,
                    progress_bar,
                    status_text,
                    performance_display,
                    transcription_area,
                    output
                ]))
                
                def on_start_click(b):
                    """
                    Handle start button click to begin transcription.
                    
                    Args:
                        b: Button widget
                    """
                    if getattr(on_start_click, "is_running", False):
                        return
                    
                    on_start_click.is_running = True
                    
                    try:
                        transcription_area.value = ""
                        progress_bar.value = 0
                        start_button.disabled = True
                        output.clear_output()
                        
                        chunk_length_s = chunk_slider.value
                        
                        with output:
                            original_print = print
                            
                            def custom_print(*args, **kwargs):
                                """Custom print to update progress bar"""
                                msg = ' '.join(str(arg) for arg in args)
                                if "Chunk" in msg and "/" in msg:
                                    try:
                                        parts = msg.split()
                                        for i, part in enumerate(parts):
                                            if "/" in part:
                                                current = int(parts[i-1])
                                                total = int(part.split("/")[1])
                                                progress_bar.max = total
                                                progress_bar.value = current
                                                status_text.value = f"<b>Processing {current}/{total}</b>"
                                    except (ValueError, IndexError):
                                        # Not a parseable "current/total" progress line; skip the progress update
                                        pass
                                original_print(*args, **kwargs)
                            
                            import builtins
                            builtins.print = custom_print
                            
                            try:
                                result, time_taken, metrics = transcribe_with_translation(
                                    model_components, 
                                    audio_data, 
                                    chunk_length_s=chunk_length_s
                                )
                                
                                transcription_area.value = result
                                
                                # Update performance display
                                streaming_transcription_app.inference_history.append(metrics)
                                
                                # Show current and previous performance
                                perf_html = "<b>Performance Metrics:</b><br>"
                                perf_html += f"<b>Current Run:</b> {metrics['inference_time']:.2f}s | {metrics['rtf']:.2f}x realtime<br>"
                                
                                if len(streaming_transcription_app.inference_history) > 1:
                                    prev_metrics = streaming_transcription_app.inference_history[-2]
                                    speed_diff = metrics['rtf'] - prev_metrics['rtf']
                                    speed_color = 'green' if speed_diff > 0 else 'red'
                                    perf_html += f"<b>Previous Run:</b> {prev_metrics['inference_time']:.2f}s | {prev_metrics['rtf']:.2f}x realtime<br>"
                                    perf_html += f"<b>Speed Change:</b> <span style='color: {speed_color}'>{speed_diff:+.2f}x</span>"
                                
                                performance_display.value = perf_html
                                
                                status_text.value = f"<b>Done! {time_taken:.1f}s ({metrics['rtf']:.1f}x realtime)</b>"
                                
                            finally:
                                builtins.print = original_print
                        
                    except Exception as e:
                        with output:
                            print(f"Error: {e}")
                        status_text.value = "<b style='color: red;'>Error occurred</b>"
                    finally:
                        start_button.disabled = False
                        on_start_click.is_running = False
                        clear_gpu_memory()
                
                start_button.on_click(on_start_click)
                load_button.disabled = False
        
        load_button.on_click(on_load_button_click)
        
        with setup_output:
            print("Ready to transcribe!")
            print("\nMultilingual models can:")
            print("- Transcribe audio in multiple languages")
            print("- Translate foreign audio to English")
            print("- Auto-detect language or specify manually")
        
    finally:
        streaming_transcription_app.is_running = False

# Run the app
streaming_transcription_app()
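
The interface above reports speed as "x realtime" and colors the change between runs green or red. The sketch below shows one way such a figure could be computed. It assumes the value is audio duration divided by inference time, which matches the display treating larger values as faster; the notebook's own definition lives in `transcribe_with_translation` and may differ. The names `speed_factor` and `describe_speed_change` are hypothetical helpers introduced only for illustration.

```python
def speed_factor(audio_seconds: float, inference_seconds: float) -> float:
    """Return how many seconds of audio are processed per second of compute.

    Assumption: "x realtime" means audio duration / inference time,
    so 5.0 means the audio was processed five times faster than playback.
    """
    if inference_seconds <= 0:
        raise ValueError("inference_seconds must be positive")
    return audio_seconds / inference_seconds


def describe_speed_change(current: float, previous: float) -> str:
    """Format the difference between two runs as a colored HTML snippet."""
    diff = current - previous
    # A larger speed factor is an improvement, so positive differences are green.
    color = "green" if diff > 0 else "red"
    return f"<span style='color: {color}'>{diff:+.2f}x</span>"


# Example: a 30-second clip transcribed in 6 seconds, compared with a slower earlier run.
current_run = speed_factor(30.0, 6.0)
previous_run = speed_factor(30.0, 7.5)
print(f"{current_run:.2f}x realtime")
print(describe_speed_change(current_run, previous_run))
```

Comparing speed factors rather than raw inference times keeps runs on clips of different lengths comparable.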

## 8. Summary and Cleanup

### Code Walkthrough:

• **Application Features Summary**: Lists all implemented capabilities and optimizations

• **Memory Cleanup Utilities**: 
  - `clear_all_models()`: Removes all cached models from memory
  - `check_memory_status()`: Displays current cache and device status

• **Best Practices**: Demonstrates proper cleanup procedures for long-running applications

In [None]:
# Summary and cleanup
print("""
Application Features:
1. Model caching - prevents reloading the same model
2. Memory management - clears GPU memory periodically
3. Error handling - prevents crashes during transcription
4. Support for English-only and multilingual models
5. Optimized chunk processing for stability
6. Text-to-speech and microphone recording for audio generation
7. Support for multiple languages and translation

To free GPU memory, use the 'Clear GPU Memory' button in the app.
""")

# Optional: Clear all cached models
def clear_all_models():
    """Clear all cached models and free memory"""
    global MODEL_CACHE
    MODEL_CACHE.clear()
    clear_gpu_memory()
    print("✅ All models cleared from cache")
    print("   GPU/XPU memory freed")

# Clear all models
clear_all_models()

# Check current memory status
def check_memory_status():
    """Check current cache status"""
    print(f"Device: {device.upper()}")
    print(f"Precision: {dtype}")
    print(f"Cached models: {len(MODEL_CACHE)}")
    
    if MODEL_CACHE:
        print("\nCached models:")
        for key in MODEL_CACHE:
            print(f"  - {key}")
    else:
        print("\nNo models currently cached")

# Check current status
check_memory_status()
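
`clear_all_models()` above relies on `clear_gpu_memory()`, which is defined earlier in the notebook. As a rough sketch of what a helper of that kind typically does, the example below runs Python's garbage collector and then asks PyTorch to release its cached device memory. It assumes a PyTorch build that exposes `torch.xpu` (or `torch.cuda`) and falls back gracefully when neither is present. The function name `free_accelerator_memory` is hypothetical and is not the notebook's implementation.

```python
import gc


def free_accelerator_memory() -> str:
    """Release unreferenced Python objects, then the device cache if one exists.

    Returns the name of the backend whose cache was cleared ("xpu", "cuda"),
    or "cpu" when no accelerator backend is available.
    """
    # Drop unreachable objects first so their tensors can actually be freed.
    gc.collect()

    try:
        import torch
    except ImportError:
        # Assumption: without PyTorch there is no device cache to clear.
        return "cpu"

    # Intel GPUs are exposed through torch.xpu in recent PyTorch builds.
    if hasattr(torch, "xpu") and torch.xpu.is_available():
        torch.xpu.empty_cache()
        return "xpu"

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        return "cuda"

    return "cpu"


print(f"Cache cleared on: {free_accelerator_memory()}")
```

Emptying the cache only returns memory that no live tensor still references, so clearing `MODEL_CACHE` first, as `clear_all_models()` does, is what makes the call effective.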

## Conclusion

In this workshop, we've explored how to build a speech-to-text application using Whisper models with PyTorch on Intel XPU hardware. We've covered:

1. **Setting up the environment** for Intel AI PCs
2. **Loading and managing Whisper models** efficiently
3. **Implementing streaming transcription** for real-time feedback
4. **Building interactive applications** with audio generation
5. **Supporting multiple languages** with transcription and translation

### Next Steps

To continue your learning journey:

1. Experiment with different model sizes and their trade-offs
2. Integrate real-time microphone capture for live transcription
3. Build REST APIs for transcription services
4. Explore model quantization for edge deployment
5. Combine with RAG systems for audio-based question answering
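
As a starting point for the first item, the sketch below times several candidate models on the same input and ranks them by mean wall-clock time. It assumes each model is wrapped in a zero-argument callable that returns its transcription. `fake_small` and `fake_large` are stand-ins that only sleep, so the numbers they produce say nothing about real Whisper models.

```python
import time
from statistics import mean
from typing import Callable, Dict, List, Tuple


def time_runs(transcribe: Callable[[], str], runs: int = 3) -> float:
    """Return the mean wall-clock seconds over `runs` calls."""
    durations: List[float] = []
    for _ in range(runs):
        start = time.perf_counter()
        transcribe()
        durations.append(time.perf_counter() - start)
    return mean(durations)


def compare_models(
    candidates: Dict[str, Callable[[], str]], runs: int = 3
) -> List[Tuple[str, float]]:
    """Time every candidate and return (name, mean_seconds), fastest first."""
    results = [(name, time_runs(fn, runs)) for name, fn in candidates.items()]
    return sorted(results, key=lambda item: item[1])


# Hypothetical stand-ins for real model calls; replace with actual transcription callables.
def fake_small() -> str:
    time.sleep(0.01)
    return "hello"


def fake_large() -> str:
    time.sleep(0.05)
    return "hello world"


ranking = compare_models({"small": fake_small, "large": fake_large}, runs=2)
for name, seconds in ranking:
    print(f"{name}: {seconds:.3f}s per run")
```

Speed is only half of the trade-off; pairing each timing with an accuracy measure on a known reference transcript gives a fuller picture.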

Running Whisper locally on an Intel AI PC keeps audio on the device and removes the round-trip to a cloud service, which makes privacy-preserving, low-latency audio applications practical.