In [None]:
#pip install -U openai-whisper

In [None]:
#pip install --upgrade --no-deps --force-reinstall git+https://github.com/openai/whisper.git

In [None]:
#pip install aihwkit

In [None]:
import sys
import torch

# Report the interpreter, PyTorch and CUDA environment this notebook runs in.
# Each entry is (label, value); they are printed in the order listed.
environment_report = (
    ("Python Version:", sys.version),
    ("PyTorch Version:", torch.__version__),
    ("CUDA available:", torch.cuda.is_available()),
    ("Torch CUDA Version:", torch.version.cuda),
    ("CUDA device count:", torch.cuda.device_count()),
)
for label, value in environment_report:
    print(label, value)


In [None]:
import torch
import torch.nn as nn
from transformers import WhisperForConditionalGeneration, GenerationConfig
from aihwkit.nn import AnalogLinear
from aihwkit.simulator.configs import InferenceRPUConfig

# Module-wide switch consulted by debug_print(); True enables the output.
DEBUG = True

def debug_print(*args, **kwargs):
    """Forward the arguments to ``print`` when the module-level DEBUG flag is truthy.

    All positional and keyword arguments are handed to ``print`` unchanged.
    Nothing is printed (and nothing is returned) when DEBUG is falsy.
    """
    if not DEBUG:
        return
    print(*args, **kwargs)

# Define RPU configuration for analog inference.
# InferenceRPUConfig is instantiated with its default settings; this
# module-level object is the one passed to the analog model further down
# in this cell.
rpu_config = InferenceRPUConfig()
debug_print("Initialized RPU configuration for analog inference")

class AnalogWhisperForConditionalGeneration(WhisperForConditionalGeneration):
    """Whisper model whose ``nn.Linear`` layers are swapped for aihwkit ``AnalogLinear``.

    The constructor builds the regular Whisper architecture from ``config``
    (freshly initialised, no pretrained weights) and replaces every
    ``nn.Linear`` sub-module with an ``AnalogLinear`` of the same shape.
    Call :meth:`transfer_digital_weights` afterwards to load the weights of a
    pretrained digital model.
    """

    def __init__(self, config, rpu_config=None, debug=False):
        """Create the model and convert its linear layers to analog layers.

        Args:
            config: Whisper configuration forwarded to the parent constructor.
            rpu_config: aihwkit RPU configuration passed unchanged to every
                ``AnalogLinear`` that is created.
            debug: New value for the module-level ``DEBUG`` flag that
                controls ``debug_print`` output.
        """
        super().__init__(config)
        # NOTE: constructing a model overwrites the module-wide DEBUG flag.
        global DEBUG
        DEBUG = debug
        # Keep the configuration on the instance so later calls do not depend
        # on a module-level ``rpu_config`` variable happening to exist.
        self.rpu_config = rpu_config
        debug_print("Initializing AnalogWhisperForConditionalGeneration model...")

        # Snapshot the linear layers first: the module tree is modified below
        # and must not be mutated while named_modules() is still iterating.
        linear_layers = [
            (name, module)
            for name, module in self.named_modules()
            if isinstance(module, nn.Linear)
        ]
        debug_print(f"Found {len(linear_layers)} linear layers to convert to analog")

        # Replace all linear layers with analog linear layers
        for name, module in linear_layers:
            debug_print(f"Converting linear layer '{name}' to analog")
            # Create analog layer with same dimensions and bias configuration
            analog_layer = AnalogLinear(
                module.in_features,
                module.out_features,
                bias=module.bias is not None,
                rpu_config=rpu_config
            )
            # Navigate to the parent module to replace the layer
            parent_path, _, child_name = name.rpartition('.')
            parent = self._get_by_path(self, parent_path)
            setattr(parent, child_name, analog_layer)

        debug_print("All linear layers converted to analog successfully")

        # Work-around kept from the original notebook: move forced_decoder_ids
        # out of the way so that generate() does not act on it.
        if hasattr(self.generation_config, "forced_decoder_ids"):
            debug_print("Converting forced_decoder_ids to input_ids in model's generation config")
            self.generation_config.input_ids = self.generation_config.forced_decoder_ids
            self.generation_config.forced_decoder_ids = None

    @staticmethod
    def _get_by_path(root, dotted_name):
        """Return the object reached from ``root`` by following a dotted attribute path.

        An empty ``dotted_name`` returns ``root`` itself.
        """
        target = root
        if dotted_name:
            for attr in dotted_name.split('.'):
                target = getattr(target, attr)
        return target

    def transfer_digital_weights(self, digital_model):
        """Load the weights of ``digital_model`` into this analog model.

        Two things are copied:

        1. Every parameter/buffer that does not belong to an analog layer and
           whose name and shape match a tensor of ``digital_model``
           (convolutions, embeddings, layer norms, ...). Without this step
           those tensors would keep the random values from construction.
        2. The weight and bias of every ``AnalogLinear`` layer, programmed
           through ``set_weights``.

        Args:
            digital_model: Model with the same module layout as this one
                before conversion (same dotted module names).
        """
        debug_print("Starting weight transfer from digital to analog model...")

        analog_layers = [
            (name, module)
            for name, module in self.named_modules()
            if isinstance(module, AnalogLinear)
        ]
        debug_print(f"Found {len(analog_layers)} analog layers to transfer weights to")
        analog_prefixes = tuple(f"{name}." for name, _ in analog_layers)

        with torch.no_grad():
            # --- 1. tensors of the non-analog modules ---------------------
            digital_tensors = dict(digital_model.named_parameters())
            digital_tensors.update(dict(digital_model.named_buffers()))
            own_tensors = list(self.named_parameters()) + list(self.named_buffers())
            copied = 0
            for name, target in own_tensors:
                if name.startswith(analog_prefixes):
                    continue  # handled through set_weights below
                source = digital_tensors.get(name)
                if source is None or source.shape != target.shape:
                    continue
                target.copy_(source)
                copied += 1
            debug_print(f"Copied {copied} non-analog tensors from the digital model")

            # --- 2. analog layers ----------------------------------------
            for name, module in analog_layers:
                debug_print(f"Transferring weights for analog layer '{name}'")
                # Get the corresponding digital module
                digital_module = self._get_by_path(digital_model, name)
                # AnalogLinear.from_digital is a classmethod that *returns a
                # new layer*; calling it on an existing layer and dropping the
                # result leaves the layer untouched. set_weights programs the
                # existing layer in place.
                bias = digital_module.bias
                module.set_weights(
                    digital_module.weight.detach().cpu(),
                    bias.detach().cpu() if bias is not None else None,
                )

        debug_print("Weight transfer completed successfully")

# Example: load a pretrained Whisper checkpoint, convert it to an analog model
# and transcribe one sample from a small test dataset.
import copy

from datasets import load_dataset
from transformers import WhisperConfig, WhisperForConditionalGeneration, WhisperProcessor

debug_print("\nLoading pretrained Whisper model...")

# Load pretrained model configuration and weights
debug_print("Loading model configuration...")
config = WhisperConfig.from_pretrained("openai/whisper-small.en")
debug_print("Loading pretrained weights...")
digital_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small.en")

# Initialize our analog model
debug_print("\nCreating analog model...")
analog_model = AnalogWhisperForConditionalGeneration(config, rpu_config=rpu_config, debug=True)

# Transfer weights from digital to analog model
debug_print("\nTransferring weights from digital to analog model...")
analog_model.transfer_digital_weights(digital_model)

# Use the GPU when one is available instead of failing on CPU-only machines
device = 'cuda' if torch.cuda.is_available() else 'cpu'
debug_print(f"\nMoving model to {device}...")
analog_model = analog_model.to(device)

# Switch to evaluation mode for inference
debug_print("\nSwitching to evaluation mode...")
analog_model.eval()

# Example input for testing
debug_print("\nPreparing test input...")

# Load processor and a sample audio file
processor = WhisperProcessor.from_pretrained("openai/whisper-small.en")
ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
audio_sample = ds[0]["audio"]

# Process audio into input features
input_features = processor(
    audio_sample["array"],
    sampling_rate=audio_sample["sampling_rate"],
    return_tensors="pt"
).input_features.to(device)
debug_print(f"Input features shape: {input_features.shape}")

# Set up the decoder prompt (important!)
# Whisper prompts are made of *special* tokens: <|startoftranscript|>, then
# (multilingual checkpoints only) <|lang|> and <|task|>, then <|notimestamps|>.
# The tokenizer's bos_token is <|endoftext|>, and the plain strings "en" /
# "transcribe" are ordinary text tokens, so neither may be used here.
language = "en"
task = "transcribe"

vocab = processor.tokenizer.get_vocab()
# English-only checkpoints (such as whisper-small.en) have no language/task
# tokens in their vocabulary; the lookups then yield None and are skipped.
language_token_id = vocab.get(f"<|{language}|>")
task_token_id = vocab.get(f"<|{task}|>")

prompt_ids = [analog_model.config.decoder_start_token_id]
if language_token_id is not None and task_token_id is not None:
    prompt_ids += [language_token_id, task_token_id]
prompt_ids.append(vocab["<|notimestamps|>"])

# Create decoder_input_ids with the right starting tokens
decoder_input_ids = torch.tensor([prompt_ids], dtype=torch.long, device=device)

# Create a custom generation config by copying from model's config
debug_print("\nCreating custom generation config...")
generation_config = copy.deepcopy(analog_model.generation_config)

# Ensure forced_decoder_ids is handled properly in custom config
if hasattr(generation_config, "forced_decoder_ids"):
    debug_print("Converting forced_decoder_ids to input_ids in custom generation config")
    generation_config.input_ids = generation_config.forced_decoder_ids
    generation_config.forced_decoder_ids = None

# Run inference
debug_print("\nRunning inference...")
with torch.no_grad():
    generated_ids = analog_model.generate(
        input_features,
        decoder_input_ids=decoder_input_ids,
        generation_config=generation_config
    )
    transcription = processor.batch_decode(generated_ids, skip_special_tokens=True)
    debug_print(f"Transcription: {transcription}")


## Test

In [None]:
# Create the whisper_analog.py file using standard Python.
# The text below is the complete source of the module that later cells import.
# Compared with the earlier in-notebook class it
#   * programs existing analog layers with set_weights (from_digital is a
#     classmethod that returns a new layer, so its result must not be dropped),
#   * also copies the non-linear tensors (conv, embeddings, layer norms),
#   * reads analog weights through the public get_weights() accessor.
with open('whisper_analog.py', 'w', encoding='utf-8') as f:
    f.write('''
import torch
import torch.nn as nn
import copy
from transformers import WhisperForConditionalGeneration, GenerationConfig
from aihwkit.nn import AnalogLinear
from aihwkit.simulator.configs import InferenceRPUConfig

# Debug mode flag
DEBUG = True

def debug_print(*args, **kwargs):
    """Print debug information only if DEBUG is True"""
    if DEBUG:
        print(*args, **kwargs)

def _get_by_path(root, dotted_name):
    """Follow a dotted attribute path from root; an empty path returns root"""
    target = root
    if dotted_name:
        for attr in dotted_name.split('.'):
            target = getattr(target, attr)
    return target

def verify_weight_transfer(analog_model, digital_model):
    """Verify that weights have been properly transferred"""
    for name, module in analog_model.named_modules():
        if isinstance(module, AnalogLinear):
            try:
                # Find corresponding digital module
                digital_module = _get_by_path(digital_model, name)

                # Compare weight stats (get_weights returns (weight, bias))
                analog_weight = module.get_weights()[0].detach().cpu()
                digital_weight = digital_module.weight.detach().cpu()

                print(f"Layer: {name}")
                print(f"  Digital weight stats: min={digital_weight.min():.4f}, max={digital_weight.max():.4f}, mean={digital_weight.mean():.4f}")
                print(f"  Analog weight stats: min={analog_weight.min():.4f}, max={analog_weight.max():.4f}, mean={analog_weight.mean():.4f}")

                # Check for large differences
                if abs(digital_weight.mean() - analog_weight.mean()) > 0.1:
                    print("  WARNING: Large difference in mean values!")
            except Exception as e:
                print(f"  Error comparing layer {name}: {e}")

    print("Weight verification complete")

class AnalogWhisperForConditionalGeneration(WhisperForConditionalGeneration):
    """Whisper model whose nn.Linear layers are replaced by aihwkit AnalogLinear layers"""

    def __init__(self, config, rpu_config=None, debug=False):
        super().__init__(config)
        # NOTE: constructing a model overwrites the module-wide DEBUG flag
        global DEBUG
        DEBUG = debug
        # Store rpu_config as an instance attribute
        self.rpu_config = rpu_config
        debug_print("Initializing AnalogWhisperForConditionalGeneration model...")

        # Snapshot the linear layers before the module tree is modified
        linear_layers = [
            (name, module)
            for name, module in self.named_modules()
            if isinstance(module, nn.Linear)
        ]
        debug_print(f"Found {len(linear_layers)} linear layers to convert to analog")

        # Replace all linear layers with analog linear layers
        for name, module in linear_layers:
            debug_print(f"Converting linear layer '{name}' to analog")
            # Create analog layer with same dimensions and bias configuration
            analog_layer = AnalogLinear(
                module.in_features,
                module.out_features,
                bias=module.bias is not None,
                rpu_config=rpu_config
            )
            # Navigate to the parent module to replace the layer
            parent_path, _, child_name = name.rpartition('.')
            setattr(_get_by_path(self, parent_path), child_name, analog_layer)

        debug_print("All linear layers converted to analog successfully")

        # Fix the forced_decoder_ids issue as recommended in GitHub issues
        if hasattr(self.generation_config, "forced_decoder_ids"):
            debug_print("Converting forced_decoder_ids to input_ids in model's generation config")
            self.generation_config.input_ids = self.generation_config.forced_decoder_ids
            self.generation_config.forced_decoder_ids = None

    def transfer_digital_weights(self, digital_model):
        """
        Transfer weights from digital model to analog model.

        Non-analog tensors (conv, embeddings, layer norms, ...) are copied by
        name; analog layers are programmed in place through set_weights.
        """
        debug_print("Starting weight transfer from digital to analog model...")

        analog_layers = [
            (name, module)
            for name, module in self.named_modules()
            if isinstance(module, AnalogLinear)
        ]
        debug_print(f"Found {len(analog_layers)} analog layers to transfer weights to")
        analog_prefixes = tuple(f"{name}." for name, _ in analog_layers)

        with torch.no_grad():
            # 1. Tensors that do not belong to an analog layer
            digital_tensors = dict(digital_model.named_parameters())
            digital_tensors.update(dict(digital_model.named_buffers()))
            own_tensors = list(self.named_parameters()) + list(self.named_buffers())
            copied = 0
            for name, target in own_tensors:
                if name.startswith(analog_prefixes):
                    continue
                source = digital_tensors.get(name)
                if source is None or source.shape != target.shape:
                    continue
                target.copy_(source)
                copied += 1
            debug_print(f"Copied {copied} non-analog tensors from the digital model")

            # 2. Analog layers: from_digital is a classmethod returning a NEW
            #    layer, so the existing layer is programmed with set_weights
            for name, module in analog_layers:
                debug_print(f"Transferring weights for analog layer '{name}'")
                digital_module = _get_by_path(digital_model, name)
                bias = digital_module.bias
                module.set_weights(
                    digital_module.weight.detach().cpu(),
                    bias.detach().cpu() if bias is not None else None,
                )

        debug_print("Weight transfer completed successfully")

''')

print("whisper_analog.py file created successfully")


In [None]:
import torch
import torch.nn as nn
import copy
from torch.utils.data import DataLoader
from datasets import load_dataset
from transformers import WhisperProcessor, WhisperForConditionalGeneration, WhisperConfig
import numpy as np
from tqdm import tqdm
import time
from typing import Dict, List, Tuple
import logging
from aihwkit.simulator.configs import InferenceRPUConfig
from aihwkit.exceptions import TileModuleError
from itertools import islice

# Import your AnalogWhisperForConditionalGeneration class
from whisper_analog import AnalogWhisperForConditionalGeneration

# Set up INFO-level logging and a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Choose the compute device once: "cuda" when PyTorch reports a usable GPU
# (the CUDA state is then initialised eagerly and the first device's name is
# logged), otherwise "cpu" with a warning.
if torch.cuda.is_available():
    DEVICE = "cuda"
    torch.cuda.init()
    logger.info(f"CUDA initialized. Using device: {torch.cuda.get_device_name(0)}")
else:
    logger.warning("CUDA is not available. Falling back to CPU.")
    DEVICE = "cpu"

class WhisperPerformanceEvaluator:
    """Compare a digital Whisper checkpoint with its aihwkit analog conversion.

    The evaluator loads the processor, the pretrained digital model and an
    ``AnalogWhisperForConditionalGeneration`` built from the same
    configuration, loads a speech dataset, and offers helpers to measure word
    error rate (WER) and wall-clock inference time for either model.
    """

    def __init__(
        self,
        model_name: str = "openai/whisper-tiny.en",  # Default to tiny model
        dataset_name: str = "librispeech_asr",
        split: str = "test.clean",
        batch_size: int = 4,
        debug: bool = False,
        use_mini_dataset: bool = True
    ):
        """Load models, processor and dataset.

        Args:
            model_name: Hugging Face checkpoint used for processor, digital
                model and analog model configuration.
            dataset_name: Dataset loaded when ``use_mini_dataset`` is False.
            split: Split loaded when ``use_mini_dataset`` is False.
            batch_size: Batch size used by the data loaders built later.
            debug: Forwarded to the analog model constructor.
            use_mini_dataset: When True, stream the LibriSpeech "clean" test
                split and keep only its first 50 samples.
        """
        self.debug = debug
        self.batch_size = batch_size
        self.device = DEVICE
        self.model_name = model_name

        # Load processor and model
        logger.info("Loading processor and model...")
        self.processor = WhisperProcessor.from_pretrained(model_name)

        # Load digital model
        logger.info(f"Loading digital Whisper model: {model_name}")
        self.digital_model = WhisperForConditionalGeneration.from_pretrained(model_name)
        self.digital_model.eval()
        self.digital_model = self.digital_model.to(self.device)

        # Load analog model
        logger.info("Creating and initializing analog Whisper model...")
        config = WhisperConfig.from_pretrained(model_name)
        rpu_config = InferenceRPUConfig()
        self.analog_model = AnalogWhisperForConditionalGeneration(
            config,
            rpu_config=rpu_config,
            debug=debug
        )
        self.analog_model.transfer_digital_weights(self.digital_model)

        # Fix for Whisper forced_decoder_ids issue
        if hasattr(self.analog_model.generation_config, "forced_decoder_ids"):
            logger.info("Converting forced_decoder_ids to input_ids in model's generation config")
            self._clear_forced_decoder_ids(self.analog_model.generation_config)

        self.analog_model.eval()
        self.analog_model = self.analog_model.to(self.device)

        # Load dataset
        if use_mini_dataset:
            logger.info("Loading Mini LibriSpeech dataset...")
            self.dataset = load_dataset("librispeech_asr", "clean", split="test", streaming=True)
            self.dataset = list(islice(self.dataset, 50))
        else:
            logger.info(f"Loading dataset {dataset_name}...")
            self.dataset = load_dataset(dataset_name, split=split)

        logger.info(f"Dataset size: {len(self.dataset)} samples")

        # Token ids for the decoder prompt. Whisper prompts consist of
        # *special* tokens (<|en|>, <|transcribe|>, <|notimestamps|>); the
        # plain strings "en"/"transcribe" are ordinary text tokens. Tokens
        # missing from the vocabulary (English-only checkpoints have no
        # language/task tokens) are stored as None and left out of the prompt.
        self.language = "en"
        self.task = "transcribe"
        vocab = self.processor.tokenizer.get_vocab()
        self.language_token_id = vocab.get(f"<|{self.language}|>")
        self.task_token_id = vocab.get(f"<|{self.task}|>")
        self.no_timestamps_token_id = vocab.get("<|notimestamps|>")

    @staticmethod
    def _clear_forced_decoder_ids(generation_config) -> None:
        """Apply the notebook's forced_decoder_ids work-around to ``generation_config``.

        The value is stashed on ``input_ids`` and ``forced_decoder_ids`` is set
        to None, exactly as the original inline code did. Does nothing when the
        attribute is absent.
        """
        if hasattr(generation_config, "forced_decoder_ids"):
            generation_config.input_ids = generation_config.forced_decoder_ids
            generation_config.forced_decoder_ids = None

    def _build_decoder_input_ids(self, model, batch_size: int, device) -> torch.Tensor:
        """Return the decoder prompt repeated for every sample of a batch.

        The prompt starts with ``model.config.decoder_start_token_id`` and is
        followed by the language/task tokens (only when both exist) and the
        no-timestamps token (when it exists).

        Returns:
            Long tensor of shape ``(batch_size, prompt_length)`` on ``device``.
        """
        prompt = [model.config.decoder_start_token_id]
        if self.language_token_id is not None and self.task_token_id is not None:
            prompt += [self.language_token_id, self.task_token_id]
        if self.no_timestamps_token_id is not None:
            prompt.append(self.no_timestamps_token_id)
        return torch.tensor([prompt], dtype=torch.long, device=device).repeat(batch_size, 1)

    def custom_collate_fn(self, batch):
        """Return the list of samples unchanged; preprocess_batch does the real work."""
        return batch

    def preprocess_batch(self, batch: List[Dict]) -> Tuple[torch.Tensor, torch.Tensor]:
        """Turn a list of dataset samples into model inputs.

        Args:
            batch: Samples, each providing ``item["audio"]["array"]``.

        Returns:
            ``(input_features, attention_mask)``; the features have exactly
            3000 frames on the last axis and the mask is all ones with shape
            ``(batch, 3000)``.
        """
        # Extract audio arrays from the batch
        audio_arrays = [item["audio"]["array"] for item in batch]

        # Let the feature extractor pad the *audio* to Whisper's fixed 30 s
        # window. Padding the log-mel features with zeros afterwards is not
        # equivalent to silence, so "max_length" is used instead of padding to
        # the longest sample in the batch.
        processed = self.processor(
            audio_arrays,
            sampling_rate=16000,
            return_tensors="pt",
            padding="max_length"
        )

        # Get input features (mel spectrograms)
        input_features = processed.input_features

        # Safety net: enforce exactly 3000 frames. With "max_length" padding
        # this is not expected to trigger.
        batch_size, num_mels, seq_len = input_features.shape
        if seq_len < 3000:
            padding = torch.zeros(batch_size, num_mels, 3000 - seq_len, device=input_features.device)
            input_features = torch.cat([input_features, padding], dim=2)
        elif seq_len > 3000:
            input_features = input_features[:, :, :3000]

        # Create appropriate attention mask
        attention_mask = torch.ones((batch_size, input_features.shape[2]), dtype=torch.long, device=input_features.device)

        return input_features, attention_mask

    def calculate_wer(self, predictions: List[str], references: List[str]) -> float:
        """Calculate the corpus-level Word Error Rate.

        Texts are stripped and lower-cased, split on whitespace, and the
        word-level edit distance of every (prediction, reference) pair is
        summed and divided by the total number of reference words.

        The distance is computed with :meth:`levenshtein_distance`, so no
        optional third-party package is needed and no exception has to be
        swallowed.

        Returns:
            The WER, or 1.0 when the references contain no words at all.
        """
        total_words = sum(len(ref.split()) for ref in references)
        errors = 0
        for pred, ref in zip(predictions, references):
            pred_words = pred.strip().lower().split()
            ref_words = ref.strip().lower().split()
            errors += self.levenshtein_distance(pred_words, ref_words)

        return errors / total_words if total_words > 0 else 1.0

    def levenshtein_distance(self, s1, s2):
        """Return the edit distance (insert/delete/substitute) between two sequences."""
        # Keep the shorter sequence in the inner loop so the rows stay small
        if len(s1) < len(s2):
            s1, s2 = s2, s1

        if len(s2) == 0:
            return len(s1)

        previous_row = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]

    def direct_inference(self, model, input_features, attention_mask, decoder_input_ids,
                         device="cuda", max_new_tokens: int = 128):
        """Transcribe with a hand-written greedy decoding loop instead of ``generate()``.

        A single forward pass only predicts one token per prompt position, so
        the decoder is run step by step: the most likely next token is
        appended until every sequence has produced the end-of-text token or
        ``max_new_tokens`` tokens have been generated. The encoder is run
        once and its output reused for every step.

        Args:
            model: Whisper model (digital or analog).
            input_features: Log-mel features, one row per sample.
            attention_mask: Mask passed to the encoder.
            decoder_input_ids: Prompt of shape ``(1, L)`` or ``(batch, L)``.
            device: Unused; kept for backward compatibility.
            max_new_tokens: Upper bound on generated tokens per sample.

        Returns:
            List with one decoded string per sample.
        """
        model.eval()
        batch_size = input_features.shape[0]
        tokens = decoder_input_ids
        if tokens.shape[0] != batch_size:
            # A (1, L) prompt is shared by all samples of the batch
            tokens = tokens.expand(batch_size, -1)
        tokens = tokens.clone()

        eos_token_id = self.processor.tokenizer.eos_token_id
        # Never decode past the positional range of the decoder
        max_positions = getattr(model.config, "max_target_positions", None)
        steps = max_new_tokens
        if max_positions is not None:
            steps = max(0, min(steps, max_positions - tokens.shape[1]))

        finished = torch.zeros(batch_size, dtype=torch.bool, device=tokens.device)
        with torch.no_grad():
            encoder_outputs = model.get_encoder()(
                input_features,
                attention_mask=attention_mask,
                return_dict=True
            )
            for _ in range(steps):
                outputs = model(
                    encoder_outputs=encoder_outputs,
                    decoder_input_ids=tokens,
                    return_dict=True
                )
                next_tokens = torch.argmax(outputs.logits[:, -1, :], dim=-1)
                # Sequences that already ended keep emitting end-of-text
                next_tokens = next_tokens.masked_fill(finished, eos_token_id)
                tokens = torch.cat([tokens, next_tokens.unsqueeze(-1)], dim=-1)
                finished = finished | (next_tokens == eos_token_id)
                if bool(finished.all()):
                    break

        # Decode the predicted IDs to text
        return self.processor.batch_decode(tokens, skip_special_tokens=True)

    def evaluate_model(
        self,
        model: nn.Module,
        dataloader: DataLoader,
        device: str = "cuda",
        use_generate: bool = True  # Option to use generate or direct inference
    ) -> Dict:
        """Evaluate a model on the given dataloader and return performance metrics.

        Args:
            model: Model to evaluate; it is moved to ``device``.
            dataloader: Yields lists of dataset samples.
            device: Device used for model and inputs.
            use_generate: Use ``model.generate`` (falling back to
                :meth:`direct_inference` on error) when True, otherwise use
                :meth:`direct_inference` only.

        Returns:
            Dict with ``wer``, ``avg_inference_time`` (seconds per batch) and
            ``total_samples``. When no batch succeeds the WER is 1.0.
        """
        model = model.to(device)
        predictions = []
        references = []
        inference_times = []

        # Work on a copy so the model's own generation config is not modified
        generation_config = copy.deepcopy(model.generation_config)
        self._clear_forced_decoder_ids(generation_config)

        with torch.no_grad():
            for batch in tqdm(dataloader, desc="Evaluating"):
                # Preprocess batch - get both features and attention mask
                input_features, attention_mask = self.preprocess_batch(batch)
                input_features = input_features.to(device)
                attention_mask = attention_mask.to(device)
                batch_references = [item["text"] for item in batch]

                # One prompt row per sample; a (1, L) prompt would not match
                # the encoder batch in the decoder's cross-attention.
                decoder_input_ids = self._build_decoder_input_ids(
                    model, input_features.shape[0], device
                )

                # Measure inference time
                start_time = time.time()
                try:
                    if use_generate:
                        # Generate predictions with attention mask and simplified parameters
                        generated_ids = model.generate(
                            input_features,
                            attention_mask=attention_mask,  # Add attention mask
                            decoder_input_ids=decoder_input_ids,
                            generation_config=generation_config,
                            max_length=128,
                            num_beams=1,  # Simple greedy decoding for reliability
                            return_dict_in_generate=True,  # Get the full output object
                            output_scores=False,  # Don't need scores
                            return_timestamps=False  # Don't return timestamps for simplicity
                        ).sequences

                        # Decode the generated IDs to text
                        transcription = self.processor.batch_decode(generated_ids, skip_special_tokens=True)
                    else:
                        transcription = self.direct_inference(
                            model,
                            input_features,
                            attention_mask,
                            decoder_input_ids,
                            device
                        )
                except Exception as e:
                    logger.error(f"Error during inference: {e}")
                    if not use_generate:
                        # Skip this batch
                        continue
                    # Try fallback to direct inference if generate failed
                    logger.info("Falling back to direct inference...")
                    start_time = time.time()
                    try:
                        transcription = self.direct_inference(
                            model,
                            input_features,
                            attention_mask,
                            decoder_input_ids,
                            device
                        )
                    except Exception as e2:
                        logger.error(f"Fallback also failed: {e2}")
                        # Skip this batch
                        continue

                inference_times.append(time.time() - start_time)
                predictions.extend(transcription)
                references.extend(batch_references)

        # Calculate metrics only if we have predictions
        if not predictions:
            logger.warning("No successful predictions were generated")
            return {
                "wer": 1.0,  # Maximum error
                "avg_inference_time": 0.0,
                "total_samples": 0
            }

        wer = self.calculate_wer(predictions, references)
        avg_inference_time = np.mean(inference_times)

        # Print sample predictions
        logger.info("Sample predictions:")
        for reference, prediction in list(zip(references, predictions))[:3]:
            logger.info(f"Reference: {reference}")
            logger.info(f"Prediction: {prediction}")
            logger.info("---")

        return {
            "wer": wer,
            "avg_inference_time": avg_inference_time,
            "total_samples": len(predictions)
        }

    def run_comparison(self, num_samples: int = 20, use_generate: bool = True) -> Dict:
        """Evaluate the digital and the analog model on the same samples.

        Args:
            num_samples: Number of leading dataset samples to use.
            use_generate: Forwarded to :meth:`evaluate_model`.

        Returns:
            Dict with the two result dicts plus absolute and relative
            differences in WER and average inference time.
        """
        data = list(islice(self.dataset, num_samples))
        dataloader = DataLoader(
            data,
            batch_size=self.batch_size,
            collate_fn=self.custom_collate_fn
        )

        # Pass the device chosen at construction; evaluate_model's default of
        # "cuda" would fail on CPU-only machines.
        logger.info("Evaluating digital model...")
        digital_results = self.evaluate_model(
            self.digital_model, dataloader, device=self.device, use_generate=use_generate
        )

        logger.info("Evaluating analog model...")
        analog_results = self.evaluate_model(
            self.analog_model, dataloader, device=self.device, use_generate=use_generate
        )

        wer_diff = analog_results["wer"] - digital_results["wer"]
        time_diff = analog_results["avg_inference_time"] - digital_results["avg_inference_time"]

        # Relative changes; the 0.001 floor avoids division by zero
        time_speedup_pct = (digital_results["avg_inference_time"] / max(0.001, analog_results["avg_inference_time"]) - 1) * 100
        wer_increase_pct = (analog_results["wer"] / max(0.001, digital_results["wer"]) - 1) * 100

        return {
            "digital": digital_results,
            "analog": analog_results,
            "wer_difference": wer_diff,
            "wer_increase_percent": wer_increase_pct,
            "time_difference": time_diff,
            "time_speedup_percent": time_speedup_pct
        }

    def analyze_noise_sensitivity(
        self,
        noise_levels: List[float],
        num_samples: int = 10,
        use_generate: bool = True
    ) -> Dict:
        """Evaluate freshly built analog models at several output-noise levels.

        For every level a new ``InferenceRPUConfig`` with
        ``forward.out_noise`` set to that level is used to build an analog
        model, which receives the digital weights and is evaluated on the
        first ``num_samples`` samples.

        Note:
            If moving a model to the current device raises
            ``TileModuleError``, ``self.device`` is switched to ``"cpu"`` for
            this and all later evaluations.

        Returns:
            Dict mapping each noise level to its :meth:`evaluate_model` result.
        """
        results = {}

        # The samples do not depend on the noise level; build the loader once
        data = list(islice(self.dataset, num_samples))
        dataloader = DataLoader(
            data,
            batch_size=self.batch_size,
            collate_fn=self.custom_collate_fn
        )

        for noise_level in noise_levels:
            logger.info(f"Testing noise level: {noise_level}")

            # Create a new RPU config with specified noise level
            rpu_config = InferenceRPUConfig()
            rpu_config.forward.out_noise = noise_level  # Adjust noise parameter

            # Create a new analog model with this noise level
            noisy_model = AnalogWhisperForConditionalGeneration(
                self.digital_model.config,
                rpu_config=rpu_config,
                debug=self.debug
            )
            noisy_model.transfer_digital_weights(self.digital_model)

            # Fix for Whisper forced_decoder_ids issue
            self._clear_forced_decoder_ids(noisy_model.generation_config)

            noisy_model.eval()

            try:
                noisy_model = noisy_model.to(self.device)
            except TileModuleError as e:
                logger.warning(f"Analog model with noise could not use CUDA. Falling back to CPU. Reason: {e}")
                self.device = "cpu"
                noisy_model = noisy_model.to(self.device)

            results[noise_level] = self.evaluate_model(
                noisy_model,
                dataloader,
                device=self.device,
                use_generate=use_generate
            )

        return results

def main():
    """Run the digital-vs-analog comparison on 10 samples and log a summary.

    Builds a WhisperPerformanceEvaluator for "openai/whisper-tiny.en" on the
    mini dataset with batch size 2, runs ``run_comparison`` with direct
    inference (``use_generate=False``) and logs WER and timing figures.
    """
    # Print transformers version for debugging
    import transformers
    logger.info(f"Using transformers version: {transformers.__version__}")

    # Available model sizes:
    # "openai/whisper-tiny.en" (39M params)
    # "openai/whisper-base.en" (74M params)
    # "openai/whisper-small.en" (244M params)
    evaluator = WhisperPerformanceEvaluator(
        model_name="openai/whisper-tiny.en",
        batch_size=2,  # Smaller batch size to avoid CUDA OOM
        use_mini_dataset=True,
        debug=False,
    )

    # False selects direct inference; True would use generate()
    use_generate = False

    logger.info(f"Running basic comparison between digital and analog Whisper models (use_generate={use_generate})...")
    results = evaluator.run_comparison(num_samples=10, use_generate=use_generate)
    digital = results['digital']
    analog = results['analog']

    logger.info("==== DIGITAL VS ANALOG COMPARISON RESULTS ====")
    logger.info(f"Digital WER: {digital['wer']:.4f}")
    logger.info(f"Analog WER: {analog['wer']:.4f}")
    logger.info(f"WER increase: {results['wer_increase_percent']:.2f}%")
    logger.info(f"Digital inference time: {digital['avg_inference_time']:.4f}s")
    logger.info(f"Analog inference time: {analog['avg_inference_time']:.4f}s")
    logger.info(f"Speed change: {results['time_speedup_percent']:.2f}%")
    logger.info("============================================")

    # Uncomment to run noise sensitivity analysis
    # logger.info("Running noise sensitivity analysis...")
    # noise_levels = [0.0, 0.05, 0.1, 0.2]
    # noise_results = evaluator.analyze_noise_sensitivity(noise_levels, num_samples=5, use_generate=use_generate)
    # logger.info("==== NOISE SENSITIVITY RESULTS ====")
    # for level, result in noise_results.items():
    #     logger.info(f"Noise level {level}: WER = {result['wer']:.4f}, Inference time = {result['avg_inference_time']:.4f}s")
    # logger.info("===================================")

if __name__ == "__main__":
    main()


### New Experiments

### OpenAI Whisper (reference `whisper` package) baseline

In [1]:
import time
import whisper
import os
from pathlib import Path

# Get the current working directory
current_dir = os.getcwd()
print(f"Current working directory: {current_dir}")

# Option 1: List all audio files in the current directory
audio_extensions = ['.mp3', '.wav', '.flac', '.m4a', '.ogg']

# Keep regular files whose (case-insensitive) name ends with one of the
# extensions. sorted() makes "the first audio file" deterministic, because
# os.listdir returns entries in arbitrary order.
audio_files = sorted(
    file for file in os.listdir(current_dir)
    if file.lower().endswith(tuple(audio_extensions))
    and os.path.isfile(os.path.join(current_dir, file))
)

# Fail here with a clear message instead of leaving audio_path undefined,
# which would only surface later as a NameError.
if not audio_files:
    raise FileNotFoundError(
        f"No audio files ({', '.join(audio_extensions)}) found in {current_dir}"
    )

# Print available audio files
print("Available audio files:")
for i, file in enumerate(audio_files, start=1):
    print(f"{i}. {file}")

selected_file = audio_files[0]  # Use the first audio file by default
# selected_file = input("Enter the number or name of the file you want to use: ")

# Handle numeric selection (only relevant when the input() line above is enabled)
if selected_file.isdigit() and 1 <= int(selected_file) <= len(audio_files):
    selected_file = audio_files[int(selected_file) - 1]

# Create full path to the audio file
audio_path = os.path.join(current_dir, selected_file)
print(f"Using audio file: {audio_path}")

Current working directory: /home/tgs2126
Available audio files:
1. audio.wav
Using audio file: /home/tgs2126/audio.wav


In [3]:
import time
import whisper

# Benchmark the digital Whisper "small" model on the file referenced by
# `audio_path` (defined by the audio-discovery cell). Every run times the full
# pipeline: audio loading, preprocessing, language detection and decoding.

# Load the Whisper "small" model once (this is outside the loop to avoid reloading each time)
print("Loading Whisper 'small' model...")
model = whisper.load_model("small")
print("Model loaded.")

# Define number of runs and a list to accumulate elapsed time
n_runs = 100
times = []
last_result = None

# The default decoding options are identical for every run, so build them
# once instead of inside the timed region.
options = whisper.DecodingOptions()

print(f"\nRunning full pipeline (audio loading + inference) {n_runs} times...")

# Loop over runs
for i in range(n_runs):
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt the measurement.
    start_time = time.perf_counter()

    # --- Audio Loading and Preprocessing ---
    # Load the audio file
    audio = whisper.load_audio(audio_path)
    # Pad or trim the audio to 30 seconds (default behavior)
    audio = whisper.pad_or_trim(audio)

    # Generate log-mel spectrogram and move to the model's device
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)

    # --- Inference ---
    # Detect spoken language (optional)
    _, probs = model.detect_language(mel)
    detected_language = max(probs, key=probs.get)

    # Decode the audio using default decoding options
    last_result = whisper.decode(model, mel, options)

    elapsed = time.perf_counter() - start_time
    times.append(elapsed)
    print(f"Run {i+1}: {elapsed:.3f} seconds, Detected language: {detected_language}")

# Compute and print the average time over the runs that were actually recorded
avg_time = sum(times) / len(times)
print(f"\nAverage full pipeline runtime over {n_runs} runs: {avg_time:.3f} seconds")

# Print the recognized text from the last run
print("\nRecognized text from the last run:")
print(last_result.text)


Loading Whisper 'small' model...
Model loaded.

Running full pipeline (audio loading + inference) 100 times...
Run 1: 1.261 seconds, Detected language: en
Run 2: 1.157 seconds, Detected language: en
Run 3: 1.168 seconds, Detected language: en
Run 4: 1.167 seconds, Detected language: en
Run 5: 1.174 seconds, Detected language: en
Run 6: 1.161 seconds, Detected language: en
Run 7: 1.159 seconds, Detected language: en
Run 8: 1.166 seconds, Detected language: en
Run 9: 1.180 seconds, Detected language: en
Run 10: 1.183 seconds, Detected language: en
Run 11: 1.185 seconds, Detected language: en
Run 12: 1.169 seconds, Detected language: en
Run 13: 1.166 seconds, Detected language: en
Run 14: 1.176 seconds, Detected language: en
Run 15: 1.172 seconds, Detected language: en
Run 16: 1.173 seconds, Detected language: en
Run 17: 1.166 seconds, Detected language: en
Run 18: 1.179 seconds, Detected language: en
Run 19: 1.176 seconds, Detected language: en
Run 20: 1.179 seconds, Detected language: en

### IBM aihwkit (analog conversion of Whisper)

In [5]:
import time
import whisper
import torch
from aihwkit.simulator.configs import InferenceRPUConfig
from aihwkit.nn.conversion import convert_to_analog  # Import the conversion utility

# Benchmark Whisper "small" after conversion with aihwkit's convert_to_analog,
# on the file referenced by `audio_path` (defined by the audio-discovery cell).
# Every run times the full pipeline: audio loading, preprocessing, language
# detection and decoding.

# --- Load the digital Whisper model ---
print("Loading digital Whisper 'small' model...")
model = whisper.load_model("small")
print("Digital model loaded.")

# --- Create RPU configuration for analog inference ---
rpu_config = InferenceRPUConfig()

# --- Convert the model to analog using the convert_to_analog utility ---
print("Converting model to analog...")
# Convert the entire model at once using the utility function
# NOTE(review): the timings recorded for this cell are nearly identical to the
# digital baseline. Confirm that the conversion actually replaced layers
# (presumably whisper's own Linear/Conv1d subclasses may not match the default
# conversion map) before drawing conclusions from these numbers -- TODO verify.
analog_model = convert_to_analog(model, rpu_config)
print("Analog conversion complete.")

# --- Benchmark the full pipeline ---
n_runs = 100
times = []
last_result = None

# The default decoding options are identical for every run, so build them
# once instead of inside the timed region.
options = whisper.DecodingOptions()  # default options

print(f"\nRunning full pipeline (audio loading, preprocessing & inference) {n_runs} times...")

for i in range(n_runs):
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt the measurement.
    start_time = time.perf_counter()

    # Audio Loading and Preprocessing:
    audio = whisper.load_audio(audio_path)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(analog_model.device)

    # Inference with the analog model:
    _, probs = analog_model.detect_language(mel)
    detected_language = max(probs, key=probs.get)
    last_result = whisper.decode(analog_model, mel, options)

    elapsed = time.perf_counter() - start_time
    times.append(elapsed)
    print(f"Run {i+1}: {elapsed:.3f} seconds, Detected language: {detected_language}")

# Average over the runs that were actually recorded
avg_time = sum(times) / len(times)
print(f"\nAverage total pipeline runtime over {n_runs} runs: {avg_time:.3f} seconds")
print("\nRecognized text from the last run:")
print(last_result.text)


Loading digital Whisper 'small' model...
Digital model loaded.
Converting model to analog...
Analog conversion complete.

Running full pipeline (audio loading, preprocessing & inference) 100 times...
Run 1: 1.287 seconds, Detected language: en
Run 2: 1.199 seconds, Detected language: en
Run 3: 1.166 seconds, Detected language: en
Run 4: 1.194 seconds, Detected language: en
Run 5: 1.164 seconds, Detected language: en
Run 6: 1.171 seconds, Detected language: en
Run 7: 1.190 seconds, Detected language: en
Run 8: 1.194 seconds, Detected language: en
Run 9: 1.169 seconds, Detected language: en
Run 10: 1.179 seconds, Detected language: en
Run 11: 1.187 seconds, Detected language: en
Run 12: 1.175 seconds, Detected language: en
Run 13: 1.204 seconds, Detected language: en
Run 14: 1.181 seconds, Detected language: en
Run 15: 1.207 seconds, Detected language: en
Run 16: 1.200 seconds, Detected language: en
Run 17: 1.233 seconds, Detected language: en
Run 18: 1.203 seconds, Detected language: en