In [None]:
!pip install torch>=2.0.0 transformers>=4.31.0 sentence-transformers faiss-cpu PyMuPDF numpy accelerate 'bitsandbytes>=0.41.0'

In [None]:
# NOTE(review): main() later imports `google.colab`; the PyPI distribution named
# `google` does not appear to provide that module (Colab runtimes ship it
# themselves) — confirm whether this install is needed at all.
!pip install google

In [None]:
#
import fitz
import torch
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import LlamaForCausalLM, LlamaTokenizer, BitsAndBytesConfig



In [None]:
# Log in to the Hugging Face Hub, then load the Llama 2 tokenizer.
import os
from getpass import getpass

from huggingface_hub import login

# Never hard-code the access token in the notebook: it would overwrite a real
# token already present in the environment and is easy to commit by accident.
# Use HUGGINGFACE_TOKEN when it is set, otherwise prompt for it without echo.
hf_token = os.environ.get('HUGGINGFACE_TOKEN', '').strip()
if not hf_token:
    hf_token = getpass("Hugging Face access token: ").strip()
    # Keep the token available to later cells of this kernel session only.
    os.environ['HUGGINGFACE_TOKEN'] = hf_token
login(hf_token)

# Then your model loading code
from transformers import AutoTokenizer, AutoModelForCausalLM

model_name = "meta-llama/Llama-2-7b-chat-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
class SimpleRAG:
    """Minimal retrieval-augmented generation pipeline over a folder of PDFs.

    Intended call order: ``load_pdfs`` -> ``init_embeddings`` ->
    ``create_index`` -> ``init_llama`` -> ``run_query``.

    Each PDF is embedded as a single document (no chunking). A query retrieves
    the nearest documents from a FAISS L2 index, the top document is summarized
    by the language model, and the summary plus the query are used to generate
    the final answer.
    """

    def __init__(self, directory_path):
        """Store the PDF folder and create empty placeholders for all components.

        Args:
            directory_path: Folder that ``load_pdfs`` scans for PDF files.
        """
        self.directory_path = directory_path
        self.documents = []   # extracted text, one entry per successfully read PDF
        self.doc_titles = []  # file names, index-aligned with self.documents
        self.embed_model = None
        self.index = None
        self.tokenizer = None
        self.model = None
        # auto-detect GPU or CPU
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def load_pdfs(self):
        """Read every PDF in ``directory_path`` and append its text and file name.

        Files are matched case-insensitively on the ``.pdf`` extension and
        processed in sorted order so document indices are reproducible across
        runs. PDFs that fail to parse or contain no text are skipped.
        """
        pdf_files = sorted(
            f for f in os.listdir(self.directory_path) if f.lower().endswith('.pdf')
        )
        for f in pdf_files:
            full_path = os.path.join(self.directory_path, f)
            text = self.extract_text_from_pdf(full_path)
            if text:
                self.documents.append(text)
                self.doc_titles.append(f)
        print(f"Loaded {len(self.documents)} PDFs.")

    def extract_text_from_pdf(self, pdf_path):
        """Return the text of all non-blank pages joined by spaces.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The concatenated page text (possibly an empty string), or ``None``
            when the file could not be opened or read; the error is printed.
        """
        try:
            # The context manager closes the document even if a page raises.
            with fitz.open(pdf_path) as doc:
                page_texts = (page.get_text() for page in doc)
                doc_text = [txt for txt in page_texts if txt.strip()]
        except Exception as e:
            # Best effort: one unreadable PDF must not abort the whole load.
            print(f"Error reading {pdf_path} - {e}")
            return None
        return " ".join(doc_text)

    def init_embeddings(self):
        """Load the sentence-embedding model used for documents and queries."""
        self.embed_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("SentenceTransformer loaded.")

    def create_index(self):
        """Embed all loaded documents and store them in a flat L2 FAISS index.

        Raises:
            ValueError: If no documents have been loaded.
        """
        if not self.documents:
            raise ValueError(
                "No documents to index; call load_pdfs() on a folder that "
                "contains readable PDFs first."
            )
        print("Creating FAISS index.")
        embeddings = self.embed_model.encode(self.documents, show_progress_bar=True)
        # FAISS expects a contiguous float32 matrix.
        embeddings = np.ascontiguousarray(embeddings, dtype='float32')
        dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(dimension)
        self.index.add(embeddings)
        print("Index created and docs embedded.")

    def init_llama(self, cache_dir=None):
        """Load the Llama 2 7B chat model (4-bit NF4) and its tokenizer.

        Args:
            cache_dir: Optional directory for the downloaded weights. ``None``
                (the default) uses the Hugging Face default cache instead of a
                machine-specific path.
        """
        print("Loading Llama 2 7B in 4-bit.")
        model_name = "meta-llama/Llama-2-7b-chat-hf"
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=False,
            bnb_4bit_quant_type="nf4"
        )
        self.tokenizer = LlamaTokenizer.from_pretrained(model_name)
        self.model = LlamaForCausalLM.from_pretrained(
            model_name,
            quantization_config=bnb_config,
            cache_dir=cache_dir,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        print("Llama 7B model loaded in 4-bit.")

    def retrieve_documents(self, query, k=3):
        """Return the text of up to ``k`` documents closest to ``query``.

        Args:
            query: Natural-language query string.
            k: Maximum number of documents to return.

        Returns:
            Document texts ordered from nearest to farthest. The list is
            shorter than ``k`` when fewer documents are indexed, and empty
            when the index holds no vectors.
        """
        print(f"\nQuery: {query}")
        # Never ask for more neighbours than the index holds.
        k = min(k, self.index.ntotal)
        if k <= 0:
            return []
        query_embed = np.ascontiguousarray(
            self.embed_model.encode([query]), dtype='float32'
        )
        distances, indices = self.index.search(query_embed, k)
        retrieved_docs = []
        for rank, (dist, idx) in enumerate(zip(distances[0], indices[0])):
            # FAISS pads missing neighbours with -1; indexing the list with it
            # would silently return the last document.
            if idx < 0:
                continue
            idx = int(idx)
            print(f"  Rank {rank+1}, Distance={dist:.4f}, Doc={self.doc_titles[idx]}")
            retrieved_docs.append(self.documents[idx])
        return retrieved_docs

    def _generate(self, prompt, max_new_tokens):
        """Sample a continuation of ``prompt`` and return only the new text."""
        encoded = self.tokenizer(
            prompt, return_tensors="pt", truncation=True, max_length=512
        )
        input_ids = encoded["input_ids"].to(self.device)
        attention_mask = encoded["attention_mask"].to(self.device)
        outputs = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            num_beams=1,
            do_sample=True,
            temperature=0.7
        )
        # A causal LM returns prompt + continuation; drop the prompt tokens so
        # the caller does not receive its own prompt back.
        new_tokens = outputs[0][input_ids.shape[-1]:]
        return self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    def summarize_text(self, text: str) -> str:
        """
        Step 1: Summarize the doc text in a short prompt so we don't pass huge data to final generation.

        Only the first 1000 characters of ``text`` are summarized. Returns the
        generated summary without the prompt.
        """
        # Possibly truncate text first if it's huge
        max_chars = 1000
        truncated_text = text[:max_chars]

        prompt = (
            "You are an AI summarizer. Summarize the following text in your own words. "
            "Do NOT repeat large chunks verbatim.\n\n"
            f"{truncated_text}\n\n"
            "Summary:"
        )
        return self._generate(prompt, max_new_tokens=100)

    def generate_final_answer(self, summary: str, query: str) -> str:
        """
        Step 2: Use the doc summary plus user query to produce a final answer.

        Returns the generated answer without the prompt.
        """
        prompt = (
            "You are a knowledgeable AI with broad expertise.\n"
            f"Here is a summary of a reference document:\n{summary}\n\n"
            f"User question: {query}\n\n"
            "Answer in your own words using the summary. Do not repeat the summary verbatim:"
        )
        return self._generate(prompt, max_new_tokens=150)

    def run_query(self, query, k=3):
        """Retrieve documents for ``query``, summarize the best one, print an answer.

        Args:
            query: Natural-language question.
            k: Number of documents to retrieve; only the top hit is summarized.
        """
        docs = self.retrieve_documents(query, k)
        if not docs:
            print("No docs returned. Possibly the index is empty.")
            return

        # Optionally combine or just pick the top doc
        # e.g., summarizing only the 1st doc for demonstration
        # or you can merge them if you want multiple summaries
        doc_text = docs[0]

        # Step 1: Summarize doc text
        doc_summary = self.summarize_text(doc_text)

        # Step 2: Generate final answer
        final_answer = self.generate_final_answer(doc_summary, query)
        print(f"\nAnswer:\n{final_answer}")




In [None]:
def main(directory_path=r"/content/drive/My Drive/All_Finance_PDF_files_old/"):
    """Build the RAG pipeline over a PDF folder and answer queries interactively.

    Args:
        directory_path: Folder containing the PDFs to index. The default points
            at a Google Drive folder; pass your own location when calling.

    On Google Colab the user's Drive is mounted first. Outside Colab the mount
    is skipped, so the function no longer needs hand-editing to run locally.
    The loop ends when the user types 'quit'.
    """
    try:
        from google.colab import drive
    except ImportError:
        # Not running on Colab: nothing to mount.
        drive = None
    if drive is not None:
        drive.mount('/content/drive')

    rag = SimpleRAG(directory_path)
    rag.load_pdfs()
    rag.init_embeddings()
    rag.create_index()
    rag.init_llama()

    while True:
        query = input("\nEnter query (or 'quit'): ").strip()
        if query.lower() == 'quit':
            break
        if not query:
            # Skip empty input instead of running retrieval and generation on it.
            continue
        rag.run_query(query, k=3)

# Start the interactive session only when this code runs as the top-level
# program, not when it is imported as a module.
if __name__ == "__main__":
    main()
