In [None]:
!pip install -q transformers sentence-transformers faiss-cpu PyPDF2 python-docx python-pptx gradio nltk torch


In [None]:
import os
import numpy as np
import gradio as gr
import faiss
import nltk
nltk.download("punkt", quiet=True)
from nltk.tokenize import sent_tokenize
from typing import List

import PyPDF2
from docx import Document
from pptx import Presentation

import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from sentence_transformers import SentenceTransformer

In [None]:
# Device selection: use CUDA when torch reports it, otherwise stay on CPU.

USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    # DEVICE is the integer form (0 / -1), torch_device the string form.
    DEVICE, torch_device = 0, "cuda"
else:
    DEVICE, torch_device = -1, "cpu"
print("Using device:", torch_device)

Using device: cuda


In [None]:
# LOAD SUMMARIZATION MODEL (Flan-T5)
MODEL_NAME = "google/flan-t5-base"
LLM_SAVE_PATH = "./models/flan-t5-base"


def _load_flan_checkpoint(source):
    """Return ``(tokenizer, model)`` loaded from a hub id or a local directory."""
    return (
        AutoTokenizer.from_pretrained(source),
        AutoModelForSeq2SeqLM.from_pretrained(source),
    )


# Reuse the local copy when one is already on disk so that re-running this cell
# does not rewrite the whole checkpoint; otherwise download it and save it locally
# (will take time & disk).
tokenizer = model = None
if os.path.isfile(os.path.join(LLM_SAVE_PATH, "config.json")):
    try:
        tokenizer, model = _load_flan_checkpoint(LLM_SAVE_PATH)
    except (OSError, ValueError):
        # Incomplete or unreadable local copy: fall through to a fresh download.
        tokenizer = model = None

if model is None:
    tokenizer, model = _load_flan_checkpoint(MODEL_NAME)
    os.makedirs(LLM_SAVE_PATH, exist_ok=True)
    tokenizer.save_pretrained(LLM_SAVE_PATH)
    model.save_pretrained(LLM_SAVE_PATH)

def get_summarizer(model_path=LLM_SAVE_PATH):
    """Build a ``summarization`` pipeline from the checkpoint at ``model_path``.

    Args:
        model_path: Model/tokenizer location handed to ``pipeline``; defaults
            to ``LLM_SAVE_PATH``.

    Returns:
        The pipeline object. When CUDA is available it is created on device 0
        with float16 weights (lower memory); otherwise on CPU (-1) with float32.
    """
    if torch.cuda.is_available():
        device, dtype = 0, torch.float16
    else:
        device, dtype = -1, torch.float32
    return pipeline(
        "summarization",
        model=model_path,
        tokenizer=model_path,
        device=device,
        torch_dtype=dtype,
    )

# Summarization pipeline built from the locally saved checkpoint.
summarizer = get_summarizer()

# A second pipeline (text2text) over the same local model, used for QA generation.
flan_pipe = pipeline(
    "text2text-generation",
    model=LLM_SAVE_PATH,
    tokenizer=LLM_SAVE_PATH,
    device=(0 if torch.cuda.is_available() else -1),
)

Device set to use cuda:0
Device set to use cuda:0


In [None]:
# TRANSLATION MODEL (mbart-50)
# ============================
# One many-to-many mBART-50 pipeline serves both directions; the source and
# target languages are supplied per call (src_lang / tgt_lang in translate_text).
translator = pipeline("translation", model="facebook/mbart-large-50-many-to-many-mmt")

def _split_for_translation(text, max_words=80):
    """Split ``text`` into pieces of at most ``max_words`` words, breaking at sentence ends when possible."""
    import re  # local import so this cell does not depend on the import cell

    pieces, current = [], []
    # A sentence boundary is whitespace that follows ., !, ? or the Arabic question mark.
    for sentence in re.split(r"(?<=[.!?\u061F])\s+", text.strip()):
        words = sentence.split()
        if current and len(current) + len(words) > max_words:
            pieces.append(" ".join(current))
            current = []
        # A single sentence above the budget is hard-split on word boundaries.
        while len(words) > max_words:
            pieces.append(" ".join(words[:max_words]))
            words = words[max_words:]
        current.extend(words)
    if current:
        pieces.append(" ".join(current))
    return pieces


def translate_text(text, direction="en-ar"):
    """Translate ``text`` between English and Arabic with the global ``translator``.

    The text is translated in bounded pieces rather than in one call: the
    summaries fed to this function can span several paragraphs, and a single
    long seq2seq call risks a truncated translation. Paragraph breaks
    (blank lines) are preserved in the result.

    Args:
        text: Text to translate. ``None``, empty or whitespace-only input
            yields a warning message instead of a translation.
        direction: ``"en-ar"`` for English -> Arabic; any other value is
            treated as Arabic -> English.

    Returns:
        The translated text, or a warning string when there is nothing to translate.
    """
    if not text or not text.strip():
        return "⚠️ No text to translate"
    if direction == "en-ar":
        src_lang, tgt_lang = "en_XX", "ar_AR"
    else:
        src_lang, tgt_lang = "ar_AR", "en_XX"

    translated_paragraphs = []
    for paragraph in text.split("\n\n"):
        pieces = _split_for_translation(paragraph)
        if not pieces:
            continue  # blank paragraph
        translated = [
            translator(piece, src_lang=src_lang, tgt_lang=tgt_lang)[0]['translation_text']
            for piece in pieces
        ]
        translated_paragraphs.append(" ".join(translated))
    return "\n\n".join(translated_paragraphs)

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/2.44G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/261 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/529 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/649 [00:00<?, ?B/s]

Device set to use cuda:0


In [None]:
# EMBEDDING MODEL FOR RAG / CHAT
# The model is placed on the selected device at construction time
# (torch_device is "cuda" or "cpu", chosen in the device cell).
embedder = SentenceTransformer("all-MiniLM-L6-v2", device=torch_device)

In [None]:
# File extraction utilities
def extract_text_from_file(path: str) -> str:
    """Return the text content of a .pdf, .docx, .pptx or .txt file.

    Args:
        path: Location of the file; the extension (case-insensitive) selects
            the reader.

    Returns:
        The extracted text with surrounding whitespace stripped. An empty
        string for a falsy path or an unsupported extension. If reading raises,
        the string ``"[ERROR reading file: <exception>]"`` is returned instead
        of propagating the exception.
    """
    if not path:
        return ""
    _, suffix = os.path.splitext(path)
    suffix = suffix.lower()
    try:
        if suffix == ".pdf":
            pages = PyPDF2.PdfReader(path).pages
            # extract_text() may yield nothing for a page; treat that as "".
            content = "".join((page.extract_text() or "") + "\n" for page in pages)
        elif suffix == ".docx":
            content = "\n".join(paragraph.text for paragraph in Document(path).paragraphs)
        elif suffix == ".pptx":
            content = "".join(
                shape.text + "\n"
                for slide in Presentation(path).slides
                for shape in slide.shapes
                if hasattr(shape, "text")
            )
        elif suffix == ".txt":
            with open(path, "r", encoding="utf-8", errors="ignore") as handle:
                content = handle.read()
        else:
            return ""
    except Exception as exc:
        return f"[ERROR reading file: {exc}]"
    return content.strip()


In [None]:
# Chunking helper (word-based)
def chunk_by_words(text: str, chunk_size: int = 600, overlap: int = 100) -> List[str]:
    """Split ``text`` into overlapping chunks of whitespace-separated words.

    Args:
        text: Input text; it is split on whitespace.
        chunk_size: Maximum number of words per chunk. Integral floats (as a
            UI slider may deliver) are accepted and converted to ``int``.
        overlap: Number of words shared between consecutive chunks.

    Returns:
        The chunks, each re-joined with single spaces; ``[]`` when ``text``
        contains no words.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size`` (the window could never advance).
    """
    words = text.split()
    if not words:
        return []
    chunk_size, overlap = int(chunk_size), int(overlap)
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    total = len(words)
    step = chunk_size - overlap
    chunks = []
    for start in range(0, total, step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Stop once a chunk reaches the end of the text: any further window
        # would only repeat words already contained in this chunk's tail.
        if start + chunk_size >= total:
            break
    return chunks

In [None]:
# Summarization
# ----------------------------
def summarize_document_single(text, chunk_size=800, overlap=100, min_len=80, max_len=300):
    """Summarize ``text`` chunk by chunk with the global ``summarizer`` pipeline.

    Args:
        text: Document text to summarize.
        chunk_size: Words per chunk, forwarded to ``chunk_by_words``.
        overlap: Words shared between consecutive chunks.
        min_len: ``min_length`` passed to the summarizer. It is capped at
            ``max_len`` so that the two bounds cannot contradict each other.
        max_len: ``max_length`` passed to the summarizer.

    Returns:
        The per-chunk summaries joined by blank lines. A chunk whose
        summarization raises contributes ``"[Error summarizing chunk: ...]"``
        instead of aborting the whole document.
    """
    # Numeric arguments may arrive as floats from UI sliders; normalize them.
    max_len = int(max_len)
    min_len = min(int(min_len), max_len)
    summaries = []
    for chunk in chunk_by_words(text, chunk_size=int(chunk_size), overlap=int(overlap)):
        try:
            summary = summarizer(chunk, min_length=min_len, max_length=max_len)[0]['summary_text']
        except Exception as e:
            summary = f"[Error summarizing chunk: {e}]"
        summaries.append(summary)
    return "\n\n".join(summaries)


In [92]:
# RAG (index, retrieve, answer)
# ----------------------------
def build_faiss_index(chunks: List[str]):
    """Embed ``chunks`` and store the vectors in a flat L2 FAISS index.

    Args:
        chunks: Text chunks to index.

    Returns:
        ``(index, chunks)``; ``(None, [])`` when ``chunks`` is empty.
    """
    if not chunks:
        return None, []
    encoded = embedder.encode(chunks, show_progress_bar=False)
    embeddings = np.array(encoded).astype("float32")
    # The index dimensionality is the width of one embedding row.
    dim = embeddings.shape[1]
    flat_index = faiss.IndexFlatL2(dim)
    flat_index.add(embeddings)
    return flat_index, chunks

# Prompt template for document question answering; it is filled via
# str.format with the {context} and {question} fields.
QA_PROMPT = """Use the context below to answer the question.
If the answer is not in the context, say "I don't know".

Context:
{context}

Question: {question}

Answer:"""

def retrieve_top_k(question: str, index, chunks, k=3):
    """Return the chunks the index reports as nearest to ``question``.

    Args:
        question: Query text; embedded with the global ``embedder``.
        index: Search index exposing ``search(vectors, k)``, or ``None``.
        chunks: Chunk texts, positionally aligned with the index entries.
        k: Number of neighbours requested from the index.

    Returns:
        The matching chunks in the order returned by the index; ``[]`` when
        ``index`` is ``None``. Ids outside ``range(len(chunks))`` are dropped.
    """
    if index is None:
        return []
    query_vec = embedder.encode([question]).astype("float32")
    _, neighbor_ids = index.search(query_vec, k)
    hits = []
    for raw_id in neighbor_ids[0]:
        if 0 <= raw_id < len(chunks):
            hits.append(chunks[int(raw_id)])
    return hits

def rag_answer(question: str, index, chunks):
    """Answer ``question`` from the indexed document via retrieval + generation.

    Args:
        question: User question; surrounding whitespace is ignored.
        index: Search index built for the document, or ``None`` if no
            document has been loaded.
        chunks: Chunk texts aligned with ``index``.

    Returns:
        The generated answer with surrounding whitespace stripped, or a short
        message when no document is loaded, the question is empty, or
        retrieval returns nothing.
    """
    if index is None:
        return "⚠️ Upload a file first."
    question = (question or "").strip()
    if not question:
        # A document is loaded, so the missing piece is the question itself.
        return "⚠️ Please type a question."
    retrieved = retrieve_top_k(question, index, chunks, k=3)
    if not retrieved:
        return "No relevant context found."
    prompt = QA_PROMPT.format(context="\n\n".join(retrieved), question=question)
    out = flan_pipe(prompt, max_length=220, do_sample=False)
    return out[0]["generated_text"].strip()


In [93]:
# ----------------------------
# GRADIO UI (clean & aligned)
# ----------------------------
with gr.Blocks() as demo:
    gr.Markdown("# 📚 NLP Smart Assistant")

    # States (hidden): extracted text, its retrieval chunks and the search index
    doc_text = gr.State("")
    doc_chunks = gr.State([])
    faiss_index = gr.State(None)

    # --------------------
    # File Upload Section
    # --------------------
    with gr.Row():
        file_input = gr.File(label="📂 Upload Document", type="filepath")
        file_status = gr.Markdown("**No file loaded**")

    # --------------------
    # Main Section (Summarize | Chat)
    # --------------------
    with gr.Row():
        # Summarization (Left)
        with gr.Column(scale=2):
            gr.Markdown("### 📑 Summarization")

            with gr.Row():
                min_len = gr.Slider(30, 200, value=90, step=10, label="Min Length")
                max_len = gr.Slider(100, 800, value=490, step=10, label="Max Length")
                chunk_size = gr.Slider(200, 1500, value=950, step=50, label="Chunk Size")

            summarize_btn = gr.Button("📑 Summarization")
            summary_out = gr.Textbox(label="Summary ", lines=12)

        # Chat (Right)
        with gr.Column(scale=1):
            gr.Markdown("### 💬 Chat with Document")
            question_in = gr.Textbox(label="Ask", placeholder="Type your question...")
            ask_btn = gr.Button("🤔 Ask")
            answer_out = gr.Textbox(label="Answer", lines=12)

    # --------------------
    # Translation Section (underneath)
    # --------------------
    with gr.Row():
        gr.Markdown("### 🌍 Translation")
    with gr.Row():
        translate_btn = gr.Button("🌍 Translate")
        translation_out = gr.Textbox(label="Summary in Arabic", lines=6)

    # --------------------
    # Handlers
    # --------------------
    def handle_file(path):
        """Extract, chunk and index the file; returns (text, chunks, index, status markdown)."""
        if not path:
            return "", [], None, "**No file loaded**"
        txt = extract_text_from_file(path)
        if txt.startswith("[ERROR"):
            return "", [], None, f"**Error:** {txt}"
        if not txt:
            # extract_text_from_file returns "" for unsupported extensions and for
            # documents without text; do not report those as successfully loaded.
            return "", [], None, (
                f"**Error:** no text could be extracted from {os.path.basename(path)} "
                "(unsupported file type or empty document)"
            )
        chunks = chunk_by_words(txt, chunk_size=300, overlap=50)
        index, chunks = build_faiss_index(chunks)
        status = f"Loaded: {os.path.basename(path)} — {len(txt)} chars — chunks: {len(chunks)}"
        return txt, chunks, index, status

    file_outputs = [doc_text, doc_chunks, faiss_index, file_status]
    file_input.upload(handle_file, inputs=[file_input], outputs=file_outputs)
    # Removing the file resets the document state so that summarization and chat
    # cannot keep operating on the previously loaded document.
    file_input.clear(lambda: handle_file(None), inputs=None, outputs=file_outputs)

    def do_summarize(text, chunk_sz, min_l, max_l):
        """Summarize the loaded document using the slider settings."""
        if not text or not text.strip():
            return "⚠️ Please upload a file first."
        # Slider values may arrive as floats; the chunker and generator need ints.
        return summarize_document_single(text, chunk_size=int(chunk_sz), overlap=120, min_len=int(min_l), max_len=int(max_l))

    summarize_btn.click(do_summarize, inputs=[doc_text, chunk_size, min_len, max_len], outputs=[summary_out])

    # The translation input is whatever is currently shown in the summary box.
    translate_btn.click(lambda txt: translate_text(txt, "en-ar"), inputs=[summary_out], outputs=[translation_out])

    ask_btn.click(rag_answer, inputs=[question_in, faiss_index, doc_chunks], outputs=[answer_out])

# Run
# share=True additionally requests a temporary public URL for the app,
# so anyone with the link can reach it while the cell is running.
demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://bc404dba4f89ea09b0.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


