In [None]:
# -*- coding: utf-8 -*-
"""rag.py

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1gPulzDgzZ1D2TSlNOCbgW2ZHKdPTYt3A
"""

# Colab environment setup: install the packages this notebook relies on.
# NOTE(review): pytesseract, pdfplumber and faiss-cpu are each requested by more than
# one of the lines below; the repeated requests are redundant.
!pip install pytesseract
!pip install -U langchain-community
# NOTE(review): google-cloud-translate is not imported by any cell shown here - confirm it is needed.
!pip install google-cloud-translate==2.0.0
!pip install pdfplumber
!pip install httpx==0.28.1
# NOTE(review): googletrans and websockets are not imported by any cell shown here
# (translation goes through deep_translator) - confirm they are needed.
!pip install langchain-huggingface sentence-transformers transformers faiss-cpu pytesseract pdfplumber googletrans websockets deep_translator
# Colab: second batch of installs.
# NOTE(review): PyPDF2, flask, flask_cors and pyngrok are not imported by any cell shown here.
!pip install  faiss-cpu PyPDF2 pdfplumber flask flask_cors pyngrok
# NOTE(review): `falcon` is not imported by any cell shown here; the Falcon model used
# later is loaded through `transformers` - confirm this package is actually required.
!pip install falcon

import os
import re
import zipfile

import pdfplumber
import pytesseract
import torch
from deep_translator import GoogleTranslator
from langchain.chains import RetrievalQA
from langchain.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
from PIL import Image
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

# Path to your zip file
zip_path = "/content/doc_files.zip"

# Directory to extract files
extract_dir = "/content/extracted_files" # Changed directory name

# Create extraction directory if it doesn't exist
os.makedirs(extract_dir, exist_ok=True)

# Extract zip contents
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

print(f"Files extracted to: {extract_dir}")
print("Files:")
print(os.listdir(extract_dir))

In [None]:
# -------- Document Processor --------
class DocumentProcessor:
    """Extract raw text from files and translate it to English.

    ``process_document`` accepts the file types ``'pdf'``, ``'txt'``, ``'md'``
    and ``'image'`` (OCR through pytesseract).  Translation is performed in
    chunks that respect word boundaries so that words are neither cut in half
    nor glued together where two chunks meet.
    """

    def __init__(self):
        # Language packs handed to Tesseract for OCR.
        self.ocr_lang = 'eng+ara'
        # One translator per target language; the source language is auto-detected.
        self.translator_en = GoogleTranslator(source='auto', target='en')
        self.translator_ar = GoogleTranslator(source='auto', target='ar')

    def extract_text_from_pdf(self, filepath):
        """Return the text of all pages of the PDF at ``filepath``.

        Each page's text is followed by a newline; pages for which pdfplumber
        returns no text are skipped.
        """
        page_texts = []
        with pdfplumber.open(filepath) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    page_texts.append(page_text + "\n")
        # Join once instead of growing a string page by page.
        return "".join(page_texts)

    def extract_text_from_image(self, filepath):
        """Run OCR on the image at ``filepath`` and return the recognised text."""
        # The context manager closes the underlying file handle after OCR.
        with Image.open(filepath) as image:
            return pytesseract.image_to_string(image, lang=self.ocr_lang)

    def extract_text_from_txt(self, filepath):
        """Return the content of a UTF-8 encoded text file."""
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read()

    def extract_text_from_md(self, filepath):
        """Return the content of a UTF-8 encoded Markdown file (read as plain text)."""
        return self.extract_text_from_txt(filepath)

    @staticmethod
    def _split_into_chunks(text, chunk_size):
        """Split ``text`` into pieces of at most ``chunk_size`` characters.

        A piece ends at a word boundary whenever the window contains
        whitespace; a run without whitespace longer than ``chunk_size`` is cut
        at the size limit.  Concatenating the pieces yields ``text`` unchanged.
        """
        pieces = []
        start, total = 0, len(text)
        while start < total:
            end = min(start + chunk_size, total)
            # Only back off when the cut would land inside a word.
            if end < total and not text[end].isspace():
                for idx in range(end - 1, start, -1):
                    if text[idx].isspace():
                        end = idx + 1
                        break
            pieces.append(text[start:end])
            start = end
        return pieces

    def translate_text_in_chunks(self, text, dest='en', chunk_size=4000):
        """Translate ``text`` chunk by chunk and return the joined result.

        Args:
            text: Text to translate.  ``None`` or an empty string yields ``""``.
            dest: ``'en'`` selects the English translator; any other value
                selects the Arabic translator.
            chunk_size: Maximum number of characters sent per request.

        Returns:
            The translated text.  Leading/trailing whitespace of every chunk is
            kept as-is so words on either side of a chunk boundary stay
            separated.  A chunk that cannot be translated is kept untranslated.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not text:
            return ""

        translator = self.translator_en if dest == 'en' else self.translator_ar
        translated_parts = []
        for chunk in self._split_into_chunks(text, chunk_size):
            core = chunk.strip()
            if not core:
                # Whitespace-only chunk: nothing to translate.
                translated_parts.append(chunk)
                continue

            leading = chunk[:len(chunk) - len(chunk.lstrip())]
            trailing = chunk[len(chunk.rstrip()):]
            try:
                translated = translator.translate(core)
            except Exception as e:
                print(f"⚠️ Error translating chunk: {e}")
                translated = None
            else:
                if translated is None:
                    print("⚠️ Error translating chunk: translator returned no text")

            if translated is None:
                # Best effort: keep the original chunk rather than dropping content.
                translated_parts.append(chunk)
            else:
                translated_parts.append(leading + translated + trailing)
        return "".join(translated_parts)

    def process_document(self, filepath, filetype='pdf'):
        """Extract a document's text and translate it to English.

        Args:
            filepath: Path of the file to read.
            filetype: One of ``'pdf'``, ``'txt'``, ``'md'`` or ``'image'``.

        Returns:
            ``(text, text_en, summary_en, summary_ar)`` where both summaries
            are the fixed placeholder ``"Summarization disabled"``.

        Raises:
            ValueError: If ``filetype`` is not supported.
        """
        extractors = {
            'pdf': self.extract_text_from_pdf,
            'txt': self.extract_text_from_txt,
            'md': self.extract_text_from_md,
            'image': self.extract_text_from_image,
        }
        extractor = extractors.get(filetype)
        if extractor is None:
            raise ValueError(f"Unsupported file type: {filetype}")

        text = extractor(filepath)
        text_en = self.translate_text_in_chunks(text, dest='en')
        summary_en = "Summarization disabled"
        summary_ar = "Summarization disabled"
        return text, text_en, summary_en, summary_ar

In [None]:
# -------- Department Tagger --------
class DepartmentTagger:
    """Assign department tags to a text based on keyword matches.

    Matching rules:

    * Keywords longer than ``ACRONYM_MAX_LEN`` characters match
      case-insensitively at the start of a word, so inflected forms such as
      "budgets" or "regulations" match while "flaw" does not match "law".
    * Shorter keywords are treated as acronyms and must appear as a whole,
      upper-case word (e.g. "IT").  This keeps the pronoun "it" and words such
      as "with" or "city" from tagging every document.
    """

    # Keywords up to this length are matched as upper-case acronyms.
    ACRONYM_MAX_LEN = 2

    def __init__(self):
        # Department name -> keywords that indicate it.
        self.keywords = {
            "Finance": ["budget", "revenue", "expense", "finance"],
            "Currency": ["banknotes", "coins", "mint", "currency"],
            "IT": ["network", "software", "hardware", "technology", "it"],
            "Legal": ["regulation", "law", "compliance", "legal"]
        }

    def _keyword_in_text(self, keyword, text):
        """Return True when ``keyword`` occurs in ``text`` under the class's matching rules."""
        if len(keyword) <= self.ACRONYM_MAX_LEN:
            # Acronym: whole word, upper case, case-sensitive.
            return re.search(rf"\b{re.escape(keyword.upper())}\b", text) is not None
        # Regular keyword: must start a word; any suffix is allowed.
        return re.search(rf"\b{re.escape(keyword)}", text, re.IGNORECASE) is not None

    def tag(self, text):
        """Return the departments whose keywords occur in ``text``.

        Departments are returned in the order they are declared in
        ``self.keywords``.  Empty or ``None`` text yields an empty list.
        """
        if not text:
            return []
        return [
            dept
            for dept, kws in self.keywords.items()
            if any(self._keyword_in_text(kw, text) for kw in kws)
        ]

In [None]:
# -------- LLM Setup --------
def get_llm():
    """Load the Falcon-H1 base checkpoint and wrap it as a LangChain LLM.

    Returns:
        tuple: ``(llm, model)`` where ``llm`` is a ``HuggingFacePipeline``
        built on a text-generation pipeline and ``model`` is the causal-LM
        object that pipeline runs.
    """
    # This checkpoint can be slow on CPU; prefer a GPU runtime in Colab
    # or switch to a smaller model for faster processing.
    checkpoint = "tiiuae/Falcon-H1-1B-Base"

    tok = AutoTokenizer.from_pretrained(checkpoint)
    causal_lm = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        device_map="auto",
        torch_dtype=torch.bfloat16,
    )

    # Greedy decoding (no sampling) keeps the output deterministic,
    # which is intended to reduce hallucination.
    generation_options = {"max_new_tokens": 512, "do_sample": False}
    generator = pipeline(
        "text-generation",
        model=causal_lm,
        tokenizer=tok,
        **generation_options,
    )

    if torch.cuda.is_available():
        print("GPU detected.")
    else:
        print("⚠️ GPU not detected. Using CPU.")

    return HuggingFacePipeline(pipeline=generator), causal_lm

In [None]:
# -------- Prompt Template --------
# Prompt text sent to the LLM for every question.  It contains two placeholders,
# {context} and {question}, and instructs the model to answer only from the
# supplied context, falling back to a fixed "I don't have that specific
# information available" reply otherwise.  The text ends with "Answer:" so the
# model's completion is the answer itself.
prompt_template = """You are an assistant for the Oman Central Bank. You must follow these rules strictly:

RULES:
1. Use ONLY the information provided in the Context section below
2. If the exact answer is not found in the Context, respond with "I don't have that specific information available"
3. Do NOT add any information from your general knowledge
4. Do NOT make assumptions or inferences beyond what is explicitly stated
5. Quote directly from the context when possible

Context: {context}

Question: {question}

Instructions: Read the context carefully and provide an answer using ONLY the information above. If you cannot find the answer in the context, say "I don't have that specific information available."

Answer:"""

# LangChain prompt object built from the template; the declared input variables
# match the two placeholders used in the text above.
prompt = PromptTemplate(input_variables=["context", "question"], template=prompt_template)

In [None]:
# -------- Oman Central Bank RAG --------
class OmanCBRAG:
    """Retrieval-augmented question answering over an ingested document set.

    Typical use: create an instance, call :meth:`ingest_documents` once to
    build the FAISS index and the QA chain, then call :meth:`query`.
    """

    # Defaults used by ingest_documents when the caller does not override them.
    DEFAULT_SUBFOLDER = 'cbo currency data set'
    DEFAULT_ADDITIONAL_FILES = ("/content/banking_knowledge.txt", "/content/cbo_faq_mapping.md")
    # File extension -> ``filetype`` argument of the processor's process_document.
    SUPPORTED_EXTENSIONS = {".pdf": "pdf", ".txt": "txt", ".md": "md"}
    # Chunking parameters handed to the text splitter.
    CHUNK_SIZE = 800
    CHUNK_OVERLAP = 100

    def __init__(self):
        self.processor = DocumentProcessor()
        self.tagger = DepartmentTagger()
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2")
        # Both are populated by ingest_documents().
        self.vector_store = None
        self.llm, self.model = get_llm()
        self.qa_chain = None

    def _documents_from_file(self, file_path, splitter, skip_message):
        """Turn one file into a list of chunk ``Document`` objects.

        Returns an empty list (after printing a message) when the extension is
        unsupported or when processing the file raises, so one bad file does
        not abort the whole ingestion.
        """
        filename = os.path.basename(file_path)
        extension = os.path.splitext(filename)[1].lower()
        filetype = self.SUPPORTED_EXTENSIONS.get(extension)
        if filetype is None:
            print(f"{skip_message}: {filename}")
            return []

        try:
            _, text_en, sum_en, sum_ar = self.processor.process_document(file_path, filetype)
            departments = self.tagger.tag(text_en)
            chunks = splitter.split_text(text_en)
        except Exception as e:
            print(f"⚠️ Failed to process {filename}: {e}")
            return []

        return [
            Document(
                page_content=chunk,
                metadata={
                    "filename": filename,
                    "summary_en": sum_en,
                    "summary_ar": sum_ar,
                    # Copy so chunks do not share one mutable list.
                    "departments": list(departments),
                },
            )
            for chunk in chunks
        ]

    def ingest_documents(self, folder_path, subfolder=None, additional_files=None):
        """Index the documents and initialise the QA chain.

        Args:
            folder_path: Directory the archive was extracted into.
            subfolder: Name of the directory inside ``folder_path`` that holds
                the documents.  Defaults to ``DEFAULT_SUBFOLDER``.
            additional_files: Paths of extra files to index; entries that do
                not exist are ignored.  Defaults to ``DEFAULT_ADDITIONAL_FILES``.

        Files are handled in sorted name order so the index is built
        deterministically.  Returns ``None``; on success ``self.vector_store``
        and ``self.qa_chain`` are set.  If the folder is missing or nothing
        could be processed, a message is printed and both stay untouched.
        """
        if subfolder is None:
            subfolder = self.DEFAULT_SUBFOLDER
        if additional_files is None:
            additional_files = self.DEFAULT_ADDITIONAL_FILES

        extracted_files_path = os.path.join(folder_path, subfolder)
        if not os.path.isdir(extracted_files_path):
            print(f"⚠️ Expected folder path but got: {extracted_files_path}")
            return

        # One splitter is enough for every file.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.CHUNK_SIZE, chunk_overlap=self.CHUNK_OVERLAP
        )
        documents = []

        # Files from the extracted folder.
        for filename in sorted(os.listdir(extracted_files_path)):
            file_path = os.path.join(extracted_files_path, filename)
            documents.extend(
                self._documents_from_file(file_path, splitter, "Skipping unsupported file type")
            )

        # Extra files living outside the extracted folder.
        for file_path in additional_files:
            if os.path.exists(file_path):
                documents.extend(
                    self._documents_from_file(
                        file_path, splitter, "Skipping unsupported additional file type"
                    )
                )

        if not documents:
            print(f"⚠️ No documents processed from folder: {extracted_files_path} and additional files.")
            return

        self.vector_store = FAISS.from_documents(documents, self.embeddings)
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vector_store.as_retriever(),
            chain_type_kwargs={"prompt": prompt},
            return_source_documents=False,
        )
        print(f"✅ Indexed {len(documents)} chunks and initialized QA chain")

    def save_weights(self, path="falcon_h1_weights.pth"):
        """Save the model's ``state_dict`` to ``path`` using ``torch.save``."""
        # Explicit None check: do not rely on the truthiness of a model object.
        if getattr(self, "model", None) is not None:
            torch.save(self.model.state_dict(), path)
            print(f"✅ Model weights saved to {path}")
        else:
            print("❌ Model not initialized. Cannot save weights.")

    def query(self, question, language='en'):
        """Answer ``question`` from the indexed documents.

        Args:
            question: The user's question.
            language: ``'ar'`` (case-insensitive, region suffixes such as
                ``'ar-OM'`` allowed) translates the question to English before
                retrieval and the answer back to Arabic.  Any other value is
                treated as English.

        Returns:
            ``(answer, answer_en)``.  For Arabic requests ``answer`` is the
            Arabic translation; otherwise both items are the English answer.
            On failure ``answer`` is a short error message and ``answer_en`` is
            ``""`` (or the English answer if only the back-translation failed).
        """
        if self.qa_chain is None:
            print("❌ QA chain not initialized. Please ingest documents first.")
            return "Please ingest documents first.", ""

        # Do not send an empty query to the retriever/LLM.
        if not question or not question.strip():
            print("❌ Empty question received.")
            return "Please provide a question.", ""

        print(f"Received question in {language}: {question}")

        # Normalise e.g. 'AR', 'ar_OM', 'ar-OM' to the primary subtag 'ar'.
        lang_code = str(language or 'en').strip().lower().replace('_', '-').split('-')[0]
        wants_arabic = lang_code == 'ar'

        translated_question = question
        if wants_arabic:
            try:
                translated_question = self.processor.translate_text_in_chunks(question, dest='en')
                print(f"Translated question to English: {translated_question}")
            except Exception as e:
                print(f"❌ Error translating question: {e}")
                return "Error translating your question.", ""

        try:
            result = self.qa_chain.invoke({"query": translated_question})
            answer_en = result["result"]
            print(f"Generated English answer: {answer_en}")
        except Exception as e:
            print(f"❌ Error generating answer: {e}")
            return "An error occurred during answer generation.", ""

        if not wants_arabic:
            return answer_en, answer_en

        try:
            answer_ar = self.processor.translate_text_in_chunks(answer_en, dest='ar')
            print(f"Translated answer to Arabic: {answer_ar}")
            return answer_ar, answer_en
        except Exception as e:
            print(f"❌ Error translating answer back to Arabic: {e}")
            return "Error translating the answer.", answer_en

In [None]:
# Console helper: ask a question and print the answer (no UI involved).
def query_without_ui(question, language='en'):
    """Run ``question`` through the global ``rag`` instance and print the answer.

    For ``language == 'ar'`` the Arabic answer is printed followed by the
    English answer for reference; otherwise only the English answer is printed.
    """
    answer, answer_en = rag.query(question, language)
    if language != 'ar':
        print(f"English answer:\n{answer}")
        return
    print(f"الإجابة بالعربية:\n{answer}\n\n(English answer for reference: {answer_en})")

# Build the pipeline, index the extracted documents, then save the model weights.
rag = OmanCBRAG()
rag.ingest_documents(extract_dir)
rag.save_weights()

# Example query
query_without_ui("What is the current banking regulation framework in Oman?", language='en')