<a href="https://colab.research.google.com/github/maruf4461/Comparative-analysis-of-RAG-performance-on-Open-Source-LLM_openDB/blob/main/utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
"""
RAG Research Project Utilities
Comprehensive utility functions for multi-model RAG evaluation
"""

import os
import sys
import json
import pickle
import time
import logging
import traceback
from datetime import datetime
from typing import Dict, List, Any, Optional, Union
import pandas as pd
import numpy as np
import torch
import gc
import psutil
from pathlib import Path

class ProjectUtils:
    """Utility hub for the multi-model RAG research project.

    Bundles project-directory scaffolding, logging, (de)serialisation helpers,
    checkpointing, resource monitoring and experiment-configuration creation.
    All relative paths accepted by the methods are resolved against
    ``project_dir``.

    Constructing an instance has filesystem side effects: it creates
    ``<project_dir>/logs`` plus the directory tree in
    ``ensure_project_structure``, and on the first construction in the process
    it attaches handlers to the shared ``'RAGResearch'`` logger.
    """

    # Console prefixes used by ``log``; unrecognised levels fall back to "📝".
    _LEVEL_EMOJI = {
        "ERROR": "❌",
        "WARNING": "⚠️",
        "INFO": "ℹ️",
        "DEBUG": "🐛",
        "SUCCESS": "✅"
    }

    def __init__(self, project_dir: str = '/content/drive/MyDrive/RAG_Research_Complete'):
        """Create utilities bound to *project_dir*.

        Args:
            project_dir: Root directory for logs, data, results, configs and
                checkpoints.
        """
        self.project_dir = project_dir
        self.logs = []  # in-memory history of "[timestamp] LEVEL: message" entries
        self.setup_logging()
        self.ensure_project_structure()

    @staticmethod
    def _disk_root() -> str:
        """Return the path whose free disk space is reported (Colab's /content if present)."""
        return '/content' if os.path.exists('/content') else '/'

    def setup_logging(self):
        """Configure the process-wide ``'RAGResearch'`` logger.

        Ensures ``<project_dir>/logs`` exists. On the first call in the process
        it attaches a DEBUG-level file handler writing to a timestamped
        ``rag_research_<YYYYmmdd_HHMMSS>.log`` and an INFO-level console
        handler. Later calls (e.g. from a second instance) reuse the existing
        handlers, so output keeps going to the first log file.
        """
        log_dir = os.path.join(self.project_dir, 'logs')
        os.makedirs(log_dir, exist_ok=True)

        self.logger = logging.getLogger('RAGResearch')
        self.logger.setLevel(logging.DEBUG)

        # getLogger returns the same object process-wide, so handlers are added
        # only once. Check *before* building them: FileHandler opens its file on
        # construction, and a handler that is built but never attached leaks a
        # file descriptor and leaves an empty log file behind.
        if self.logger.handlers:
            return

        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

        log_file = os.path.join(log_dir, f'rag_research_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)

        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def ensure_project_structure(self):
        """Create every project sub-directory under ``project_dir`` (idempotent)."""
        directories = [
            'data/raw/msmarco',
            'data/raw/natural_questions',
            'data/raw/squad',
            'data/raw/hotpotqa',
            'data/processed/chunks',
            'data/processed/embeddings',
            'models/llama2_7b',
            'models/llama2_13b',
            'models/mistral_7b',
            'models/codellama_7b',
            'models/llama3_8b',
            'results/experiments',
            'results/analysis',
            'results/plots',
            'results/tables',
            'src/models',
            'src/evaluation',
            'src/data_processing',
            'configs',
            'logs',
            'checkpoints'
        ]

        for directory in directories:
            os.makedirs(os.path.join(self.project_dir, directory), exist_ok=True)

    def log(self, message: str, level: str = "INFO"):
        """Record *message* in ``self.logs``, the logger, and stdout.

        Args:
            message: Text to log.
            level: Case-insensitive level name. "ERROR", "WARNING" and "DEBUG"
                map to the matching logger methods; anything else (including
                "SUCCESS") is logged at INFO. The level also selects the emoji
                prefix for the printed line.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.logs.append(f"[{timestamp}] {level}: {message}")

        level_key = level.upper()
        if level_key == "ERROR":
            self.logger.error(message)
        elif level_key == "WARNING":
            self.logger.warning(message)
        elif level_key == "DEBUG":
            self.logger.debug(message)
        else:
            self.logger.info(message)

        # NOTE: with the handlers from setup_logging, INFO-and-above messages
        # also reach the console via the StreamHandler, so they typically
        # appear twice (logger output + this print).
        print(f"{self._LEVEL_EMOJI.get(level_key, '📝')} {message}")

    def save_data(self, data: Any, filepath: str, format: str = 'json'):
        """Serialise *data* to ``<project_dir>/<filepath>``.

        Args:
            data: Object to write.
            filepath: Path relative to ``project_dir``; parent directories are
                created as needed.
            format: One of 'json', 'pickle', 'csv', 'parquet', 'txt'.
                json uses ``default=str``, so unserialisable values are
                stringified. For csv, a ``str`` is written verbatim and other
                non-DataFrame data goes through ``pd.DataFrame(data)``. Parquet
                works the same way, minus the ``str`` case. txt writes
                ``str(data)``.

        Returns:
            True on success; False on any error, including an unsupported
            *format*.
        """
        full_path = os.path.join(self.project_dir, filepath)
        parent_dir = os.path.dirname(full_path)
        if parent_dir:  # os.makedirs('') raises, e.g. bare filename with project_dir=''
            os.makedirs(parent_dir, exist_ok=True)

        try:
            if format == 'json':
                with open(full_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, default=str, ensure_ascii=False)
            elif format == 'pickle':
                with open(full_path, 'wb') as f:
                    pickle.dump(data, f)
            elif format == 'csv':
                if isinstance(data, str):
                    with open(full_path, 'w', encoding='utf-8') as f:
                        f.write(data)
                else:
                    frame = data if isinstance(data, pd.DataFrame) else pd.DataFrame(data)
                    frame.to_csv(full_path, index=False)
            elif format == 'parquet':
                frame = data if isinstance(data, pd.DataFrame) else pd.DataFrame(data)
                frame.to_parquet(full_path, index=False)
            elif format == 'txt':
                with open(full_path, 'w', encoding='utf-8') as f:
                    f.write(data if isinstance(data, str) else str(data))
            else:
                # Previously fell through and reported success without writing.
                raise ValueError(f"Unsupported format: {format!r}")

            self.log(f"Saved data to {filepath}", "SUCCESS")
            return True

        except Exception as e:
            self.log(f"Failed to save {filepath}: {e}", "ERROR")
            self.logger.exception("Save data error:")
            return False

    def load_data(self, filepath: str, format: str = 'json'):
        """Load data previously written by ``save_data``.

        Args:
            filepath: Path relative to ``project_dir``.
            format: One of 'json', 'pickle', 'csv', 'parquet', 'txt'. csv and
                parquet return DataFrames; txt returns a ``str``.

        Returns:
            The loaded object, or None if the file is missing, the format is
            unsupported, or reading fails (all three are logged).
        """
        full_path = os.path.join(self.project_dir, filepath)

        if not os.path.exists(full_path):
            self.log(f"File not found: {filepath}", "WARNING")
            return None

        try:
            if format == 'json':
                with open(full_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            elif format == 'pickle':
                # pickle.load can execute arbitrary code: only load files this
                # project wrote itself, never externally supplied ones.
                with open(full_path, 'rb') as f:
                    return pickle.load(f)
            elif format == 'csv':
                return pd.read_csv(full_path)
            elif format == 'parquet':
                return pd.read_parquet(full_path)
            elif format == 'txt':
                with open(full_path, 'r', encoding='utf-8') as f:
                    return f.read()
            else:
                raise ValueError(f"Unsupported format: {format!r}")

        except Exception as e:
            self.log(f"Failed to load {filepath}: {e}", "ERROR")
            self.logger.exception("Load data error:")
            return None

    def clear_gpu_memory(self):
        """Free unreferenced objects and release PyTorch's cached CUDA memory.

        Does nothing when CUDA is unavailable. Afterwards logs the allocated
        and reserved memory of the current device.
        """
        if not torch.cuda.is_available():
            return

        # Collect garbage first: dropping unreferenced tensors returns their
        # blocks to the caching allocator, which empty_cache() can then hand
        # back to the driver. The reverse order leaves those blocks cached.
        gc.collect()

        try:
            torch.cuda.synchronize()
        except RuntimeError as err:
            # Best-effort: a sync failure shouldn't stop the cache release.
            self.log(f"CUDA synchronize failed: {err}", "WARNING")

        torch.cuda.empty_cache()

        self.log("GPU memory cleared", "SUCCESS")

        memory_allocated = torch.cuda.memory_allocated() / 1e9
        memory_reserved = torch.cuda.memory_reserved() / 1e9
        self.log(f"GPU Memory - Allocated: {memory_allocated:.2f}GB, Reserved: {memory_reserved:.2f}GB")

    def get_system_info(self):
        """Return a dict describing host CPU/RAM/disk, Python/torch versions and GPU.

        Memory and disk figures are in GB (1e9 bytes). GPU fields are added only
        when CUDA is available, and all describe the current CUDA device. On any
        error, returns ``{'error': <message>}`` instead.
        """
        try:
            cuda_ok = torch.cuda.is_available()
            vm = psutil.virtual_memory()  # one snapshot so the three RAM fields agree
            info = {
                'timestamp': datetime.now().isoformat(),
                'gpu_available': cuda_ok,
                'gpu_count': torch.cuda.device_count() if cuda_ok else 0,
                'cpu_count': psutil.cpu_count(),
                'memory_total_gb': vm.total / 1e9,
                'memory_available_gb': vm.available / 1e9,
                'memory_percent': vm.percent,
                'disk_free_gb': psutil.disk_usage(self._disk_root()).free / 1e9,
                'python_version': sys.version,
                'torch_version': torch.__version__
            }

            if cuda_ok:
                device = torch.cuda.current_device()
                info['gpu_name'] = torch.cuda.get_device_name(device)
                info['gpu_memory_total_gb'] = torch.cuda.get_device_properties(device).total_memory / 1e9
                info['gpu_memory_allocated_gb'] = torch.cuda.memory_allocated(device) / 1e9
                info['gpu_memory_reserved_gb'] = torch.cuda.memory_reserved(device) / 1e9

            return info

        except Exception as e:
            self.log(f"Error getting system info: {e}", "ERROR")
            return {'error': str(e)}

    def save_checkpoint(self, data: Dict[str, Any], name: str):
        """Save *data* as ``checkpoints/<name>_<unix-seconds>.json`` with metadata.

        The file also stores the system info and the last 10 log entries.
        Two checkpoints with the same *name* saved within the same second
        share a filename, so the later one overwrites the earlier.

        Returns:
            The boolean result of ``save_data``.
        """
        timestamp = int(time.time())
        checkpoint_data = {
            'checkpoint_name': name,
            'timestamp': timestamp,
            'datetime': datetime.now().isoformat(),
            'system_info': self.get_system_info(),
            'data': data,
            'logs_snapshot': self.logs[-10:]  # at most the last 10 entries
        }

        filepath = f"checkpoints/{name}_{timestamp}.json"
        success = self.save_data(checkpoint_data, filepath, 'json')

        if success:
            self.log(f"Checkpoint saved: {name}", "SUCCESS")

        return success

    def load_latest_checkpoint(self, name_pattern: str):
        """Load the newest ``checkpoints/*.json`` whose filename contains *name_pattern*.

        "Newest" is the largest integer after the final underscore (the
        timestamp ``save_checkpoint`` appends). Matching files that don't
        follow the ``<name>_<int>.json`` layout are skipped instead of aborting
        the search.

        Returns:
            The checkpoint dict, or None if nothing matches or loading fails.
        """
        checkpoint_dir = os.path.join(self.project_dir, 'checkpoints')

        if not os.path.exists(checkpoint_dir):
            self.log(f"Checkpoint directory not found: {checkpoint_dir}", "WARNING")
            return None

        try:
            candidates = []
            for filename in os.listdir(checkpoint_dir):
                if name_pattern not in filename or not filename.endswith('.json'):
                    continue
                stamp = filename[:-len('.json')].rsplit('_', 1)[-1]
                if stamp.isdigit():
                    candidates.append((int(stamp), filename))

            if not candidates:
                self.log(f"No checkpoints found matching pattern: {name_pattern}", "WARNING")
                return None

            _, latest_file = max(candidates, key=lambda item: item[0])

            self.log(f"Loading checkpoint: {latest_file}", "SUCCESS")
            return self.load_data(f"checkpoints/{latest_file}", 'json')

        except Exception as e:
            self.log(f"Error loading checkpoint: {e}", "ERROR")
            self.logger.exception("Load checkpoint error:")
            return None

    def monitor_resources(self):
        """Log CPU, RAM and (if available) GPU usage, warning above 90% RAM/GPU.

        Blocks for about one second while ``psutil.cpu_percent(interval=1)``
        samples CPU load. GPU figures refer to the current CUDA device. Errors
        are logged, not raised.
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()

            gpu_info = ""
            gpu_ratio = None
            if torch.cuda.is_available():
                device = torch.cuda.current_device()
                gpu_memory = torch.cuda.memory_allocated(device) / 1e9
                gpu_total = torch.cuda.get_device_properties(device).total_memory / 1e9
                gpu_ratio = gpu_memory / gpu_total
                gpu_info = f", GPU: {gpu_ratio * 100:.1f}% ({gpu_memory:.1f}/{gpu_total:.1f}GB)"

            self.log(f"Resources - CPU: {cpu_percent:.1f}%, RAM: {memory.percent:.1f}%{gpu_info}")

            if memory.percent > 90:
                self.log("High memory usage detected!", "WARNING")
            if gpu_ratio is not None and gpu_ratio > 0.9:
                self.log("High GPU memory usage detected!", "WARNING")

        except Exception as e:
            self.log(f"Error monitoring resources: {e}", "ERROR")

    def handle_exception(self, e: Exception, context: str = ""):
        """Log *e* with its traceback and persist a JSON error report.

        The traceback is taken from ``e.__traceback__`` rather than
        ``sys.exc_info()``, so this also works when called outside an
        ``except`` block. The report goes to
        ``logs/error_report_<unix-ms>.json``.

        Returns:
            The error report dict.
        """
        self.log(f"Exception in {context}: {str(e)}", "ERROR")

        tb_text = ''.join(traceback.format_exception(type(e), e, e.__traceback__))

        self.logger.error(f"Full traceback for {context}:")
        self.logger.error(tb_text)

        error_report = {
            'timestamp': datetime.now().isoformat(),
            'context': context,
            'error_message': str(e),
            'error_type': type(e).__name__,
            'traceback': tb_text,
            'system_info': self.get_system_info()
        }

        # Millisecond resolution so bursts of errors don't overwrite each other.
        error_filename = f"error_report_{int(time.time() * 1000)}.json"
        self.save_data(error_report, f"logs/{error_filename}")

        return error_report

    def _select_entries(self, catalog: Dict[str, Any], requested: Optional[List[str]],
                        kind: str) -> Dict[str, Any]:
        """Return the *catalog* entries named in *requested*, in requested order.

        None selects the whole catalog. Unknown names are skipped with a
        warning.
        """
        if requested is None:
            return dict(catalog)
        unknown = [key for key in requested if key not in catalog]
        if unknown:
            self.log(f"Ignoring unknown {kind}: {unknown}", "WARNING")
        return {key: catalog[key] for key in requested if key in catalog}

    def create_experiment_config(self, models: Optional[List[str]] = None,
                                 rag_configs: Optional[List[str]] = None,
                                 datasets: Optional[List[str]] = None,
                                 metrics: Optional[List[str]] = None):
        """Build the experiment configuration and save it to ``configs/project_config.json``.

        Args:
            models: Model keys to include (e.g. 'llama2_7b'); None means all.
            rag_configs: RAG configuration keys ('basic', 'enhanced',
                'optimized'); None means all.
            datasets: Dataset keys to include; None means all.
            metrics: Metric names to record under 'selected_metrics'; None
                means every metric in 'evaluation_metrics'. The grouped
                'evaluation_metrics' catalog itself is always stored in full.

        Returns:
            The configuration dict (also written to disk).
        """
        model_catalog = {
            'llama2_7b': {
                'model_name': 'meta-llama/Llama-2-7b-chat-hf',
                'display_name': 'LLaMA 2 7B',
                'max_memory_gb': 14,
                'use_8bit': True,
                'model_type': 'llama2'
            },
            'llama2_13b': {
                'model_name': 'meta-llama/Llama-2-13b-chat-hf',
                'display_name': 'LLaMA 2 13B',
                'max_memory_gb': 26,
                'use_8bit': True,
                'model_type': 'llama2'
            },
            'mistral_7b': {
                'model_name': 'mistralai/Mistral-7B-Instruct-v0.1',
                'display_name': 'Mistral 7B',
                'max_memory_gb': 14,
                'use_8bit': True,
                'model_type': 'mistral'
            },
            'codellama_7b': {
                'model_name': 'codellama/CodeLlama-7b-Instruct-hf',
                'display_name': 'CodeLLaMA 7B',
                'max_memory_gb': 14,
                'use_8bit': True,
                'model_type': 'codellama'
            },
            'llama3_8b': {
                'model_name': 'meta-llama/Meta-Llama-3-8B-Instruct',
                'display_name': 'LLaMA 3 8B',
                'max_memory_gb': 16,
                'use_8bit': True,
                'model_type': 'llama3'
            }
        }
        dataset_catalog = {
            'msmarco': {
                'name': 'MS MARCO',
                'type': 'passage_ranking',
                'size_limit': 100000,
                'chunk_size': 256,
                'overlap': 25
            },
            'natural_questions': {
                'name': 'Natural Questions',
                'type': 'question_answering',
                'size_limit': 10000,
                'chunk_size': 512,
                'overlap': 50
            },
            'squad': {
                'name': 'SQuAD 2.0',
                'type': 'reading_comprehension',
                'size_limit': 20000,
                'chunk_size': 512,
                'overlap': 50
            },
            'hotpotqa': {
                'name': 'HotpotQA',
                'type': 'multi_hop_qa',
                'size_limit': 10000,
                'chunk_size': 512,
                'overlap': 50
            }
        }
        rag_catalog = {
            'basic': {
                'name': 'Basic RAG',
                'description': 'Top-3 retrieved documents with simple concatenation',
                'top_k': 3,
                'rerank': False,
                'query_expansion': False
            },
            'enhanced': {
                'name': 'Enhanced RAG',
                'description': 'Top-5 documents with relevance scoring and filtering',
                'top_k': 5,
                'rerank': True,
                'query_expansion': False,
                'relevance_threshold': 0.5
            },
            'optimized': {
                'name': 'Optimized RAG',
                'description': 'Dynamic document selection with query analysis',
                'top_k': 'dynamic',
                'rerank': True,
                'query_expansion': True,
                'query_analysis': True
            }
        }
        evaluation_metrics = {
            'generation_quality': ['rouge1_f', 'rouge2_f', 'rougeL_f', 'bleu', 'bert_score_f1'],
            'retrieval_quality': ['recall_at_k', 'mrr', 'ndcg'],
            'semantic_quality': ['semantic_similarity'],
            'efficiency': ['retrieval_time', 'generation_time', 'total_time']
        }
        if metrics is None:
            selected_metrics = [name for group in evaluation_metrics.values() for name in group]
        else:
            selected_metrics = list(metrics)

        config = {
            'project_name': 'RAG_Research_Complete',
            'created_timestamp': datetime.now().isoformat(),
            'models': self._select_entries(model_catalog, models, 'models'),
            'datasets': self._select_entries(dataset_catalog, datasets, 'datasets'),
            'rag_configurations': self._select_entries(rag_catalog, rag_configs, 'rag_configs'),
            'evaluation_metrics': evaluation_metrics,
            'selected_metrics': selected_metrics,
            'vector_databases': ['chromadb', 'faiss'],
            'embedding_model': 'all-MiniLM-L6-v2',
            'experiment_settings': {
                'max_new_tokens': 256,
                'temperature': 0.7,
                'batch_size': 1,
                'max_experiments_per_session': 100,
                'checkpoint_frequency': 10
            }
        }

        self.save_data(config, 'configs/project_config.json')
        self.log("Experiment configuration created", "SUCCESS")
        return config

    def validate_environment(self):
        """Check GPU, RAM, disk space and project directories; persist the report.

        Less than 12 GB of RAM or 20 GB of free disk sets ``overall_status`` to
        False. A missing GPU or missing directories only produce warnings, and
        missing directories are recreated. The 'project_structure' check
        reports the state *before* recreation. The report is saved to
        ``logs/environment_validation.json`` and returned.
        """
        validation_results = {
            'timestamp': datetime.now().isoformat(),
            'checks': {},
            'overall_status': True
        }

        gpu_available = torch.cuda.is_available()
        if gpu_available:
            device = torch.cuda.current_device()
            validation_results['checks']['gpu'] = {
                'available': True,
                'count': torch.cuda.device_count(),
                'name': torch.cuda.get_device_name(device),
                'memory_gb': torch.cuda.get_device_properties(device).total_memory / 1e9
            }
        else:
            validation_results['checks']['gpu'] = {
                'available': False,
                'count': 0,
                'name': None,
                'memory_gb': 0
            }
            self.log("GPU not available - some operations may be slow", "WARNING")

        memory = psutil.virtual_memory()
        memory_gb = memory.total / 1e9
        validation_results['checks']['memory'] = {
            'total_gb': memory_gb,
            'available_gb': memory.available / 1e9,
            'adequate': memory_gb >= 12  # minimum for model loading
        }

        if memory_gb < 12:
            self.log("Low system memory - may cause issues with larger models", "WARNING")
            validation_results['overall_status'] = False

        disk_free = psutil.disk_usage(self._disk_root()).free / 1e9
        validation_results['checks']['disk'] = {
            'free_gb': disk_free,
            'adequate': disk_free >= 20  # minimum for datasets and models
        }

        if disk_free < 20:
            self.log("Low disk space - may not be sufficient for all datasets", "WARNING")
            validation_results['overall_status'] = False

        required_dirs = ['data', 'models', 'results', 'configs', 'logs', 'checkpoints']
        missing_dirs = [name for name in required_dirs
                        if not os.path.exists(os.path.join(self.project_dir, name))]

        validation_results['checks']['project_structure'] = {
            'all_present': not missing_dirs,
            'missing_directories': missing_dirs
        }

        if missing_dirs:
            self.log(f"Missing directories: {missing_dirs}", "WARNING")
            self.ensure_project_structure()

        self.save_data(validation_results, 'logs/environment_validation.json')

        if validation_results['overall_status']:
            self.log("Environment validation passed", "SUCCESS")
        else:
            self.log("Environment validation completed with warnings", "WARNING")

        return validation_results

    def get_progress_summary(self):
        """Report which project phases are complete, judged by indicator files.

        A phase counts as completed when its indicator file exists under
        ``project_dir``.

        Returns:
            Dict with per-phase status and an 'overall_progress' percentage.
        """
        summary = {
            'timestamp': datetime.now().isoformat(),
            'phases': {}
        }

        phases = {
            'setup': 'configs/project_config.json',
            'data_preparation': 'data/processing_summary.json',
            'model_implementation': 'results/model_loading_stats.json',
            'rag_pipeline': 'results/rag_pipeline_test.json',
            'evaluation': 'results/experiments/complete_results.json',
            'analysis': 'results/analysis/comprehensive_statistical_report.md'
        }

        for phase_name, indicator_file in phases.items():
            summary['phases'][phase_name] = {
                'completed': os.path.exists(os.path.join(self.project_dir, indicator_file)),
                'indicator_file': indicator_file
            }

        completed_phases = sum(1 for phase in summary['phases'].values() if phase['completed'])
        total_phases = len(summary['phases'])
        summary['overall_progress'] = {
            'completed_phases': completed_phases,
            'total_phases': total_phases,
            'percentage': (completed_phases / total_phases) * 100
        }

        self.log(f"Project progress: {completed_phases}/{total_phases} phases completed", "INFO")
        return summary

    def get_config(self):
        """Load ``configs/project_config.json``, or create and save the default config."""
        config_path = 'configs/project_config.json'

        existing_config = self.load_data(config_path)

        if existing_config is not None:
            self.log("Loaded existing project configuration", "SUCCESS")
            return existing_config

        self.log("No existing config found, creating default configuration")

        return self.create_experiment_config(
            models=['llama2_7b', 'llama2_13b', 'mistral_7b', 'codellama_7b', 'llama3_8b'],
            rag_configs=['basic', 'enhanced', 'optimized'],
            datasets=['msmarco', 'natural_questions', 'squad', 'hotpotqa'],
            metrics=['rouge1_f', 'rouge2_f', 'rougeL_f', 'bleu', 'bert_score_f1', 'recall_at_k', 'mrr', 'ndcg']
        )

    def initialize_project(self):
        """Create directories, validate the environment and load/create the config.

        Returns:
            Dict with 'config', 'validation', 'project_dir' and 'timestamp'.
        """
        self.log("Initializing RAG Research Project", "INFO")

        self.ensure_project_structure()
        validation = self.validate_environment()
        config = self.get_config()

        self.log("Project initialization complete", "SUCCESS")

        return {
            'config': config,
            'validation': validation,
            'project_dir': self.project_dir,
            'timestamp': datetime.now().isoformat()
        }

# Shared ProjectUtils instance; stays None until get_utils() first builds it.
utils: Optional["ProjectUtils"] = None

def get_utils(project_dir: str = '/content/drive/MyDrive/RAG_Research_Complete'):
    """Return the process-wide ProjectUtils instance, creating it on first call.

    Args:
        project_dir: Root directory used only when the instance is first
            created. Later calls with a different directory still return the
            existing instance, but a warning is logged so the mismatch is not
            silently ignored.

    Returns:
        The shared ProjectUtils instance.
    """
    global utils
    if utils is None:
        utils = ProjectUtils(project_dir)
    elif os.path.normpath(utils.project_dir) != os.path.normpath(project_dir):
        utils.log(
            f"get_utils() called with project_dir={project_dir!r}, but the shared "
            f"instance is bound to {utils.project_dir!r}; returning the existing instance",
            "WARNING",
        )
    return utils

# Convenience functions
def log(message: str, level: str = "INFO"):
    """Log *message* at *level* through the shared ProjectUtils instance.

    The shared instance is created with the default project directory on
    first use.
    """
    shared = utils if utils is not None else get_utils()
    shared.log(message, level)

def save_data(data: Any, filepath: str, format: str = 'json'):
    """Save *data* under the project directory via the shared ProjectUtils.

    Returns whatever ``ProjectUtils.save_data`` returns; the shared instance
    is created on first use.
    """
    shared = utils if utils is not None else get_utils()
    return shared.save_data(data, filepath, format)

def load_data(filepath: str, format: str = 'json'):
    """Load a project-relative file via the shared ProjectUtils.

    Returns whatever ``ProjectUtils.load_data`` returns; the shared instance
    is created on first use.
    """
    shared = utils if utils is not None else get_utils()
    return shared.load_data(filepath, format)