In [7]:
!pip install PyPDF2 python-docx sentence-transformers scikit-learn flask flask-cors

Collecting flask-cors
  Downloading Flask_Cors-5.0.0-py2.py3-none-any.whl.metadata (5.5 kB)
Downloading Flask_Cors-5.0.0-py2.py3-none-any.whl (14 kB)
Installing collected packages: flask-cors
Successfully installed flask-cors-5.0.0


In [8]:
from google.colab import files
import os

# Directory that receives a copy of every uploaded source document
upload_directory = "./uploaded_files"
os.makedirs(upload_directory, exist_ok=True)

# Open the Colab upload widget; the result maps each file name to its bytes
uploaded = files.upload()

# Write each uploaded payload into the upload directory
for filename, payload in uploaded.items():
    file_path = os.path.join(upload_directory, filename)
    with open(file_path, 'wb') as f:
        f.write(payload)
    print(f"Uploaded file: {filename}")

Saving CSE DEPARTMENT INFO.txt to CSE DEPARTMENT INFO.txt
Uploaded file: CSE DEPARTMENT INFO.txt


In [13]:
import os
import json
import numpy as np
from typing import List, Dict
import PyPDF2
import docx
import logging
import traceback
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import time

# Set up logging: request INFO-level output on the root logger and create the
# module-level logger used by the classes in this cell to report errors.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DocumentProcessor:
    """Handles document loading and text extraction from various file formats.

    Every method is static, so the class acts as a namespace. Supported
    formats are plain text (.txt), PDF (.pdf) and Word (.docx).
    """

    @staticmethod
    def read_text_file(file_path: str) -> str:
        """Read a text file as UTF-8, falling back to latin-1.

        Args:
            file_path: Path of the file to read.

        Returns:
            The complete file content.

        Raises:
            OSError: If the file cannot be opened; not handled here.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except UnicodeDecodeError:
            # latin-1 maps every byte to a character, so this retry cannot
            # fail on decoding (the text may still be mis-decoded).
            with open(file_path, 'r', encoding='latin-1') as file:
                return file.read()

    @staticmethod
    def read_pdf_file(file_path: str) -> str:
        """Extract the text of every page of a PDF.

        Args:
            file_path: Path of the PDF file.

        Returns:
            The page texts, each followed by a newline, concatenated in page
            order. Returns "" if the file cannot be read; the error is logged.
        """
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # Treat a page that yields no text (None or "") as empty so a
                # single such page does not discard the whole document.
                page_texts = [
                    (page.extract_text() or "") + "\n"
                    for page in pdf_reader.pages
                ]
            return "".join(page_texts)
        except Exception as e:
            logger.error(f"Error reading PDF {file_path}: {str(e)}")
            return ""

    @staticmethod
    def read_word_file(file_path: str) -> str:
        """Extract the paragraph text of a Word document.

        Args:
            file_path: Path of the .docx file.

        Returns:
            The paragraph texts joined with newlines, or "" if the document
            cannot be read; the error is logged.
        """
        try:
            doc = docx.Document(file_path)
            return "\n".join([paragraph.text for paragraph in doc.paragraphs])
        except Exception as e:
            logger.error(f"Error reading Word document {file_path}: {str(e)}")
            return ""

    @staticmethod
    def process_directory(directory_path: str) -> List[Dict]:
        """Read and chunk every supported document under a directory tree.

        Args:
            directory_path: Root directory; it is walked recursively.

        Returns:
            One dict per chunk with keys "content" (the chunk text) and
            "metadata" ("source" path, "type" extension without the dot,
            "chunk_size" in characters). Files that fail to process are
            logged and skipped; an error while walking yields [].
        """
        processed_docs = []
        # Extension -> reader. Doubles as the set of supported extensions.
        readers = {
            '.txt': DocumentProcessor.read_text_file,
            '.pdf': DocumentProcessor.read_pdf_file,
            '.docx': DocumentProcessor.read_word_file,
        }

        try:
            for root, _, filenames in os.walk(directory_path):
                for filename in filenames:
                    file_path = os.path.join(root, filename)
                    extension = os.path.splitext(filename)[1].lower()

                    reader = readers.get(extension)
                    if reader is None:
                        continue

                    try:
                        print(f"Processing file: {file_path}")
                        content = reader(file_path)

                        if not content.strip():
                            print(f"Warning: No content extracted from {file_path}")
                            continue

                        chunks = DocumentProcessor.chunk_text(content)
                        print(f"Created {len(chunks)} chunks from {file_path}")

                        for chunk in chunks:
                            processed_docs.append({
                                "content": chunk,
                                "metadata": {
                                    "source": file_path,
                                    "type": extension[1:],
                                    "chunk_size": len(chunk)
                                }
                            })

                    except Exception as e:
                        # One unreadable file must not abort the whole walk.
                        logger.error(f"Error processing {file_path}: {str(e)}")
                        print(traceback.format_exc())

            return processed_docs

        except Exception as e:
            logger.error(f"Error walking directory {directory_path}: {str(e)}")
            print(traceback.format_exc())
            return []

    @staticmethod
    def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
        """Split text into overlapping chunks, preferring to cut at spaces.

        Each chunk covers at most ``chunk_size`` characters and is stripped of
        surrounding whitespace. The next chunk starts ``overlap`` characters
        before the previous cut. A cut is moved back to the last space in the
        window only when that still advances past the overlap; otherwise the
        window is cut at ``chunk_size`` (possibly mid-word) so the loop always
        makes forward progress.

        Args:
            text: The text to split.
            chunk_size: Maximum chunk length in characters; must be positive.
            overlap: Characters shared between consecutive chunks; must
                satisfy ``0 <= overlap < chunk_size``.

        Returns:
            The non-empty chunks in document order ([] for empty text).

        Raises:
            ValueError: If ``chunk_size`` or ``overlap`` is out of range.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if overlap < 0 or overlap >= chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        chunks = []
        start = 0
        text_length = len(text)

        while start < text_length:
            end = min(start + chunk_size, text_length)

            if end < text_length:
                # Last space in (start + overlap, end]. Anything earlier would
                # make the next start (end - overlap) fail to advance.
                split_at = text.rfind(' ', start + overlap + 1, end + 1)
                if split_at != -1:
                    end = split_at

            chunk = text[start:end].strip()
            if chunk:  # Only add non-empty chunks
                chunks.append(chunk)

            if end >= text_length:
                # The tail is covered; stepping back by the overlap would only
                # repeat a suffix of the chunk just emitted.
                break

            start = end - overlap

        return chunks

class CSEChatbot:
    """Retrieval-only chatbot over embedded document chunks.

    Instance attributes:
        encoder: SentenceTransformer used to embed both chunks and queries.
        documents: List of chunk dicts with "content" and "metadata" keys.
        embeddings: Array with one row per entry of ``documents``, or None
            while nothing has been embedded or loaded.
    """

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """Initialize the chatbot with necessary components.

        Args:
            model_name: Name passed to SentenceTransformer.

        Raises:
            Exception: Any error from loading the model is printed and
                re-raised.
        """
        try:
            print("Initializing SentenceTransformer...")
            self.encoder = SentenceTransformer(model_name)
            self.documents = []
            self.embeddings = None
            print("Initialization successful")
        except Exception as e:
            print(f"Error initializing chatbot: {str(e)}")
            print(traceback.format_exc())
            raise

    def load_documents(self, directory_path: str):
        """Load, chunk and embed the documents under a directory.

        Replaces ``self.documents`` and ``self.embeddings``. When no chunk is
        produced, ``self.embeddings`` is reset to None so it can never refer
        to a previous document set.

        Args:
            directory_path: Directory handed to
                DocumentProcessor.process_directory.

        Raises:
            Exception: Any processing or encoding error is printed and
                re-raised.
        """
        try:
            logger.info(f"Processing documents from {directory_path}")
            print(f"Processing documents from {directory_path}")

            # Process documents
            docs = DocumentProcessor.process_directory(directory_path)
            self.documents = docs
            # Drop embeddings of any previously loaded set; they are rebuilt
            # below only if there is something to embed.
            self.embeddings = None

            print(f"Number of documents processed: {len(self.documents)}")
            if len(self.documents) > 0:
                print(f"Sample document content: {self.documents[0]['content'][:200]}...")

            if not self.documents:
                print("Warning: No documents were processed.")
                return

            # Generate embeddings
            print("Generating embeddings...")
            texts = [doc["content"] for doc in self.documents]
            self.embeddings = self.encoder.encode(texts, show_progress_bar=True)

            print(f"Generated embeddings shape: {self.embeddings.shape}")

        except Exception as e:
            print(f"Error loading documents: {str(e)}")
            print(traceback.format_exc())
            raise

    def save_knowledge_base(self, file_path: str):
        """Save the processed documents and embeddings as JSON.

        Failures are printed together with the traceback and are NOT raised,
        so callers cannot detect a failed save from this call alone.

        Args:
            file_path: Destination path of the JSON file.
        """
        try:
            data = {
                "documents": self.documents,
                "embeddings": self.embeddings.tolist() if self.embeddings is not None else None
            }

            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f)
            print(f"Knowledge base saved to {file_path}")

        except Exception as e:
            print(f"Error saving knowledge base: {str(e)}")
            print(traceback.format_exc())

    def load_knowledge_base(self, file_path: str):
        """Load previously processed documents and embeddings from JSON.

        The instance is only updated after the file passed validation, so a
        rejected file leaves the current state untouched.

        Args:
            file_path: Path of a file written by ``save_knowledge_base``.

        Raises:
            ValueError: If a required key is missing, or the number of
                documents differs from the number of embedding rows.
            Exception: Any I/O or JSON error is printed and re-raised.
        """
        try:
            print(f"Loading knowledge base from {file_path}")
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if "documents" in data and "embeddings" in data:
                documents = data["documents"]
                embeddings = np.array(data["embeddings"]) if data["embeddings"] else None

                # get_response indexes documents by embedding row, so the two
                # must line up one-to-one.
                if embeddings is not None and embeddings.shape[0] != len(documents):
                    raise ValueError(
                        f"Knowledge base is inconsistent: {len(documents)} documents "
                        f"but {embeddings.shape[0]} embeddings"
                    )

                self.documents = documents
                self.embeddings = embeddings
                print(f"Loaded {len(self.documents)} documents and embeddings shape: {self.embeddings.shape if self.embeddings is not None else None}")
            else:
                raise ValueError("JSON file does not contain required keys 'documents' and 'embeddings'")

        except Exception as e:
            print(f"Error loading knowledge base: {str(e)}")
            print(traceback.format_exc())
            raise

    def get_response(self, query: str, top_k: int = 3, min_similarity: float = 0.2) -> Dict:
        """Process query and return response with relevant context.

        Args:
            query: The user's question.
            top_k: Maximum number of chunks to return; values below 1 return
                no chunks, values above the chunk count are clamped.
            min_similarity: If the best cosine similarity is below this
                value, the query is answered with "No relevant information
                found".

        Returns:
            A dict with keys "query", "response", "relevant_documents" (each
            with "content", "metadata", "similarity") and "processing_time"
            in seconds. Errors are reported in "response"; nothing is raised.
        """
        try:
            start_time = time.time()

            print(f"Processing query: {query}")

            # Blank input (empty or whitespace-only) carries nothing to embed.
            if not query or not query.strip():
                return {
                    "query": query,
                    "response": "Empty query received",
                    "relevant_documents": [],
                    "processing_time": 0
                }

            # Check if embeddings exist
            if self.embeddings is None:
                print("No embeddings found")
                return {
                    "query": query,
                    "response": "System not initialized properly. No embeddings found.",
                    "relevant_documents": [],
                    "processing_time": time.time() - start_time
                }

            # Generate query embedding
            print("Generating query embedding...")
            query_embedding = self.encoder.encode(query)
            print("Query embedding generated successfully")

            # Calculate similarities
            print("Calculating similarities...")
            similarities = cosine_similarity([query_embedding], self.embeddings)[0]
            print(f"Max similarity score: {similarities.max()}")

            if similarities.max() < min_similarity:
                return {
                    "query": query,
                    "response": "No relevant information found",
                    "relevant_documents": [],
                    "processing_time": time.time() - start_time
                }

            # Get top-k most similar documents. Rank first, then slice from
            # the front: slicing the ascending order with [-top_k:] would
            # select EVERYTHING for top_k == 0.
            top_k = max(0, min(int(top_k), len(similarities)))
            ranked_indices = np.argsort(similarities)[::-1]
            top_indices = ranked_indices[:top_k]

            relevant_docs = []
            for idx in top_indices:
                doc = self.documents[idx]
                relevant_docs.append({
                    "content": doc["content"],
                    "metadata": doc["metadata"],
                    "similarity": float(similarities[idx])
                })

            response = self._generate_simple_response(query, relevant_docs)

            processing_time = time.time() - start_time

            return {
                "query": query,
                "response": response,
                "relevant_documents": relevant_docs,
                "processing_time": processing_time
            }

        except Exception as e:
            print(f"Error in get_response: {str(e)}")
            print(traceback.format_exc())
            return {
                "query": query,
                "response": f"Error processing query: {str(e)}",
                "relevant_documents": [],
                "processing_time": 0
            }

    def _generate_simple_response(self, query: str, relevant_docs: List[Dict]) -> str:
        """Return the content of the best-ranked chunk as the answer.

        Args:
            query: The user's question (currently unused).
            relevant_docs: Chunks ordered from most to least similar.

        Returns:
            The first chunk's "content", or "No relevant information found"
            when the list is empty.
        """
        if not relevant_docs:
            return "No relevant information found"

        return relevant_docs[0]["content"]

In [14]:
try:
    # Initialize chatbot
    print("Initializing chatbot...")
    chatbot = CSEChatbot()
    print("Chatbot initialized successfully")

    # Reuse the cached knowledge base when present; otherwise build it from
    # the uploaded files.
    kb_path = "knowledge_base.json"
    if os.path.exists(kb_path):
        print("Loading existing knowledge base...")
        chatbot.load_knowledge_base(kb_path)
        print("Knowledge base loaded successfully")
        print(f"Number of documents loaded: {len(chatbot.documents)}")
        print(f"Embeddings shape: {chatbot.embeddings.shape if chatbot.embeddings is not None else None}")
    else:
        print("Processing documents...")
        chatbot.load_documents(upload_directory)
        print(f"Documents processed. Number of documents: {len(chatbot.documents)}")
        if chatbot.documents:
            chatbot.save_knowledge_base(kb_path)
            # save_knowledge_base prints failures instead of raising, so only
            # claim success when the file is actually there.
            if os.path.exists(kb_path):
                print("Knowledge base saved successfully")
        else:
            # Caching an empty knowledge base would make every later run load
            # it and ignore documents uploaded afterwards.
            print("No documents were processed; knowledge base not saved.")
except Exception as e:
    print(f"Error during initialization: {str(e)}")
    print(traceback.format_exc())

Initializing chatbot...
Initializing SentenceTransformer...
Initialization successful
Chatbot initialized successfully
Loading existing knowledge base...
Loading knowledge base from knowledge_base.json
Loaded 13 documents and embeddings shape: (13, 384)
Knowledge base loaded successfully
Number of documents loaded: 13
Embeddings shape: (13, 384)


In [15]:
# Install required packages
!pip install flask flask-cors

from flask import Flask, request, jsonify, render_template_string
from flask_cors import CORS
import threading
from google.colab import output
import IPython
import traceback  # Add this import

app = Flask(__name__)
CORS(app)

# HTML template for the frontend: a single page that POSTs the question to
# /ask as JSON and renders the answer. Every server-provided string is escaped
# before it is placed into the page, because answers and file paths come from
# uploaded documents. The text must stay free of Jinja delimiters since it is
# rendered with render_template_string.
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>CSE Department Chatbot</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .chat-container {
            background-color: white;
            border-radius: 10px;
            padding: 20px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.1);
        }
        .input-container {
            display: flex;
            gap: 10px;
            margin-top: 20px;
        }
        #questionInput {
            flex-grow: 1;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 5px;
        }
        button {
            padding: 10px 20px;
            background-color: #007bff;
            color: white;
            border: none;
            border-radius: 5px;
            cursor: pointer;
        }
        button:hover {
            background-color: #0056b3;
        }
        #response {
            margin-top: 20px;
            white-space: pre-wrap;
        }
        .source-document {
            margin-top: 10px;
            padding: 10px;
            background-color: #f8f9fa;
            border-radius: 5px;
        }
        .loading {
            display: none;
            margin-top: 20px;
            color: #666;
        }
    </style>
</head>
<body>
    <div class="chat-container">
        <h1>CSE Department Chatbot</h1>
        <div class="input-container">
            <input type="text" id="questionInput" placeholder="Ask your question...">
            <button onclick="askQuestion()">Ask</button>
        </div>
        <div id="loading" class="loading">Processing your question...</div>
        <div id="response"></div>
    </div>

    <script>
        // Escape arbitrary text so it can be embedded in HTML markup safely
        function escapeHtml(value) {
            const holder = document.createElement('div');
            holder.textContent = String(value);
            return holder.innerHTML;
        }

        async function askQuestion() {
            const questionInput = document.getElementById('questionInput');
            const responseDiv = document.getElementById('response');
            const loadingDiv = document.getElementById('loading');
            const question = questionInput.value.trim();

            if (!question) return;

            // Show loading message
            loadingDiv.style.display = 'block';
            responseDiv.innerHTML = '';

            try {
                const response = await fetch('/ask', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                    },
                    body: JSON.stringify({ question: question })
                });

                const data = await response.json();

                // Hide loading message
                loadingDiv.style.display = 'none';

                // Failures arrive as a non-2xx status with an "error" field
                if (!response.ok || data.error) {
                    const message = data.error || 'Error processing your question. Please try again.';
                    responseDiv.innerHTML = `<p style="color: red;">${escapeHtml(message)}</p>`;
                    return;
                }

                // Display response
                let responseHtml = `<h3>Response:</h3><p>${escapeHtml(data.response)}</p>`;

                if (data.relevant_documents && data.relevant_documents.length > 0) {
                    responseHtml += '<h3>Source Documents:</h3>';
                    data.relevant_documents.forEach((doc, index) => {
                        responseHtml += `
                            <div class="source-document">
                                <p><strong>Source ${index + 1}</strong> (Similarity: ${Number(doc.similarity).toFixed(2)})</p>
                                <p>File: ${escapeHtml(doc.metadata.source)}</p>
                            </div>
                        `;
                    });
                }

                responseHtml += `<p><em>Processing time: ${Number(data.processing_time).toFixed(2)} seconds</em></p>`;
                responseDiv.innerHTML = responseHtml;

            } catch (error) {
                loadingDiv.style.display = 'none';
                responseDiv.innerHTML = '<p style="color: red;">Error processing your question. Please try again.</p>';
                console.error('Error:', error);
            }
        }

        // Allow Enter key to submit question
        document.getElementById('questionInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                askQuestion();
            }
        });
    </script>
</body>
</html>
"""

@app.route('/')
def home():
    """Serve the chatbot page rendered from HTML_TEMPLATE."""
    page = render_template_string(HTML_TEMPLATE)
    return page

@app.route('/ask', methods=['POST'])
def ask():
    """Answer a question posted as JSON ``{"question": "..."}``.

    Returns:
        200 with the dict produced by ``chatbot.get_response``;
        400 with ``{"error": ...}`` when the body is not a JSON object or the
        question is missing, not a string, or blank;
        500 with ``{"error", "traceback"}`` on an unexpected failure.
    """
    try:
        # silent=True yields None for a missing, malformed or non-JSON body
        # instead of raising, so client mistakes are answered with 400 below
        # rather than falling into the 500 handler.
        data = request.get_json(silent=True)
        question = data.get('question', '') if isinstance(data, dict) else ''

        if not isinstance(question, str) or not question.strip():
            return jsonify({'error': 'Question is empty'}), 400

        # Add debug print
        print(f"Received question: {question}")

        result = chatbot.get_response(question)

        # Add debug print
        print(f"Generated response: {result}")

        return jsonify(result)

    except Exception as e:
        # Print the full error traceback
        error_trace = traceback.format_exc()
        print("Error occurred:")
        print(error_trace)
        # NOTE(review): the traceback is sent to the client, which is only
        # appropriate while debugging - confirm before exposing this publicly.
        return jsonify({
            'error': str(e),
            'traceback': error_trace
        }), 500

def run_flask():
    """Run the Flask development server on port 8000 (blocks until it stops).

    Meant to be used as the target of a background thread.
    """
    # debug=True turns the auto-reloader on by default. The reloader registers
    # signal handlers, which Python only allows in the main thread, so it must
    # be disabled when the server runs in a worker thread.
    app.run(port=8000, debug=True, use_reloader=False)

# Start Flask server in a daemon thread. Re-running this cell while the
# previous server thread is still alive would try to bind port 8000 a second
# time and fail, so an already-running thread is reused.
_running_thread = globals().get("flask_thread")
if _running_thread is not None and _running_thread.is_alive():
    print("Flask server thread is already running; reusing it.")
else:
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()

# Display the URL where the app can be accessed
output.serve_kernel_port_as_window(8000)
print("Chatbot web interface is running! Click the URL above to open it in a new window.")



<IPython.core.display.Javascript object>

Try `serve_kernel_port_as_iframe` instead.
Chatbot web interface is running! Click the URL above to open it in a new window.
 * Serving Flask app '__main__'
 * Debug mode: on


Address already in use
Port 8000 is in use by another program. Either identify and stop that program, or start the server with a different port.
