In [1]:
"""
RAG Application with Groq API for Google Colab
Supports PDF, Excel, and Text files
"""

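# Install the required packages first (in Colab, run the command in its own cell).
# NOTE: the pip command below is wrapped in a string literal, so it is NOT executed
# by this cell; the imports that follow fail unless the packages are already installed.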
"""
!pip install -q groq gradio langchain langchain-community langchain-text-splitters \
    sentence-transformers chromadb pypdf openpyxl pandas
"""

import os
import gradio as gr
from groq import Groq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
import pandas as pd
import pypdf
from typing import List, Tuple
import tempfile
import shutil

class RAGApplication:
    """Retrieval-augmented Q&A over uploaded PDF, Excel and text files.

    One instance holds the Groq client, the embedding model and the Chroma
    vector store that the Gradio callbacks read and update.
    """

    # Groq chat model used both for key verification and for answering.
    CHAT_MODEL = "llama-3.3-70b-versatile"
    # Sentence-transformers model used to embed document chunks.
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
    # Splitter settings, measured in characters (length_function=len).
    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    # Number of chunks retrieved per question.
    TOP_K = 3

    def __init__(self):
        self.client = None              # Groq client, created by verify_api_key()
        self.vectorstore = None         # Chroma store, created by process_documents()
        self.embeddings = None          # embedding model, loaded on first use
        self.api_key_verified = False
        self.temp_dir = tempfile.mkdtemp()  # parent directory for Chroma indexes

    def verify_api_key(self, api_key: str) -> Tuple[str, bool]:
        """Verify the Groq API key with a minimal chat request.

        Args:
            api_key: Key typed by the user; surrounding whitespace is ignored.

        Returns:
            (status message, True if the key was accepted).
        """
        api_key = (api_key or "").strip()
        if not api_key:
            # Nothing to check: skip the network round trip.
            self.api_key_verified = False
            return "‚ùå API Key verification failed: no API key provided", False

        try:
            self.client = Groq(api_key=api_key)
            # Test the API key with a simple request; only success matters.
            self.client.chat.completions.create(
                messages=[{"role": "user", "content": "Hi"}],
                model=self.CHAT_MODEL,
                max_tokens=10
            )
            self.api_key_verified = True
            return "‚úÖ API Key verified successfully!", True
        except Exception as e:
            self.api_key_verified = False
            return f"‚ùå API Key verification failed: {str(e)}", False

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file, one newline-terminated entry per page."""
        with open(file_path, 'rb') as file:
            pdf_reader = pypdf.PdfReader(file)
            # `or ""` keeps a page without extractable text from breaking the
            # concatenation; join avoids repeated string re-allocation.
            return "".join(
                (page.extract_text() or "") + "\n" for page in pdf_reader.pages
            )

    def extract_text_from_excel(self, file_path: str) -> str:
        """Extract text from every sheet of an Excel workbook.

        A single-sheet workbook yields just that sheet's table rendering;
        with several sheets each table is preceded by a "Sheet: <name>" line.
        """
        # sheet_name=None loads all sheets as {sheet name: DataFrame}.
        sheets = pd.read_excel(file_path, sheet_name=None)
        if len(sheets) == 1:
            return next(iter(sheets.values())).to_string()
        return "\n\n".join(
            f"Sheet: {name}\n{frame.to_string()}" for name, frame in sheets.items()
        )

    def extract_text_from_txt(self, file_path: str) -> str:
        """Extract text from a text file.

        Decodes as UTF-8, drops a leading byte-order mark, and replaces
        undecodable bytes so one badly encoded file does not abort the batch.
        """
        with open(file_path, 'r', encoding='utf-8-sig', errors='replace') as file:
            return file.read()

    def process_documents(self, files) -> str:
        """Process uploaded documents and build a fresh vector store.

        Args:
            files: Uploaded files, given either as path strings or as
                objects exposing the path in a ``name`` attribute.

        Returns:
            A status message describing success or the failure reason.
        """
        if not self.api_key_verified:
            return "‚ùå Please verify your API key first!"

        if not files:
            return "‚ùå Please upload at least one document!"

        # NOTE(review): legacy .xls presumably needs an extra reader package
        # that this notebook does not install — confirm.
        extractors = {
            '.pdf': self.extract_text_from_pdf,
            '.xlsx': self.extract_text_from_excel,
            '.xls': self.extract_text_from_excel,
            '.txt': self.extract_text_from_txt,
        }

        try:
            # Extract text from all uploaded files
            documents = []
            for file in files:
                # Accept both plain path strings and file-like wrappers.
                file_path = getattr(file, "name", file)
                file_ext = os.path.splitext(file_path)[1].lower()

                extractor = extractors.get(file_ext)
                if extractor is None:
                    continue  # unsupported type: skip silently

                documents.append(Document(
                    page_content=extractor(file_path),
                    metadata={"source": os.path.basename(file_path)}
                ))

            if not documents:
                return "‚ùå No supported documents found! Please upload PDF, Excel, or TXT files."

            # Split documents into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.CHUNK_SIZE,
                chunk_overlap=self.CHUNK_OVERLAP,
                length_function=len
            )
            chunks = text_splitter.split_documents(documents)

            if not chunks:
                # e.g. scanned PDFs or empty files: nothing to index.
                return "‚ùå No extractable text found in the uploaded documents."

            # Load the embedding model once and reuse it on later uploads.
            if self.embeddings is None:
                self.embeddings = HuggingFaceEmbeddings(
                    model_name=self.EMBEDDING_MODEL
                )

            # Build each index in its own directory. Re-using one persist
            # directory would add the new chunks to the collection already
            # stored there, so earlier uploads would keep being retrieved.
            os.makedirs(self.temp_dir, exist_ok=True)
            store_dir = tempfile.mkdtemp(prefix="chroma_db_", dir=self.temp_dir)

            # Create vector store
            self.vectorstore = Chroma.from_documents(
                documents=chunks,
                embedding=self.embeddings,
                persist_directory=store_dir
            )

            # Report the files actually indexed, not the number uploaded.
            return f"‚úÖ Successfully processed {len(documents)} document(s) with {len(chunks)} chunks!"

        except Exception as e:
            return f"‚ùå Error processing documents: {str(e)}"

    def query_documents(self, question: str, chat_history: List) -> Tuple[str, List]:
        """Answer a question from the indexed documents using RAG.

        Args:
            question: The user's question.
            chat_history: List of (question, answer) pairs; None is treated
                as an empty history.

        Returns:
            (status message, chat history). The status is empty on success,
            in which case the new (question, answer) pair has been appended.
        """
        if chat_history is None:
            chat_history = []

        if not self.api_key_verified:
            return "‚ùå Please verify your API key first!", chat_history

        if self.vectorstore is None:
            return "‚ùå Please upload and process documents first!", chat_history

        if not question or not question.strip():
            return "‚ùå Please enter a question!", chat_history

        try:
            # Retrieve relevant documents
            relevant_docs = self.vectorstore.similarity_search(question, k=self.TOP_K)

            # Create context from retrieved documents
            context = "\n\n".join(doc.page_content for doc in relevant_docs)

            # Create prompt
            prompt = f"""Based on the following context, answer the question. If the answer cannot be found in the context, say "I cannot find this information in the provided documents."

Context:
{context}

Question: {question}

Answer:"""

            # Get response from Groq
            response = self.client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=self.CHAT_MODEL,
                max_tokens=1024,
                temperature=0.3
            )

            answer = response.choices[0].message.content

            # Update chat history
            chat_history.append((question, answer))

            return "", chat_history

        except Exception as e:
            return f"‚ùå Error: {str(e)}", chat_history

    def clear_chat(self) -> Tuple[str, List]:
        """Clear chat history: returns an empty question and an empty history."""
        return "", []

    def cleanup(self) -> None:
        """Drop the vector store and delete the temporary index directory."""
        self.vectorstore = None
        shutil.rmtree(self.temp_dir, ignore_errors=True)

# Initialize the RAG application
# A single module-level instance: every event handler below is bound to its methods.
# NOTE(review): its state (API key verification, vector store) therefore looks
# shared by everyone using the app, including via the share link — confirm.
rag_app = RAGApplication()

# Create Gradio interface
# Layout: left column = API key + upload controls, right column = chat.
with gr.Blocks(title="RAG Application with Groq") as demo:
    gr.Markdown("# üìö RAG Application with Groq API")
    gr.Markdown("Upload documents (PDF, Excel, TXT) and ask questions based on their content.")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### üîë Step 1: API Key")
            api_key_input = gr.Textbox(
                label="Groq API Key",
                placeholder="Enter your Groq API key",
                type="password"
            )
            verify_btn = gr.Button("Verify API Key", variant="primary")
            api_status = gr.Textbox(label="Status", interactive=False)

            gr.Markdown("### üìÑ Step 2: Upload Documents")
            # The accepted extensions mirror the ones process_documents handles.
            file_upload = gr.File(
                label="Upload Documents",
                file_count="multiple",
                file_types=[".pdf", ".txt", ".xlsx", ".xls"]
            )
            process_btn = gr.Button("Process Documents", variant="primary")
            process_status = gr.Textbox(label="Processing Status", interactive=False)

        with gr.Column(scale=2):
            gr.Markdown("### üí¨ Step 3: Ask Questions")
            # query_documents appends (question, answer) tuples to this component's value.
            chatbot = gr.Chatbot(label="Chat History", height=400)
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="Ask a question about your documents...",
                lines=2
            )
            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear Chat")
            query_status = gr.Textbox(label="Status", interactive=False)

    # Event handlers
    # verify_api_key returns (message, bool); the bool is routed into a State
    # created inline here and not assigned to any name.
    verify_btn.click(
        fn=rag_app.verify_api_key,
        inputs=[api_key_input],
        outputs=[api_status, gr.State()]
    )

    process_btn.click(
        fn=rag_app.process_documents,
        inputs=[file_upload],
        outputs=[process_status]
    )

    # Submit button: run the query, then blank the question box.
    # The follow-up step takes no inputs, so it clears the box whether or not
    # query_documents reported an error in query_status.
    submit_btn.click(
        fn=rag_app.query_documents,
        inputs=[question_input, chatbot],
        outputs=[query_status, chatbot]
    ).then(
        lambda: "",
        outputs=[question_input]
    )

    # Pressing Enter in the question box is wired identically to the Submit button.
    question_input.submit(
        fn=rag_app.query_documents,
        inputs=[question_input, chatbot],
        outputs=[query_status, chatbot]
    ).then(
        lambda: "",
        outputs=[question_input]
    )

    # clear_chat returns ("", []): empties the question box and the chat;
    # query_status is not among the outputs, so it keeps its last message.
    clear_btn.click(
        fn=rag_app.clear_chat,
        outputs=[question_input, chatbot]
    )

# Launch the app
# NOTE(review): share=True presumably publishes a public link and debug=True
# keeps the cell running to show logs — confirm against the Gradio docs.
demo.launch(debug=True, share=True)

ModuleNotFoundError: No module named 'groq'

In [None]:
"""
RAG Application with Groq API for Google Colab
Supports PDF, Excel, and Text files
"""

# Install required packages
import subprocess
import sys

def install_packages(packages=None):
    """Install the notebook's third-party dependencies with pip.

    Args:
        packages: Optional iterable of pip requirement strings. When omitted,
            the default list of packages used by this notebook is installed.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    if packages is None:
        packages = [
            'groq',
            'gradio',
            'langchain',
            'langchain-community',
            'langchain-text-splitters',
            'sentence-transformers',
            'chromadb',
            'pypdf',
            'openpyxl',
            'pandas'
        ]
    packages = list(packages)

    print("Installing required packages...")
    if packages:
        # A single pip invocation instead of one subprocess per package;
        # sys.executable targets the interpreter running this kernel.
        subprocess.check_call(
            [sys.executable, '-m', 'pip', 'install', '-q', *packages]
        )
    print("‚úÖ All packages installed successfully!")

# Run installation
# Executed before the third-party imports that follow in this cell.
install_packages()

import os
import gradio as gr
from groq import Groq
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
import pandas as pd
import pypdf
from typing import List, Tuple
import tempfile
import shutil

class RAGApplication:
    """Retrieval-augmented Q&A over uploaded PDF, Excel and text files.

    One instance holds the Groq client, the embedding model and the Chroma
    vector store that the Gradio callbacks read and update.
    """

    # Groq chat model used both for key verification and for answering.
    CHAT_MODEL = "llama-3.3-70b-versatile"
    # Sentence-transformers model used to embed document chunks.
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
    # Splitter settings, measured in characters (length_function=len).
    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    # Number of chunks retrieved per question.
    TOP_K = 3

    def __init__(self):
        self.client = None              # Groq client, created by verify_api_key()
        self.vectorstore = None         # Chroma store, created by process_documents()
        self.embeddings = None          # embedding model, loaded on first use
        self.api_key_verified = False
        self.temp_dir = tempfile.mkdtemp()  # parent directory for Chroma indexes

    def verify_api_key(self, api_key: str) -> Tuple[str, bool]:
        """Verify the Groq API key with a minimal chat request.

        Args:
            api_key: Key typed by the user; surrounding whitespace is ignored.

        Returns:
            (status message, True if the key was accepted).
        """
        api_key = (api_key or "").strip()
        if not api_key:
            # Nothing to check: skip the network round trip.
            self.api_key_verified = False
            return "‚ùå API Key verification failed: no API key provided", False

        try:
            self.client = Groq(api_key=api_key)
            # Test the API key with a simple request; only success matters.
            self.client.chat.completions.create(
                messages=[{"role": "user", "content": "Hi"}],
                model=self.CHAT_MODEL,
                max_tokens=10
            )
            self.api_key_verified = True
            return "‚úÖ API Key verified successfully!", True
        except Exception as e:
            self.api_key_verified = False
            return f"‚ùå API Key verification failed: {str(e)}", False

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file, one newline-terminated entry per page."""
        with open(file_path, 'rb') as file:
            pdf_reader = pypdf.PdfReader(file)
            # `or ""` keeps a page without extractable text from breaking the
            # concatenation; join avoids repeated string re-allocation.
            return "".join(
                (page.extract_text() or "") + "\n" for page in pdf_reader.pages
            )

    def extract_text_from_excel(self, file_path: str) -> str:
        """Extract text from every sheet of an Excel workbook.

        A single-sheet workbook yields just that sheet's table rendering;
        with several sheets each table is preceded by a "Sheet: <name>" line.
        """
        # sheet_name=None loads all sheets as {sheet name: DataFrame}.
        sheets = pd.read_excel(file_path, sheet_name=None)
        if len(sheets) == 1:
            return next(iter(sheets.values())).to_string()
        return "\n\n".join(
            f"Sheet: {name}\n{frame.to_string()}" for name, frame in sheets.items()
        )

    def extract_text_from_txt(self, file_path: str) -> str:
        """Extract text from a text file.

        Decodes as UTF-8, drops a leading byte-order mark, and replaces
        undecodable bytes so one badly encoded file does not abort the batch.
        """
        with open(file_path, 'r', encoding='utf-8-sig', errors='replace') as file:
            return file.read()

    def process_documents(self, files) -> str:
        """Process uploaded documents and build a fresh vector store.

        Args:
            files: Uploaded files, given either as path strings or as
                objects exposing the path in a ``name`` attribute.

        Returns:
            A status message describing success or the failure reason.
        """
        if not self.api_key_verified:
            return "‚ùå Please verify your API key first!"

        if not files:
            return "‚ùå Please upload at least one document!"

        # NOTE(review): legacy .xls presumably needs an extra reader package
        # that this notebook does not install — confirm.
        extractors = {
            '.pdf': self.extract_text_from_pdf,
            '.xlsx': self.extract_text_from_excel,
            '.xls': self.extract_text_from_excel,
            '.txt': self.extract_text_from_txt,
        }

        try:
            # Extract text from all uploaded files
            documents = []
            for file in files:
                # Accept both plain path strings and file-like wrappers.
                file_path = getattr(file, "name", file)
                file_ext = os.path.splitext(file_path)[1].lower()

                extractor = extractors.get(file_ext)
                if extractor is None:
                    continue  # unsupported type: skip silently

                documents.append(Document(
                    page_content=extractor(file_path),
                    metadata={"source": os.path.basename(file_path)}
                ))

            if not documents:
                return "‚ùå No supported documents found! Please upload PDF, Excel, or TXT files."

            # Split documents into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.CHUNK_SIZE,
                chunk_overlap=self.CHUNK_OVERLAP,
                length_function=len
            )
            chunks = text_splitter.split_documents(documents)

            if not chunks:
                # e.g. scanned PDFs or empty files: nothing to index.
                return "‚ùå No extractable text found in the uploaded documents."

            # Load the embedding model once and reuse it on later uploads.
            if self.embeddings is None:
                self.embeddings = HuggingFaceEmbeddings(
                    model_name=self.EMBEDDING_MODEL
                )

            # Build each index in its own directory. Re-using one persist
            # directory would add the new chunks to the collection already
            # stored there, so earlier uploads would keep being retrieved.
            os.makedirs(self.temp_dir, exist_ok=True)
            store_dir = tempfile.mkdtemp(prefix="chroma_db_", dir=self.temp_dir)

            # Create vector store
            self.vectorstore = Chroma.from_documents(
                documents=chunks,
                embedding=self.embeddings,
                persist_directory=store_dir
            )

            # Report the files actually indexed, not the number uploaded.
            return f"‚úÖ Successfully processed {len(documents)} document(s) with {len(chunks)} chunks!"

        except Exception as e:
            return f"‚ùå Error processing documents: {str(e)}"

    def query_documents(self, question: str, chat_history: List) -> Tuple[str, List]:
        """Answer a question from the indexed documents using RAG.

        Args:
            question: The user's question.
            chat_history: List of (question, answer) pairs; None is treated
                as an empty history.

        Returns:
            (status message, chat history). The status is empty on success,
            in which case the new (question, answer) pair has been appended.
        """
        if chat_history is None:
            chat_history = []

        if not self.api_key_verified:
            return "‚ùå Please verify your API key first!", chat_history

        if self.vectorstore is None:
            return "‚ùå Please upload and process documents first!", chat_history

        if not question or not question.strip():
            return "‚ùå Please enter a question!", chat_history

        try:
            # Retrieve relevant documents
            relevant_docs = self.vectorstore.similarity_search(question, k=self.TOP_K)

            # Create context from retrieved documents
            context = "\n\n".join(doc.page_content for doc in relevant_docs)

            # Create prompt
            prompt = f"""Based on the following context, answer the question. If the answer cannot be found in the context, say "I cannot find this information in the provided documents."

Context:
{context}

Question: {question}

Answer:"""

            # Get response from Groq
            response = self.client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=self.CHAT_MODEL,
                max_tokens=1024,
                temperature=0.3
            )

            answer = response.choices[0].message.content

            # Update chat history
            chat_history.append((question, answer))

            return "", chat_history

        except Exception as e:
            return f"‚ùå Error: {str(e)}", chat_history

    def clear_chat(self) -> Tuple[str, List]:
        """Clear chat history: returns an empty question and an empty history."""
        return "", []

    def cleanup(self) -> None:
        """Drop the vector store and delete the temporary index directory."""
        self.vectorstore = None
        shutil.rmtree(self.temp_dir, ignore_errors=True)

# Initialize the RAG application
# A single module-level instance: every event handler below is bound to its methods.
# NOTE(review): its state (API key verification, vector store) therefore looks
# shared by everyone using the app, including via the share link — confirm.
rag_app = RAGApplication()

# Create Gradio interface
# Layout: left column = API key + upload controls, right column = chat.
with gr.Blocks(title="RAG Application with Groq") as demo:
    gr.Markdown("# üìö RAG Application with Groq API")
    gr.Markdown("Upload documents (PDF, Excel, TXT) and ask questions based on their content.")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### üîë Step 1: API Key")
            api_key_input = gr.Textbox(
                label="Groq API Key",
                placeholder="Enter your Groq API key",
                type="password"
            )
            verify_btn = gr.Button("Verify API Key", variant="primary")
            api_status = gr.Textbox(label="Status", interactive=False)

            gr.Markdown("### üìÑ Step 2: Upload Documents")
            # The accepted extensions mirror the ones process_documents handles.
            file_upload = gr.File(
                label="Upload Documents",
                file_count="multiple",
                file_types=[".pdf", ".txt", ".xlsx", ".xls"]
            )
            process_btn = gr.Button("Process Documents", variant="primary")
            process_status = gr.Textbox(label="Processing Status", interactive=False)

        with gr.Column(scale=2):
            gr.Markdown("### üí¨ Step 3: Ask Questions")
            # query_documents appends (question, answer) tuples to this component's value.
            # NOTE(review): the recorded output of this cell echoes this line twice,
            # which looks like a warning raised for it — TODO confirm the cause.
            chatbot = gr.Chatbot(label="Chat History", height=400)
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="Ask a question about your documents...",
                lines=2
            )
            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear Chat")
            query_status = gr.Textbox(label="Status", interactive=False)

    # Event handlers
    # verify_api_key returns (message, bool); the bool is routed into a State
    # created inline here and not assigned to any name.
    verify_btn.click(
        fn=rag_app.verify_api_key,
        inputs=[api_key_input],
        outputs=[api_status, gr.State()]
    )

    process_btn.click(
        fn=rag_app.process_documents,
        inputs=[file_upload],
        outputs=[process_status]
    )

    # Submit button: run the query, then blank the question box.
    # The follow-up step takes no inputs, so it clears the box whether or not
    # query_documents reported an error in query_status.
    submit_btn.click(
        fn=rag_app.query_documents,
        inputs=[question_input, chatbot],
        outputs=[query_status, chatbot]
    ).then(
        lambda: "",
        outputs=[question_input]
    )

    # Pressing Enter in the question box is wired identically to the Submit button.
    question_input.submit(
        fn=rag_app.query_documents,
        inputs=[question_input, chatbot],
        outputs=[query_status, chatbot]
    ).then(
        lambda: "",
        outputs=[question_input]
    )

    # clear_chat returns ("", []): empties the question box and the chat;
    # query_status is not among the outputs, so it keeps its last message.
    clear_btn.click(
        fn=rag_app.clear_chat,
        outputs=[question_input, chatbot]
    )

# Launch the app
# In the recorded Colab run this printed a public gradio.live URL (share=True)
# and kept the cell running to surface errors and logs (debug=True).
demo.launch(debug=True, share=True)

Installing required packages...
‚úÖ All packages installed successfully!


  chatbot = gr.Chatbot(label="Chat History", height=400)
  chatbot = gr.Chatbot(label="Chat History", height=400)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://102c49257d7e6ff0e7.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


  self.embeddings = HuggingFaceEmbeddings(
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [5]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q langchain-core

In [3]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q groq gradio langchain langchain-community langchain-text-splitters sentence-transformers chromadb pypdf openpyxl pandas torch transformers