In [None]:
# PART 1: SETUP AND DEPENDENCIES
# Run this in Google Colab first

# Install dependencies
!pip install streamlit>=1.28.0
!pip install openai-whisper
!pip install sentence-transformers
!pip install faiss-cpu
!pip install gtts
!pip install pydub
!pip install groq
!pip install python-docx
!pip install PyPDF2
!pip install soundfile
!pip install librosa
!pip install torch
!pip install pyngrok

# System packages for Colab
!apt-get update &> /dev/null
!apt-get install -y ffmpeg &> /dev/null

print("✅ Dependencies installed successfully!")

Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/800.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━[0m [32m553.0/800.5 kB[0m [31m18.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m16.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->openai-whisper)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->openai-whisper)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda

In [None]:
# PART 2: IMPORTS AND CONFIGURATION

import streamlit as st
import whisper
import torch
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import json
import os
from gtts import gTTS
from pydub import AudioSegment
import tempfile
import io
from pathlib import Path
import PyPDF2
from docx import Document
import logging
from typing import List, Dict, Tuple, Optional
import time
from datetime import datetime
import base64
import soundfile as sf
import librosa
from groq import Groq
import re

# Detect whether the notebook is running inside Google Colab
try:
    import IPython.display as ipd
    from google.colab import files
except ImportError:
    # Either import being unavailable means we are not running in Colab
    COLAB_MODE = False
    print("✅ Running in local mode")
else:
    COLAB_MODE = True
    print("✅ Running in Google Colab mode")

# Configuration Class
class Config:
    """Central configuration for the RAG system.

    All settings are plain class attributes shared by the document, voice,
    vector-store and LLM components defined in this notebook.
    """

    # API Configuration
    # Taken from the GROQ_API_KEY environment variable when it is set, so the
    # key never has to be written into the notebook. Falls back to "" (the
    # previous default); it can still be set later via Streamlit input.
    GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
    # NOTE(review): confirm this model id is still served by Groq before deploying.
    GROQ_MODEL = "llama3-8b-8192"  # Llama model via Groq

    # Model configurations
    WHISPER_MODEL = "base"  # Options: tiny, base, small, medium, large
    # NOTE(review): all-MiniLM-L6-v2 is presumably an English-focused model, not a
    # multilingual one -- confirm retrieval quality for the non-English languages below.
    EMBEDDING_MODEL = "all-MiniLM-L6-v2"  # Sentence-transformer used for embeddings

    # Supported languages (ISO 639-1 code -> display name used in prompts)
    SUPPORTED_LANGUAGES = {
        'en': 'English',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'it': 'Italian',
        'pt': 'Portuguese',
        'ru': 'Russian',
        'zh': 'Chinese',
        'ja': 'Japanese',
        'ko': 'Korean',
        'hi': 'Hindi',
        'ar': 'Arabic'
    }

    # File paths
    DOCUMENTS_DIR = "documents"
    AUDIO_DIR = "audio"
    EMBEDDINGS_FILE = "embeddings.index"
    METADATA_FILE = "metadata.json"

    # Vector database settings
    EMBEDDING_DIM = 384  # must match the output size of EMBEDDING_MODEL
    TOP_K_RETRIEVAL = 5

    # Audio settings
    SAMPLE_RATE = 16000
    AUDIO_FORMAT = "wav"

    # System prompts
    SYSTEM_PROMPT = """You are an AI assistant designed to help visually impaired users access and understand content.
    You provide clear, concise, and helpful responses based on retrieved documents.
    Always respond in the same language as the user's question.
    Be empathetic and accessibility-focused in your responses."""

print("✅ Configuration loaded successfully!")

✅ Running in Google Colab mode
✅ Configuration loaded successfully!


In [None]:
# PART 3: DOCUMENT PROCESSING

class DocumentProcessor:
    """Loads documents from disk and splits them into overlapping word chunks.

    The file types handled are listed in ``self.supported_formats``. Text is
    extracted per format and then chunked so that each chunk can be embedded
    and retrieved independently.
    """

    def __init__(self):
        self.supported_formats = ['.txt', '.pdf', '.docx', '.md']
        print("✅ DocumentProcessor initialized")

    def load_documents(self, directory: str) -> List[Dict]:
        """Load every supported file under ``directory`` (recursively) as chunks.

        Args:
            directory: Folder to scan. When it does not exist it is created
                and an empty list is returned.

        Returns:
            One dict per chunk with the keys ``id``, ``filename``,
            ``filepath``, ``content``, ``chunk_index`` and ``total_chunks``.
            Files that fail to process are reported and skipped.
        """
        documents = []

        if not os.path.exists(directory):
            os.makedirs(directory)
            print(f"Created directory: {directory}")
            return documents

        # Sorted so chunk order (and therefore index positions) is reproducible
        # between runs; rglob alone yields files in filesystem order.
        for file_path in sorted(Path(directory).rglob('*')):
            # Skip directories that merely happen to carry a supported suffix
            if not file_path.is_file():
                continue
            if file_path.suffix.lower() not in self.supported_formats:
                continue

            try:
                content = self._extract_text(file_path)
                if not content.strip():
                    continue

                chunks = self._chunk_text(content)
                for i, chunk in enumerate(chunks):
                    documents.append({
                        'id': f"{file_path.stem}_{i}",
                        'filename': file_path.name,
                        'filepath': str(file_path),
                        'content': chunk,
                        'chunk_index': i,
                        'total_chunks': len(chunks)
                    })
                print(f"Processed: {file_path.name} -> {len(chunks)} chunks")
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole load
                print(f"Error processing {file_path}: {e}")

        print(f"✅ Loaded {len(documents)} document chunks total")
        return documents

    def _extract_text(self, file_path: Path) -> str:
        """Extract plain text from a ``.txt``, ``.md``, ``.pdf`` or ``.docx`` file.

        Returns:
            The extracted text, or ``""`` when the suffix is unsupported or
            extraction fails (the error is printed, not raised).
        """
        suffix = file_path.suffix.lower()

        try:
            if suffix in ('.txt', '.md'):
                # Undecodable bytes are dropped rather than failing the file
                return file_path.read_text(encoding='utf-8', errors='ignore')

            if suffix == '.pdf':
                with open(file_path, 'rb') as file:
                    reader = PyPDF2.PdfReader(file)
                    # Guard against a page yielding no text (e.g. image-only pages)
                    return "".join(
                        (page.extract_text() or "") + "\n" for page in reader.pages
                    )

            if suffix == '.docx':
                doc = Document(str(file_path))
                return '\n'.join(paragraph.text for paragraph in doc.paragraphs)

        except Exception as e:
            print(f"Error extracting text from {file_path}: {e}")
            return ""

        return ""

    def _chunk_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """Split ``text`` into chunks of at most ``chunk_size`` words.

        Consecutive chunks share ``overlap`` words so that context is not
        lost at chunk boundaries.

        Args:
            text: Text to split on whitespace.
            chunk_size: Maximum number of words per chunk; must be positive.
            overlap: Words repeated between neighbouring chunks; must satisfy
                ``0 <= overlap < chunk_size``.

        Returns:
            The list of chunks, or ``[text]`` when ``text`` contains no words.

        Raises:
            ValueError: If ``chunk_size`` or ``overlap`` is out of range. A
                non-positive stride would otherwise crash ``range`` or
                silently return the text un-chunked.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        words = text.split()
        stride = chunk_size - overlap
        chunks = []

        for start in range(0, len(words), stride):
            chunks.append(' '.join(words[start:start + chunk_size]))

            # The current window already reaches the end; a further chunk
            # would only repeat the tail.
            if start + chunk_size >= len(words):
                break

        return chunks if chunks else [text]

# Smoke test for DocumentProcessor
def test_document_processor():
    """Write one sample text file and run it through DocumentProcessor.

    Returns:
        The DocumentProcessor instance that was exercised.
    """
    print("Testing DocumentProcessor...")

    # Prepare a scratch directory holding a single sample document
    sample_dir = "test_documents"
    os.makedirs(sample_dir, exist_ok=True)
    sample_text = "This is a sample document for testing the RAG system. " * 20
    with open(f"{sample_dir}/sample.txt", "w") as handle:
        handle.write(sample_text)

    # Load the directory and report how many chunks came back
    doc_processor = DocumentProcessor()
    loaded_chunks = doc_processor.load_documents(sample_dir)
    print(f"✅ Test completed: {len(loaded_chunks)} chunks processed")

    return doc_processor

# Run test
if __name__ == "__main__":
    test_processor = test_document_processor()

Testing DocumentProcessor...
✅ DocumentProcessor initialized
Processed: sample.txt -> 1 chunks
✅ Loaded 1 document chunks total
✅ Test completed: 1 chunks processed


In [None]:
# PART 4: VOICE PROCESSING

class VoiceProcessor:
    """Speech-to-text (Whisper) and text-to-speech (gTTS) helper.

    Errors in either direction are printed and turned into neutral return
    values (``("", "en")`` or ``b""``) instead of being raised.
    """

    def __init__(self):
        print("Loading Whisper model...")
        self.whisper_model = whisper.load_model(Config.WHISPER_MODEL)
        self.audio_dir = Config.AUDIO_DIR
        os.makedirs(self.audio_dir, exist_ok=True)
        print("✅ VoiceProcessor initialized")

    def speech_to_text(self, audio_file_path: str) -> Tuple[str, str]:
        """Transcribe an audio file with Whisper.

        Args:
            audio_file_path: Path of the audio file to transcribe.

        Returns:
            ``(text, language)``; ``("", "en")`` when transcription fails.
        """
        try:
            print(f"Transcribing audio: {audio_file_path}")
            result = self.whisper_model.transcribe(audio_file_path)
            text = result["text"].strip()
            language = result.get("language", "en")

            print(f"✅ Transcription completed: '{text}' (language: {language})")
            return text, language

        except Exception as e:
            print(f"Error in speech-to-text: {e}")
            return "", "en"

    def speech_to_text_from_bytes(self, audio_data: bytes) -> Tuple[str, str]:
        """Transcribe raw audio bytes by staging them in a temporary file.

        Args:
            audio_data: Encoded audio content.

        Returns:
            ``(text, language)``; ``("", "en")`` when anything fails.
        """
        tmp_file_path = None
        try:
            # delete=False because the transcriber re-opens the file by path
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(audio_data)

            return self.speech_to_text(tmp_file_path)

        except Exception as e:
            print(f"Error in speech-to-text from bytes: {e}")
            return "", "en"

        finally:
            # Always remove the scratch file, even when writing or
            # transcription raised, so failed requests do not leak temp files.
            if tmp_file_path is not None and os.path.exists(tmp_file_path):
                try:
                    os.unlink(tmp_file_path)
                except OSError as cleanup_error:
                    print(f"Could not remove temporary file {tmp_file_path}: {cleanup_error}")

    def text_to_speech(self, text: str, language: str = "en") -> bytes:
        """Synthesize ``text`` with gTTS.

        Args:
            text: Text to speak. Blank text returns ``b""`` without calling gTTS.
            language: Language code; codes outside the supported map fall
                back to English.

        Returns:
            The audio bytes written by gTTS, or ``b""`` when both the
            requested language and the English fallback fail.
        """
        if not text or not text.strip():
            print("⚠️ No text provided for speech synthesis")
            return b""

        try:
            print(f"Generating speech for: '{text[:50]}...' (language: {language})")

            # Map language code for gTTS
            tts_lang_map = {
                'en': 'en', 'es': 'es', 'fr': 'fr', 'de': 'de', 'it': 'it',
                'pt': 'pt', 'ru': 'ru', 'zh': 'zh', 'ja': 'ja', 'ko': 'ko',
                'hi': 'hi', 'ar': 'ar'
            }
            tts_lang = tts_lang_map.get(language, "en")

            audio_bytes = self._synthesize(text, tts_lang)
            print("✅ Speech generation completed")
            return audio_bytes

        except Exception as e:
            print(f"Error in text-to-speech: {e}")
            # Fallback to English
            try:
                return self._synthesize(text, "en")
            except Exception as fallback_error:
                # Report instead of silently swallowing, so failures can be diagnosed
                print(f"Error in English text-to-speech fallback: {fallback_error}")
                return b""

    def _synthesize(self, text: str, tts_lang: str) -> bytes:
        """Run one gTTS request and return the generated audio as bytes."""
        tts = gTTS(text=text, lang=tts_lang, slow=False)
        audio_buffer = io.BytesIO()
        tts.write_to_fp(audio_buffer)
        return audio_buffer.getvalue()

    def save_audio_file(self, audio_data: bytes, filename: str) -> str:
        """Write ``audio_data`` to ``filename`` inside ``self.audio_dir``.

        Returns:
            The path of the file that was written.
        """
        filepath = os.path.join(self.audio_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(audio_data)
        print(f"✅ Audio saved: {filepath}")
        return filepath

# Smoke test for VoiceProcessor
def test_voice_processor():
    """Synthesize one English sentence, save it, and play it back when in Colab.

    Returns:
        The VoiceProcessor instance that was exercised.
    """
    print("Testing VoiceProcessor...")

    voice = VoiceProcessor()

    # Text-to-speech round trip; an empty result means synthesis failed
    sample_sentence = "Hello, this is a test of the voice processing system."
    speech_bytes = voice.text_to_speech(sample_sentence, "en")

    if not speech_bytes:
        return voice

    # Persist the clip so it can be inspected outside the notebook
    voice.save_audio_file(speech_bytes, "test_output.mp3")
    print(f"✅ Test audio generated: {len(speech_bytes)} bytes")

    # The inline audio player is only shown when running in Colab
    if COLAB_MODE:
        ipd.display(ipd.Audio(speech_bytes))

    return voice

# Run test
if __name__ == "__main__":
    test_voice = test_voice_processor()


Testing VoiceProcessor...
Loading Whisper model...


100%|███████████████████████████████████████| 139M/139M [00:01<00:00, 73.0MiB/s]


✅ VoiceProcessor initialized
Generating speech for: 'Hello, this is a test of the voice processing syst...' (language: en)
✅ Speech generation completed
✅ Audio saved: audio/test_output.mp3
✅ Test audio generated: 31296 bytes


In [None]:
# PART 5: VECTOR DATABASE AND RETRIEVAL

class VectorDatabase:
    """Embeds document chunks and serves cosine-similarity search via FAISS.

    Vectors are L2-normalized before being added to an inner-product index,
    so the scores returned by ``search`` are cosine similarities.
    """

    def __init__(self):
        print("Loading embedding model...")
        self.embedding_model = SentenceTransformer(Config.EMBEDDING_MODEL)
        self.index = None
        self.documents = []
        self.metadata = []
        print("✅ VectorDatabase initialized")

    @staticmethod
    def _normalize(vectors) -> np.ndarray:
        """Return ``vectors`` as float32 rows scaled to unit length.

        All-zero rows are left unchanged rather than divided by zero, which
        would put NaN/inf values into the index.
        """
        vectors = np.asarray(vectors, dtype='float32')
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return vectors / norms

    @staticmethod
    def _metadata_path(filepath: str) -> str:
        """Return the sidecar JSON path that belongs to an index file path.

        ``"embeddings.index"`` maps to ``"embeddings_metadata.json"``. The
        extension is stripped with ``os.path.splitext`` so the result can
        never equal ``filepath`` itself, whatever extension the caller used.
        """
        base, _ext = os.path.splitext(filepath)
        return f"{base}_metadata.json"

    def build_index(self, documents: List[Dict]):
        """Embed ``documents`` and build a fresh FAISS index from them.

        Args:
            documents: Chunk dicts; each must provide a ``'content'`` string.

        Returns:
            ``True`` when an index was built, ``False`` when ``documents`` is empty.
        """
        self.documents = documents

        if not documents:
            print("⚠️ No documents found to index")
            return False

        print(f"Building index for {len(documents)} document chunks...")

        # Extract text content
        texts = [doc['content'] for doc in documents]

        # Generate embeddings
        print("Generating embeddings...")
        embeddings = self.embedding_model.encode(texts, show_progress_bar=True)

        # Normalize embeddings for cosine similarity
        embeddings = self._normalize(embeddings)

        # Size the index from the embeddings themselves so that changing the
        # embedding model cannot leave a stale, mismatching dimension behind.
        self.index = faiss.IndexFlatIP(embeddings.shape[1])
        self.index.add(embeddings)

        # Store metadata
        self.metadata = documents

        print(f"✅ Built index with {len(documents)} document chunks")
        return True

    def search(self, query: str, k: Optional[int] = None) -> List[Dict]:
        """Return the chunks most similar to ``query``.

        Args:
            query: Free-text query to embed.
            k: Maximum number of results. ``None`` (the default) resolves to
                ``Config.TOP_K_RETRIEVAL`` at call time, so the setting is
                honoured even if ``Config`` is redefined after this class.

        Returns:
            Copies of the matching chunk dicts, best first, each extended
            with a float ``'similarity_score'``. Empty when no index exists
            or ``k`` is not positive.
        """
        if self.index is None or len(self.documents) == 0:
            print("⚠️ No index available for search")
            return []

        if k is None:
            k = Config.TOP_K_RETRIEVAL
        if k <= 0:
            return []

        print(f"Searching for: '{query}'")

        # Generate query embedding
        query_embedding = self._normalize(self.embedding_model.encode([query]))

        # Search
        scores, indices = self.index.search(query_embedding, k)

        # Return results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            # Negative ids mark unfilled result slots; also guard stale metadata
            if 0 <= idx < len(self.metadata):
                result = self.metadata[idx].copy()
                result['similarity_score'] = float(score)
                results.append(result)

        print(f"✅ Found {len(results)} relevant documents")
        for i, result in enumerate(results):
            print(f"  {i+1}. {result['filename']} (score: {result['similarity_score']:.3f})")

        return results

    def save_index(self, filepath: str):
        """Write the FAISS index to ``filepath`` plus a JSON metadata sidecar.

        Does nothing when no index has been built. The sidecar path comes
        from ``_metadata_path`` so the JSON can never overwrite the index.
        """
        if self.index is None:
            return

        faiss.write_index(self.index, filepath)

        # Save metadata
        metadata_path = self._metadata_path(filepath)
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"✅ Index saved to {filepath}")

    def load_index(self, filepath: str):
        """Load a FAISS index (and its metadata sidecar, if present).

        Returns:
            ``True`` when the index file existed and was loaded, else ``False``.
        """
        if not os.path.exists(filepath):
            return False

        self.index = faiss.read_index(filepath)

        # Load metadata
        metadata_path = self._metadata_path(filepath)
        if os.path.exists(metadata_path):
            with open(metadata_path, 'r', encoding='utf-8') as f:
                self.metadata = json.load(f)
                self.documents = self.metadata
        else:
            # Without metadata search() cannot map hits back to chunks
            print(f"⚠️ Metadata file not found: {metadata_path}")

        print(f"✅ Index loaded from {filepath}")
        return True

# Smoke test for VectorDatabase
def test_vector_database():
    """Index three sample chunks and run one similarity query against them.

    Returns:
        The VectorDatabase instance that was exercised.
    """
    print("Testing VectorDatabase...")

    # (id, filename, chunk_index, content) rows for the sample corpus
    fixture_rows = [
        ('doc1_0', 'accessibility.txt', 0,
         'Screen readers help visually impaired users navigate computers and websites.'),
        ('doc1_1', 'accessibility.txt', 1,
         'Voice assistants can control smart home devices and provide information.'),
        ('doc2_0', 'technology.txt', 0,
         'Braille displays provide tactile feedback for reading digital content.'),
    ]
    sample_docs = [
        {'id': doc_id, 'filename': name, 'content': body, 'chunk_index': position}
        for doc_id, name, position, body in fixture_rows
    ]

    # Build the index, then query it only if indexing succeeded
    store = VectorDatabase()
    if store.build_index(sample_docs):
        hits = store.search("How do screen readers work?", k=2)
        print(f"✅ Search test completed: {len(hits)} results")

    return store

# Run test
if __name__ == "__main__":
    test_vector_db = test_vector_database()

Testing VectorDatabase...
Loading embedding model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

✅ VectorDatabase initialized
Building index for 3 document chunks...
Generating embeddings...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Built index with 3 document chunks
Searching for: 'How do screen readers work?'
✅ Found 2 relevant documents
  1. accessibility.txt (score: 0.693)
  2. technology.txt (score: 0.495)
✅ Search test completed: 2 results


In [None]:
# PART 6: LLM PROCESSING WITH GROQ

class LLMProcessor:
    """Generates answers from retrieved documents through the Groq chat API."""

    # Maximum number of characters of each document placed in the prompt
    MAX_CONTEXT_CHARS = 500

    def __init__(self, api_key: str):
        """Create the Groq client.

        Args:
            api_key: Groq API key supplied by the caller.

        Raises:
            ValueError: If ``api_key`` is empty.
        """
        if not api_key:
            raise ValueError("Groq API key is required")

        # Use the caller's key. A literal key must never be embedded here: it
        # would leak with the notebook and silently override the argument.
        self.client = Groq(api_key=api_key)
        print("✅ LLMProcessor initialized with Groq API")

    def generate_response(self, query: str, context_docs: List[Dict], language: str = "en") -> str:
        """Answer ``query`` using ``context_docs`` as grounding context.

        Args:
            query: The user's question.
            context_docs: Retrieved chunk dicts (``'filename'``, ``'content'``).
            language: Language code used to pick the response language name.

        Returns:
            The model's answer, or an apology message containing the error
            text when the request fails.
        """
        try:
            print(f"Generating response for query: '{query[:50]}...'")

            # Prepare context
            context = self._prepare_context(context_docs)

            # Create prompt
            prompt = self._create_prompt(query, context, language)

            print("Calling Groq API...")

            # Generate response
            response = self.client.chat.completions.create(
                model=Config.GROQ_MODEL,
                messages=[
                    {"role": "system", "content": Config.SYSTEM_PROMPT},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1024,
                temperature=0.7
            )

            # Message content may be None; treat that as an empty answer
            result = (response.choices[0].message.content or "").strip()
            print(f"✅ Response generated: {len(result)} characters")
            return result

        except Exception as e:
            error_msg = f"Error generating response: {e}"
            print(f"❌ {error_msg}")
            return f"I apologize, but I encountered an error while processing your request: {str(e)}"

    def _prepare_context(self, docs: List[Dict]) -> str:
        """Format retrieved documents into the context section of the prompt.

        Each document's content is capped at ``MAX_CONTEXT_CHARS`` characters
        to limit prompt size. An ellipsis is appended only when text was
        actually cut, so complete passages are not presented as truncated.
        """
        if not docs:
            return "No relevant documents found."

        context_parts = []
        for i, doc in enumerate(docs):
            full_text = doc.get('content', '')
            snippet = full_text[:self.MAX_CONTEXT_CHARS]
            if len(full_text) > self.MAX_CONTEXT_CHARS:
                snippet += "..."
            source_name = doc.get('filename', 'unknown')
            context_parts.append(f"Document {i+1} ({source_name}):\n{snippet}")

        return "\n\n".join(context_parts)

    def _create_prompt(self, query: str, context: str, language: str) -> str:
        """Build the user prompt; unknown language codes default to English."""
        lang_name = Config.SUPPORTED_LANGUAGES.get(language, "English")

        prompt = f"""Based on the following context documents, please answer the user's question in {lang_name}.

Context:
{context}

User Question: {query}

Please provide a helpful, accurate, and accessible response based on the context provided. If the context doesn't contain enough information to answer the question, please say so clearly. Keep your response concise and focused on helping visually impaired users."""

        return prompt

    def test_connection(self) -> bool:
        """Send a minimal chat request; return ``True`` when it succeeds."""
        try:
            print("Testing Groq API connection...")
            response = self.client.chat.completions.create(
                model=Config.GROQ_MODEL,
                messages=[
                    {"role": "user", "content": "Hello, please respond with 'Connection successful'"}
                ],
                max_tokens=50
            )

            result = (response.choices[0].message.content or "").strip()
            print(f"✅ API connection successful: {result}")
            return True

        except Exception as e:
            print(f"❌ API connection failed: {e}")
            return False

# Test the LLMProcessor
def test_llm_processor():
    """Interactively test the LLM processor against the live Groq API.

    Prompts for an API key, checks connectivity, then asks one sample
    question grounded in a single sample document.

    Returns:
        The initialized LLMProcessor on success; ``None`` when no key was
        entered, the connection test failed, or an error occurred.
    """
    import getpass  # local import: only this interactive helper needs it

    print("Testing LLMProcessor...")

    # getpass does not echo the key, so it is not stored in the saved cell
    # output the way a value typed into input() is.
    api_key = getpass.getpass("Enter your Groq API key: ").strip()

    if not api_key:
        print("⚠️ No API key provided, skipping LLM test")
        return None

    try:
        # Initialize processor
        llm = LLMProcessor(api_key)

        # Test connection
        if not llm.test_connection():
            return None

        # Test response generation
        sample_docs = [
            {
                'filename': 'accessibility.txt',
                'content': 'Screen readers are software that help visually impaired users by converting text to speech.',
                'similarity_score': 0.85
            }
        ]

        response = llm.generate_response(
            "What are screen readers?",
            sample_docs,
            "en"
        )

        print(f"✅ Test response: {response}")
        return llm

    except Exception as e:
        print(f"❌ LLM test failed: {e}")
        return None

# Run test
if __name__ == "__main__":
    test_llm = test_llm_processor()

Testing LLMProcessor...
Enter your Groq API key: [REDACTED]
✅ LLMProcessor initialized with Groq API
Testing Groq API connection...
✅ API connection successful: Connection successful
Generating response for query: 'What are screen readers?...'
Calling Groq API...
✅ Response generated: 410 characters
✅ Test response: Based on the provided context, I'm happy to help you with your question!

According to Document 1, screen readers are software that help visually impaired users by converting text to speech. This means that screen readers can read out loud the text that appears on a computer screen, allowing visually impaired users to access and understand digital content.

I hope this helps clarify what screen readers are!


In [None]:
# PART 7: MAIN RAG SYSTEM

class MultilingualRAGSystem:
    """Facade that wires document loading, voice I/O, retrieval and the LLM together."""

    def __init__(self, groq_api_key: str):
        """Construct all components; any construction error is printed and re-raised."""
        print("Initializing Multilingual RAG System...")

        try:
            self.doc_processor = DocumentProcessor()
            self.voice_processor = VoiceProcessor()
            self.vector_db = VectorDatabase()
            self.llm_processor = LLMProcessor(groq_api_key)
            self.conversation_history = []
        except Exception as init_error:
            print(f"❌ Error initializing RAG system: {init_error}")
            raise
        else:
            print("✅ RAG System initialized successfully")

    def initialize_system(self, documents_dir: str):
        """Load documents from ``documents_dir`` and index them.

        Returns:
            ``True`` when at least one chunk was indexed, otherwise ``False``.
        """
        print(f"Initializing system with documents from: {documents_dir}")

        # Load documents, then build the vector index only if there are any
        loaded = self.doc_processor.load_documents(documents_dir)
        if loaded and self.vector_db.build_index(loaded):
            print("✅ System initialization completed")
            return True

        print("⚠️ System initialization completed but no documents indexed")
        return False

    def _answer_transcript(self, query_text: str, detected_language: str) -> Tuple[str, str, bytes]:
        """Shared tail of both voice entry points: answer the transcript and voice the answer."""
        if not query_text:
            return "", "Could not understand the audio input.", b""

        answer = self.process_text_query(query_text, detected_language)
        spoken_answer = self.voice_processor.text_to_speech(answer, detected_language)
        return query_text, answer, spoken_answer

    def process_voice_query_from_file(self, audio_file_path: str) -> Tuple[str, str, bytes]:
        """Transcribe an audio file, answer it, and synthesize the answer.

        Returns:
            ``(query_text, response_text, response_audio)``.
        """
        print(f"Processing voice query from file: {audio_file_path}")

        transcript, spoken_language = self.voice_processor.speech_to_text(audio_file_path)
        return self._answer_transcript(transcript, spoken_language)

    def process_voice_query_from_bytes(self, audio_data: bytes) -> Tuple[str, str, bytes]:
        """Transcribe raw audio bytes, answer them, and synthesize the answer.

        Returns:
            ``(query_text, response_text, response_audio)``.
        """
        print("Processing voice query from audio bytes")

        transcript, spoken_language = self.voice_processor.speech_to_text_from_bytes(audio_data)
        return self._answer_transcript(transcript, spoken_language)

    def process_text_query(self, query: str, language: str = "en") -> str:
        """Retrieve context for ``query``, generate an answer, and record the exchange."""
        print(f"Processing text query: '{query}' (language: {language})")

        # Retrieval followed by generation
        context_docs = self.vector_db.search(query)
        answer = self.llm_processor.generate_response(query, context_docs, language)

        # Keep a record of the exchange
        history_entry = {
            'timestamp': datetime.now(),
            'query': query,
            'language': language,
            'response': answer,
            'retrieved_docs': len(context_docs)
        }
        self.conversation_history.append(history_entry)

        print("✅ Query processed successfully")
        return answer

    def get_conversation_history(self) -> List[Dict]:
        """Return the list of recorded exchanges."""
        return self.conversation_history

    def clear_conversation_history(self):
        """Replace the history with a new empty list."""
        self.conversation_history = []
        print("✅ Conversation history cleared")

# Create sample documents for testing
def create_sample_documents():
    """Write the two demonstration text files into ``Config.DOCUMENTS_DIR``."""
    docs_dir = Config.DOCUMENTS_DIR
    os.makedirs(docs_dir, exist_ok=True)

    # File name -> file body, written in this order
    sample_files = {
        # Sample document 1: Accessibility Guide
        "accessibility_guide.txt": """Accessibility Guide for Visually Impaired Users

Introduction
This guide provides essential information about accessibility tools and techniques for visually impaired individuals.

Screen Readers
Screen readers are software applications that convert text and interface elements into speech or Braille output. Popular screen readers include:
- NVDA (NonVisual Desktop Access) - Free and open source
- JAWS (Job Access With Speech) - Commercial screen reader
- VoiceOver - Built into Apple devices
- TalkBack - Built into Android devices

Navigation Techniques
Effective navigation using assistive technology involves:
1. Learning keyboard shortcuts for faster navigation
2. Using heading navigation to jump between sections
3. Utilizing landmarks to understand page structure
4. Taking advantage of skip links to bypass repetitive content

Web Accessibility
When browsing the web, look for sites that follow WCAG guidelines:
- Proper heading structure
- Alternative text for images
- Keyboard-accessible controls
- High contrast color schemes
- Descriptive link text""",
        # Sample document 2: Technology Tips
        "technology_tips.txt": """Technology Tips for Enhanced Accessibility

Voice Assistants
Voice assistants can significantly improve daily productivity:
- Set reminders and alarms
- Control smart home devices
- Get weather and news updates
- Make phone calls and send messages
- Search for information hands-free

Audio Books and Podcasts
Digital audio content provides access to vast libraries:
- Audible and similar services offer extensive catalogs
- Many public libraries provide free audiobook access
- Podcast apps organize content by topics and interests
- Speed control allows for personalized listening preferences

Braille Displays
Modern refreshable Braille displays offer:
- Tactile feedback for digital content
- Portable designs for mobility
- Bluetooth connectivity with devices
- Multiple Braille grade support""",
    }

    for file_name, file_text in sample_files.items():
        with open(f"{docs_dir}/{file_name}", "w", encoding="utf-8") as handle:
            handle.write(file_text)

    print(f"✅ Created sample documents in {docs_dir}")

In [13]:
# MULTILINGUAL DOCUMENT CHATBOT IN COLAB

# Install required packages
!pip install -q openai-whisper sentence-transformers faiss-cpu gtts pydub PyPDF2 python-docx groq

# Import libraries
import os
import io
import numpy as np
from pathlib import Path
from google.colab import files
from IPython.display import Audio, display, clear_output
import whisper
from sentence_transformers import SentenceTransformer
import faiss
from gtts import gTTS
from pydub import AudioSegment
import PyPDF2
from docx import Document
from groq import Groq

# Configuration
# NOTE: this redefines the Config class from the earlier configuration cell;
# after this cell runs, code that reads Config sees these values.
class Config:
    """Settings for the Colab document chatbot."""

    # Never paste the key into the notebook: it would be saved and shared with
    # the file. Set the GROQ_API_KEY environment variable before running this
    # cell; the value is "" when the variable is unset.
    GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
    GROQ_MODEL = "llama3-8b-8192"
    WHISPER_MODEL = "base"
    EMBEDDING_MODEL = "all-MiniLM-L6-v2"
    DOCUMENTS_DIR = "/content/documents"
    EMBEDDING_DIM = 384  # must match the output size of EMBEDDING_MODEL
    TOP_K_RETRIEVAL = 3
    SYSTEM_PROMPT = """You are a helpful AI assistant that answers questions based on provided documents.
    Respond in the same language as the question."""
    # Language code -> display name
    SUPPORTED_LANGUAGES = {
        'en': 'English',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'it': 'Italian',
        'pt': 'Portuguese'
    }

# Initialize models
def load_models():
    """Load the Whisper model, the embedding model and the Groq client.

    When ``Config.GROQ_API_KEY`` is empty the key is requested with
    ``getpass`` (no echo, so it is not stored in the cell output) instead of
    building a client that would fail on its first request.

    Returns:
        Dict with the keys ``'whisper'``, ``'embedding'`` and ``'groq'``.

    Raises:
        ValueError: If no API key is configured or entered.
    """
    import getpass  # local import: only needed for the interactive fallback

    api_key = Config.GROQ_API_KEY or getpass.getpass("Enter your Groq API key: ").strip()
    if not api_key:
        raise ValueError("Groq API key is required")

    print("Loading models...")
    return {
        'whisper': whisper.load_model(Config.WHISPER_MODEL),
        'embedding': SentenceTransformer(Config.EMBEDDING_MODEL),
        'groq': Groq(api_key=api_key)
    }

models = load_models()

# Document processing
def process_uploaded_files():
    """Prompt for a Colab upload, save each file, and extract its text.

    Every uploaded file is written to Config.DOCUMENTS_DIR. PDFs are read with
    PyPDF2, .docx files with python-docx, and anything else is read as UTF-8
    text. Files whose extracted text is blank are skipped; files that fail to
    parse are reported and skipped.

    Returns:
        list of {'filename': str, 'content': str} dicts, where 'content' holds
        at most the first 2000 characters of the extracted text.
    """
    os.makedirs(Config.DOCUMENTS_DIR, exist_ok=True)
    collected = []

    for name, raw_bytes in files.upload().items():
        target = os.path.join(Config.DOCUMENTS_DIR, name)
        with open(target, "wb") as sink:
            sink.write(raw_bytes)

        lowered = name.lower()
        try:
            if lowered.endswith('.pdf'):
                with open(target, "rb") as handle:
                    pages = PyPDF2.PdfReader(handle).pages
                    extracted = "\n".join(page.extract_text() for page in pages)
            elif lowered.endswith('.docx'):
                extracted = "\n".join(p.text for p in Document(target).paragraphs)
            else:
                # Any other extension is treated as a plain-text file.
                with open(target, "r", encoding='utf-8') as handle:
                    extracted = handle.read()

            if not extracted.strip():
                continue
            # Keep only the first 2000 characters of each document.
            collected.append({'filename': name, 'content': extracted[:2000]})
            print(f"Processed: {name}")
        except Exception as err:
            print(f"Error processing {name}: {str(err)}")

    return collected

# Build search index
def build_index(documents):
    """Embed document contents and load them into a FAISS inner-product index.

    Rows are L2-normalised before insertion, so the inner-product scores the
    index returns are cosine similarities.

    Args:
        documents: list of dicts, each carrying a 'content' string.

    Returns:
        None when `documents` is empty; otherwise the tuple (index, documents).
    """
    if not documents:
        return None

    texts = [doc['content'] for doc in documents]
    embeddings = np.asarray(models['embedding'].encode(texts), dtype='float32')

    # Guard all-zero rows: dividing by a zero norm would store NaNs in the index.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    embeddings = embeddings / norms

    # Size the index from the vectors actually produced rather than from a
    # separately maintained constant, so changing the embedding model cannot
    # cause a dimension mismatch.
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)

    return index, documents

# Search documents
def search_documents(query, index, docs, k=3):
    """Return the documents most similar to `query`.

    Args:
        query: question text.
        index: FAISS index as produced by build_index, or None.
        docs: the document dicts that were added to `index`, in the same order.
        k: maximum number of neighbours to request from the index.

    Returns:
        list of copies of the matching document dicts, each with an extra
        'score' float. Empty when there is no index, no documents, or the
        query is blank.
    """
    # A blank query (e.g. an empty input line or a silent recording) or a
    # missing index has nothing meaningful to match; report "no results"
    # instead of crashing or returning arbitrary documents.
    if index is None or not docs or not query or not query.strip():
        return []

    query_embedding = np.asarray(models['embedding'].encode([query]), dtype='float32')
    norm = np.linalg.norm(query_embedding)
    if norm > 0:  # avoid NaNs from dividing by a zero norm
        query_embedding = query_embedding / norm
    scores, indices = index.search(query_embedding, k)

    results = []
    for score, idx in zip(scores[0], indices[0]):
        # Negative ids are skipped; they do not refer to a document.
        if idx >= 0:
            result = docs[int(idx)].copy()
            result['score'] = float(score)
            results.append(result)

    return results

# Generate response
def generate_response(query, context_docs):
    """Ask the Groq chat model to answer `query` from the retrieved documents.

    Args:
        query: the user's question.
        context_docs: document dicts with 'filename' and 'content' keys.

    Returns:
        The model's reply text, or the string "Error: <message>" if the
        request (or reading its result) raises.
    """
    sections = []
    for position, doc in enumerate(context_docs, start=1):
        sections.append(f"Document {position} ({doc['filename']}):\n{doc['content']}")
    context = "\n\n".join(sections)

    try:
        conversation = [
            {"role": "system", "content": Config.SYSTEM_PROMPT},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ]
        completion = models['groq'].chat.completions.create(
            model=Config.GROQ_MODEL,
            messages=conversation,
            max_tokens=1024,
        )
        return completion.choices[0].message.content
    except Exception as err:
        # Failures are returned as text rather than raised.
        return f"Error: {str(err)}"

# Text to speech
def speak(text, language='en'):
    """Synthesise `text` with gTTS and wrap it in an autoplaying Audio widget.

    Args:
        text: the text to read aloud.
        language: language code passed to gTTS as `lang`.

    Returns:
        IPython.display.Audio built from WAV bytes, with autoplay enabled.
    """
    # gTTS produces MP3 data; collect it in memory.
    mp3_stream = io.BytesIO()
    gTTS(text=text, lang=language, slow=False).write_to_fp(mp3_stream)
    mp3_stream.seek(0)

    # Re-encode the MP3 as WAV via pydub before handing it to the widget.
    wav_stream = io.BytesIO()
    AudioSegment.from_file(mp3_stream, format="mp3").export(wav_stream, format="wav")

    return Audio(wav_stream.getvalue(), autoplay=True)

# Voice interface
def voice_chat(index, docs):
    """Answer one spoken question supplied as an uploaded audio file.

    The first uploaded file is saved as temp_audio.wav, transcribed with
    Whisper, answered from the indexed documents, and the answer is printed
    and played back in the language Whisper reported.

    Args:
        index: search index passed through to search_documents.
        docs: documents passed through to search_documents.
    """
    for banner_line in ("\n=== Voice Chat Mode ===",
                        "1. Upload an audio file with your question",
                        "2. I'll transcribe and answer it"):
        print(banner_line)

    uploaded = files.upload()
    if not uploaded:
        return

    # Only the first uploaded file is used.
    first_name = next(iter(uploaded))
    payload = uploaded[first_name]
    with open("temp_audio.wav", "wb") as sink:
        sink.write(payload)

    transcription = models['whisper'].transcribe("temp_audio.wav")
    query = transcription["text"]
    language = transcription.get("language", "en")
    print(f"\nYou asked: {query}")

    matches = search_documents(query, index, docs)
    if not matches:
        print("No relevant documents found")
        return

    answer = generate_response(query, matches)
    print(f"\nAssistant: {answer}")

    # Read the answer aloud in the language detected from the recording.
    display(speak(answer, language))

# Text interface
def text_chat(index, docs):
    """Answer one typed question and read the answer aloud.

    Args:
        index: search index passed through to search_documents.
        docs: documents passed through to search_documents.

    Returns:
        False when the user typed 'quit' or 'exit' (surrounding whitespace is
        ignored), True otherwise.
    """
    import re  # local import: the cell's import block does not provide `re`

    print("\n=== Text Chat Mode ===")
    query = input("Type your question (or 'quit' to exit): ")

    if query.strip().lower() in ['quit', 'exit']:
        return False

    results = search_documents(query, index, docs)
    if results:
        response = generate_response(query, results)
        print(f"\nAssistant: {response}")

        # Detect language from query (simple keyword approach).
        # Keywords are matched as whole words: plain substring matching
        # misfired on English text (e.g. 'hola' inside "scholar").
        # Known limitation: a keyword that is also an English word
        # ('comment') still selects the other language.
        keyword_languages = (
            ('es', ('hola', 'cómo')),
            ('fr', ('bonjour', 'comment')),
        )
        lowered = query.lower()
        lang = 'en'
        for code, keywords in keyword_languages:
            if any(re.search(rf"\b{re.escape(word)}\b", lowered) for word in keywords):
                lang = code
                break

        display(speak(response, lang))
    else:
        print("No relevant documents found")

    return True

# Main function
def main():
    """Run the interactive chatbot: upload documents, index them, then chat.

    Returns early when no usable document was uploaded. Otherwise loops over
    a three-item menu until the user picks Exit or text_chat returns a falsy
    value.
    """
    print("=== Document Chatbot ===")

    # Step 1: Upload documents
    print("\nStep 1: Upload your documents (PDF, DOCX, TXT)")
    input("Press Enter when ready to upload files...")
    documents = process_uploaded_files()
    if not documents:
        print("No valid documents found. Please try again.")
        return

    # Step 2: Build search index
    print("\nBuilding search index...")
    index, docs = build_index(documents)
    print(f"Ready! Indexed {len(docs)} documents")

    # Step 3: Chat interface
    menu_lines = (
        "\nChoose an option:",
        "1. Voice chat (upload audio question)",
        "2. Text chat",
        "3. Exit",
    )
    running = True
    while running:
        for line in menu_lines:
            print(line)

        choice = input("Your choice (1-3): ")

        if choice == '3':
            print("Goodbye!")
            running = False
        elif choice == '1':
            clear_output()
            voice_chat(index, docs)
        elif choice == '2':
            clear_output()
            # text_chat signals "quit" with a falsy return value.
            running = bool(text_chat(index, docs))
        else:
            print("Invalid choice")

if __name__ == "__main__":
    main()


=== Text Chat Mode ===


KeyboardInterrupt: Interrupted by user

In [None]:
# MULTILINGUAL REAL-TIME VOICE CHATBOT IN COLAB (USER-CONTROLLED RECORDING)

# Install required packages
!pip install -q openai-whisper sentence-transformers faiss-cpu gtts pydub PyPDF2 python-docx groq

# Import libraries
import os
import io
import numpy as np
from pathlib import Path
from google.colab import files
from IPython.display import Audio, display, clear_output, Javascript
import whisper
from sentence_transformers import SentenceTransformer
import faiss
from gtts import gTTS
from pydub import AudioSegment
import PyPDF2
from docx import Document
from groq import Groq
import base64

# Configuration
class Config:
    """Static settings shared by every function in this cell."""

    # SECURITY: the API key is read from the environment instead of being
    # committed to the notebook. The key that used to be hardcoded here should
    # be treated as leaked and revoked in the Groq console.
    # In Colab, set it before running this cell, e.g.
    #   os.environ["GROQ_API_KEY"] = getpass.getpass("Groq API key: ")
    # When the variable is unset this is None.
    GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
    GROQ_MODEL = "llama3-8b-8192"
    WHISPER_MODEL = "base"
    EMBEDDING_MODEL = "all-MiniLM-L6-v2"
    DOCUMENTS_DIR = "/content/documents"
    EMBEDDING_DIM = 384
    TOP_K_RETRIEVAL = 3
    SYSTEM_PROMPT = """You are a helpful AI assistant that answers questions based on provided documents.
    Respond in the same language as the question. If the question is in a non-English language,
    you may include English translations or explanations when helpful."""
    # Language code -> display name.
    SUPPORTED_LANGUAGES = {
        'en': 'English',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'it': 'Italian',
        'pt': 'Portuguese'
    }

# Initialize models
def load_models():
    """Create the Whisper model, sentence embedder and Groq client.

    Returns:
        dict keyed by 'whisper', 'embedding' and 'groq'; each value is
        configured from the corresponding Config attribute.
    """
    print("Loading models...")
    loaded = {}
    loaded['whisper'] = whisper.load_model(Config.WHISPER_MODEL)
    loaded['embedding'] = SentenceTransformer(Config.EMBEDDING_MODEL)
    loaded['groq'] = Groq(api_key=Config.GROQ_API_KEY)
    return loaded

# Module-level registry read by the transcription, search and chat functions.
models = load_models()

# JavaScript for user-controlled recording
# Browser-side helpers injected into the cell output by record_audio().
# - startRecording() returns the getUserMedia promise chain directly, so a
#   denied or missing microphone rejects instead of leaving the Python side
#   waiting forever on a promise that never settles.
# - stopRecording() rejects when there is no active recorder, reports
#   FileReader failures, and stops the stream's tracks so the microphone is
#   released after each recording. It resolves with a base64 data URL.
RECORD_JS = """
window.recorder = null;
window.chunks = [];

function startRecording() {
    return navigator.mediaDevices.getUserMedia({ audio: true })
        .then(stream => {
            window.recorder = new MediaRecorder(stream);
            window.chunks = [];
            window.recorder.ondataavailable = e => window.chunks.push(e.data);
            window.recorder.start();
            return "Recording started";
        });
}

function stopRecording() {
    return new Promise((resolve, reject) => {
        if (!window.recorder || window.recorder.state === "inactive") {
            reject(new Error("No active recording to stop"));
            return;
        }

        const reader = new FileReader();
        reader.onerror = () => reject(reader.error);
        reader.onloadend = () => resolve(reader.result);

        window.recorder.onstop = () => {
            const blob = new Blob(window.chunks);
            window.recorder.stream.getTracks().forEach(track => track.stop());
            reader.readAsDataURL(blob);
        };
        window.recorder.stop();
    });
}
"""

def record_audio():
    """Record from the browser between two Enter presses and save the result.

    Injects RECORD_JS into the cell output, starts recording after the first
    Enter, stops after the second, and writes the decoded audio bytes to
    recorded_audio.wav.

    Returns:
        The path of the saved file, "recorded_audio.wav".
    """
    output_path = "recorded_audio.wav"
    display(Javascript(RECORD_JS))

    print("\nPress Enter to start recording...")
    input()
    eval_js("startRecording()")

    print("Recording... Press Enter to stop")
    input()  # block until the user ends the recording
    data_url = eval_js("stopRecording()")

    # stopRecording() resolves with a data URL; the base64 payload is the
    # part after the first comma.
    encoded_payload = data_url.split(',')[1]
    audio_bytes = base64.b64decode(encoded_payload)

    with open(output_path, "wb") as sink:
        sink.write(audio_bytes)
    return output_path

def eval_js(js_code):
    """Evaluate `js_code` through google.colab.output.eval_js and return its result."""
    # Imported at call time, as in the original definition.
    from google.colab import output as colab_output
    return colab_output.eval_js(js_code)

# Document processing
def process_uploaded_files():
    """Upload files through Colab, store them, and return their text.

    Each file is saved under Config.DOCUMENTS_DIR and then parsed according to
    its extension (.pdf, .docx, anything else as UTF-8 text). Blank documents
    are dropped silently; parse failures are printed and the file is skipped.

    Returns:
        list of dicts with 'filename' and 'content' (at most the first 2000
        characters of the extracted text).
    """
    def _extract_text(path, lowered_name):
        # Pick the reader from the lower-cased file name.
        if lowered_name.endswith('.pdf'):
            with open(path, "rb") as pdf_file:
                pdf_pages = PyPDF2.PdfReader(pdf_file).pages
                return "\n".join([pg.extract_text() for pg in pdf_pages])
        if lowered_name.endswith('.docx'):
            return "\n".join([para.text for para in Document(path).paragraphs])
        # Assume text file
        with open(path, "r", encoding='utf-8') as text_file:
            return text_file.read()

    os.makedirs(Config.DOCUMENTS_DIR, exist_ok=True)
    uploaded = files.upload()
    documents = []

    for filename, content in uploaded.items():
        filepath = os.path.join(Config.DOCUMENTS_DIR, filename)
        with open(filepath, "wb") as out:
            out.write(content)

        try:
            text = _extract_text(filepath, filename.lower())
            if text.strip():
                # First 2000 chars
                documents.append(dict(filename=filename, content=text[:2000]))
                print(f"Processed: {filename}")
        except Exception as exc:
            print(f"Error processing {filename}: {str(exc)}")

    return documents

# Build search index
def build_index(documents):
    """Embed document contents and load them into a FAISS inner-product index.

    Rows are L2-normalised before insertion, so the inner-product scores the
    index returns are cosine similarities.

    Args:
        documents: list of dicts, each carrying a 'content' string.

    Returns:
        None when `documents` is empty; otherwise the tuple (index, documents).
    """
    if not documents:
        return None

    texts = [doc['content'] for doc in documents]
    embeddings = np.asarray(models['embedding'].encode(texts), dtype='float32')

    # Guard all-zero rows: dividing by a zero norm would store NaNs in the index.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    embeddings = embeddings / norms

    # Size the index from the vectors actually produced rather than from a
    # separately maintained constant, so changing the embedding model cannot
    # cause a dimension mismatch.
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)

    return index, documents

# Search documents
def search_documents(query, index, docs, k=3):
    """Return the documents most similar to `query`.

    Args:
        query: question text.
        index: FAISS index as produced by build_index, or None.
        docs: the document dicts that were added to `index`, in the same order.
        k: maximum number of neighbours to request from the index.

    Returns:
        list of copies of the matching document dicts, each with an extra
        'score' float. Empty when there is no index, no documents, or the
        query is blank.
    """
    # A blank query (e.g. an empty input line or a silent recording) or a
    # missing index has nothing meaningful to match; report "no results"
    # instead of crashing or returning arbitrary documents.
    if index is None or not docs or not query or not query.strip():
        return []

    query_embedding = np.asarray(models['embedding'].encode([query]), dtype='float32')
    norm = np.linalg.norm(query_embedding)
    if norm > 0:  # avoid NaNs from dividing by a zero norm
        query_embedding = query_embedding / norm
    scores, indices = index.search(query_embedding, k)

    results = []
    for score, idx in zip(scores[0], indices[0]):
        # Negative ids are skipped; they do not refer to a document.
        if idx >= 0:
            result = docs[int(idx)].copy()
            result['score'] = float(score)
            results.append(result)

    return results

# Generate response with improved multilingual support
def generate_response(query, context_docs, detected_lang):
    """Ask the Groq chat model to answer `query` from the retrieved documents.

    The system prompt is Config.SYSTEM_PROMPT with the detected language code
    appended.

    Args:
        query: the user's question.
        context_docs: document dicts with 'filename' and 'content' keys.
        detected_lang: language code included in the system prompt.

    Returns:
        The model's reply text, or the string "Error: <message>" if the
        request (or reading its result) raises.
    """
    numbered_sections = [
        f"Document {number} ({doc['filename']}):\n{doc['content']}"
        for number, doc in enumerate(context_docs, start=1)
    ]
    context = "\n\n".join(numbered_sections)

    # Enhance system prompt with language information
    system_prompt = f"{Config.SYSTEM_PROMPT}\n\nDetected language: {detected_lang}"
    user_prompt = f"Context:\n{context}\n\nQuestion: {query}"

    try:
        completion = models['groq'].chat.completions.create(
            model=Config.GROQ_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=1024,
        )
        return completion.choices[0].message.content
    except Exception as err:
        # Failures are returned as text rather than raised.
        return f"Error: {str(err)}"

# Text to speech with language auto-detection
def speak(text, language='en'):
    """Synthesise `text` with gTTS and wrap it in an autoplaying Audio widget.

    Args:
        text: the text to read aloud.
        language: language code passed to gTTS as `lang`.

    Returns:
        IPython.display.Audio built from WAV bytes with autoplay enabled, or
        None if any step raises (the error is printed).
    """
    try:
        # gTTS produces MP3 data; collect it in memory.
        mp3_stream = io.BytesIO()
        gTTS(text=text, lang=language, slow=False).write_to_fp(mp3_stream)
        mp3_stream.seek(0)

        # Re-encode the MP3 as WAV via pydub before handing it to the widget.
        wav_stream = io.BytesIO()
        AudioSegment.from_file(mp3_stream, format="mp3").export(wav_stream, format="wav")

        return Audio(wav_stream.getvalue(), autoplay=True)
    except Exception as err:
        print(f"Error in TTS: {err}")
        return None

# Voice chat with user-controlled recording
def voice_chat(index, docs):
    """Run a record / transcribe / answer loop until the user says an exit word.

    Each turn records audio with record_audio(), transcribes it with Whisper,
    answers from the indexed documents and plays the answer back in the
    language Whisper reported.

    Args:
        index: search index passed through to search_documents.
        docs: documents passed through to search_documents.
    """
    import string  # local import: the cell's import block does not provide `string`

    print("\n=== Real-time Voice Chat ===")
    print("Instructions:")
    print("1. Press Enter to start recording")
    print("2. Speak your question")
    print("3. Press Enter again to stop recording")
    print("4. Say 'quit' or 'exit' to end voice chat")

    while True:
        audio_file = record_audio()

        # Transcribe
        result = models['whisper'].transcribe(audio_file)
        query = result["text"]
        language = result.get("language", "en")

        print(f"\nDetected language: {language}")
        print(f"You asked: {query}")

        # The transcript of a spoken command can carry punctuation and
        # spacing (e.g. " Quit."); comparing it verbatim meant the exit words
        # were not recognised in that form. Compare with surrounding
        # punctuation and whitespace removed.
        command = query.lower().strip(string.punctuation + string.whitespace)
        if command in ['quit', 'exit', 'stop']:
            print("Exiting voice chat...")
            break

        # Get answer
        results = search_documents(query, index, docs)
        if results:
            response = generate_response(query, results, language)
            print(f"\nAssistant: {response}")

            # Speak response in the detected language
            audio_output = speak(response, language)
            if audio_output:
                display(audio_output)
            else:
                print("Could not generate audio response")
        else:
            print("No relevant documents found")

# Text chat interface
def text_chat(index, docs):
    """Answer one typed question and read the answer aloud.

    Args:
        index: search index passed through to search_documents.
        docs: documents passed through to search_documents.

    Returns:
        False when the user typed 'quit' or 'exit' (surrounding whitespace is
        ignored), True otherwise.
    """
    import re  # local import: the cell's import block does not provide `re`

    print("\n=== Text Chat Mode ===")
    query = input("Type your question (or 'quit' to exit): ")

    if query.strip().lower() in ['quit', 'exit']:
        return False

    # Simple language detection from query.
    # Keywords are matched as whole words: plain substring matching misfired
    # on ordinary English text ('come' inside "outcome", 'stai' inside
    # "stairs", 'hola' inside "scholar"). Languages are tried in this order
    # and the first hit wins.
    # Known limitation: keywords that are also English words ('come',
    # 'comment') still select the other language.
    keyword_languages = (
        ('es', ('hola', 'cómo', 'qué')),
        ('fr', ('bonjour', 'comment', 'ça va')),
        ('de', ('hallo', 'wie', 'geht')),
        ('it', ('ciao', 'come', 'stai')),
        ('pt', ('olá', 'como', 'você')),
    )
    lowered = query.lower()
    lang = 'en'
    for code, keywords in keyword_languages:
        if any(re.search(rf"\b{re.escape(word)}\b", lowered) for word in keywords):
            lang = code
            break

    results = search_documents(query, index, docs)
    if results:
        response = generate_response(query, results, lang)
        print(f"\nAssistant: {response}")

        # speak() returns None when TTS fails; handle that the same way
        # voice_chat does instead of displaying None.
        audio_output = speak(response, lang)
        if audio_output:
            display(audio_output)
        else:
            print("Could not generate audio response")
    else:
        print("No relevant documents found")

    return True

# Main function
def main():
    """Run the interactive chatbot: upload documents, index them, then chat.

    Returns early when no usable document was uploaded. Otherwise shows a
    three-item menu repeatedly until the user picks Exit or text_chat returns
    a falsy value.
    """
    print("=== Multilingual Document Chatbot ===")
    print("Now with user-controlled voice chat!")

    # Step 1: Upload documents
    print("\nStep 1: Upload your documents (PDF, DOCX, TXT)")
    input("Press Enter when ready to upload files...")
    documents = process_uploaded_files()
    if not documents:
        print("No valid documents found. Please try again.")
        return

    # Step 2: Build search index
    print("\nBuilding search index...")
    index, docs = build_index(documents)
    print(f"Ready! Indexed {len(docs)} documents")

    # Step 3: Chat interface
    menu_text = "\n".join([
        "\nChoose an option:",
        "1. Voice chat (press to start/stop recording)",
        "2. Text chat",
        "3. Exit",
    ])
    while True:
        print(menu_text)
        choice = input("Your choice (1-3): ")

        if choice == '1':
            clear_output()
            voice_chat(index, docs)
            continue
        if choice == '2':
            clear_output()
            keep_going = text_chat(index, docs)
            if keep_going:
                continue
            break  # text_chat signalled quit
        if choice == '3':
            print("Goodbye!")
            break
        print("Invalid choice")

if __name__ == "__main__":
    main()


=== Real-time Voice Chat ===
Instructions:
1. Press Enter to start recording
2. Speak your question
3. Press Enter again to stop recording
4. Say 'quit' or 'exit' to end voice chat


<IPython.core.display.Javascript object>


Press Enter to start recording...

Recording... Press Enter to stop






Detected language: en
You asked:  What PLA does on digit status?

Assistant: According to the document, PLA (Perceptron Learning Algorithm) does not work well on digit data that is not linearly separable, meaning that the data cannot be separated using a linear boundary. In this case, PLA will not converge and may mess up other points while it is trying to correct one specific point. As a result, the algorithm will be forced to terminate after a certain number of iterations (in this case, 1000).


<IPython.core.display.Javascript object>


Press Enter to start recording...

Recording... Press Enter to stop






Detected language: hi
You asked:  Please explain me this in Hindi. Can you explain me this in Hindi? This is the question.

Assistant: Here's an explanation of the document in Hindi:

प Apost Office से एक आंकड़ा सेट है, जिसमें लोगों ने पिनकोड के रूप में 16x16 छाया स्तर नंबर लिखे हैं। हम चाहते हैं कि माशीन लर्निंग पद्वति इस सेट के साथ क्या कर सकता है।

द्वारा दिली Chowdhury के साथ प्रेपेर्ड Linear Model I, हम आज पहचानने के लिए प्रयास करेंगे कि पिनकोड में कौन सा नंबर है।

हमने देखा कि पेर्सप्टरोन मॉडल का प्रयोग करना है, लेकिन इसमें 257 प्रामाणिक उपाय हैं, जिससे ढेर सारे पैरमीटर हैं। इसके बजाय, हम intensity और सिमेट्री जैसे फीचर्स को निकालते हैं, जिन्हें सीखिंग एल्गोरिथ्म के लिए देते हैं और एल्गोरिथ्म से पैटर्न की खोज करते हैं।

अब मॉडल में 3 प्रामाणिक उपाय हैं - w0, w1, w2। PLA (पेर्सप्टरोन Algorithms) काम नहीं करता है जब डेटा अनलाइनरी सेपरेबल नहीं है, जिसके कारण इसमें शेष 1000 इटेरेशन में सीमित है。

इसके बजाय, हम पोकेट एल्गोरिथ्म (Pocket Algorithm) का प्रयोग करते हैं, जिसमें हम प्रत्ये

<IPython.core.display.Javascript object>


Press Enter to start recording...
