In [7]:
# View & export your DocQA SQLite data from Colab
!pip -q install pandas

import os, json, sqlite3, pandas as pd
from datetime import datetime

# Location of the SQLite file to inspect.
DB_PATH = "/content/docqa.db"  # change if you saved it elsewhere

# Fail early with an actionable message instead of letting sqlite3.connect()
# silently create a new, empty database at a wrong path.
if not os.path.exists(DB_PATH):
    raise FileNotFoundError(f"DB not found at {DB_PATH}. Run the app first to create it.")

# Module-level connection shared by every query in this cell; it is closed by
# the con.close() call at the end of the cell.
con = sqlite3.connect(DB_PATH)
con.row_factory = sqlite3.Row

# 1) List tables (reads the table names from sqlite_master, sorted by name)
tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", con)
print("📚 Tables found:")
display(tables)

# 2) Show schemas (optional)
def schema(name):
    """Return the column metadata of table ``name`` as a DataFrame.

    Runs ``PRAGMA table_info`` on the module-level connection ``con``.

    Args:
        name: Table name to describe.

    Returns:
        The DataFrame produced by ``pd.read_sql_query`` for the PRAGMA.
    """
    # The table name is spliced into the SQL text, so embed it as a
    # double-quoted identifier (embedded quotes doubled). This keeps unusual
    # names from changing the meaning of the statement.
    quoted = '"' + str(name).replace('"', '""') + '"'
    return pd.read_sql_query(f"PRAGMA table_info({quoted});", con)

# Print and render the column layout of each application table, in order.
for table_name in ("uploads", "chunks", "queries"):
    print(f"Schema: {table_name}")
    display(schema(table_name))

# 3) Load data (both tables in full, newest rows first)
uploads_df, queries_df = (
    pd.read_sql_query(f"SELECT * FROM {tbl} ORDER BY created_at DESC;", con)
    for tbl in ("uploads", "queries")
)

# 4) Expand citations_json for readability
def expand_citations(df: pd.DataFrame) -> pd.DataFrame:
    """Replace ``citations_json`` with flat columns for the first two citations.

    Each ``citations_json`` cell is expected to hold a JSON array of objects
    with ``score`` and ``preview`` keys. The first two entries are spread into
    ``cit1_score``, ``cit1_preview``, ``cit2_score`` and ``cit2_preview``.
    Missing, empty, or unparseable values yield ``None`` in the affected cells.

    Args:
        df: Frame that may contain a ``citations_json`` column.

    Returns:
        ``df`` itself when it has no ``citations_json`` column; otherwise a new
        frame without that column and with the four citation columns appended.
        The output columns are the same whether or not ``df`` has rows.
    """
    if "citations_json" not in df.columns:
        return df

    columns = ["cit1_score", "cit1_preview", "cit2_score", "cit2_preview"]

    def parse(js) -> dict:
        """Turn one raw cell into a dict keyed by the four output columns."""
        row = dict.fromkeys(columns)
        # NULL / NaN / empty cells carry no citations.
        if not isinstance(js, str) or not js:
            return row
        try:
            arr = json.loads(js)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return row
        if not isinstance(arr, list):
            return row
        # Each entry is handled on its own so that one malformed citation
        # does not discard the other.
        for pos, cit in enumerate(arr[:2], start=1):
            if isinstance(cit, dict):
                row[f"cit{pos}_score"] = cit.get("score")
                row[f"cit{pos}_preview"] = cit.get("preview")
        return row

    # Build all rows in one go; passing ``columns`` keeps the schema stable
    # even when there are zero rows.
    expanded = pd.DataFrame(
        [parse(js) for js in df["citations_json"]],
        index=df.index,
        columns=columns,
    )
    return pd.concat([df.drop(columns=["citations_json"]), expanded], axis=1)

# Flatten the JSON citations column into cit1_*/cit2_* columns.
queries_pretty = expand_citations(queries_df)

# 5) Join queries with filenames for convenience
# Both frames have an "id" column, so the right-hand one is suffixed to
# "id_upload" by the merge and then dropped; only "filename" is carried over.
# how="left" keeps every query row even if its upload has no match.
queries_join = queries_pretty.merge(
    uploads_df[["id","filename"]],
    left_on="upload_id",
    right_on="id",
    how="left",
    suffixes=("", "_upload")
).drop(columns=["id_upload"])

# 6) Show results (first 20 rows of each frame)
print("📄 uploads")
display(uploads_df.head(20))

print("💬 queries (pretty)")
display(queries_join.head(20))

# 7) Optional: export CSVs to download
# Full frames are written (not just the 20 displayed rows), without the index.
uploads_csv = "/content/uploads_export.csv"
queries_csv = "/content/queries_export.csv"
uploads_df.to_csv(uploads_csv, index=False)
queries_join.to_csv(queries_csv, index=False)

print("✅ Exported CSVs:")
print(" -", uploads_csv)
print(" -", queries_csv)

# If you want to view chunk texts for a specific upload:
if not uploads_df.empty:
    # uploads_df was loaded newest-first, so row 0 is the most recent upload.
    sample_upload_id = uploads_df.iloc[0]["id"]
    # Only the first 8 characters of the id are printed to keep the line short.
    print(f"\n🔎 Example chunks for upload_id={sample_upload_id[:8]}…")
    # Parameterized query: up to 10 chunks, each truncated to 200 characters.
    ch = pd.read_sql_query(
        "SELECT chunk_index, substr(text,1,200) AS preview FROM chunks WHERE upload_id = ? ORDER BY chunk_index LIMIT 10;",
        con, params=(sample_upload_id,)
    )
    display(ch)

# Release the connection opened at the top of the cell.
# NOTE(review): this line is skipped if any statement above raises.
con.close()


📚 Tables found:


Unnamed: 0,name
0,chunks
1,queries
2,sqlite_sequence
3,uploads


Schema: uploads


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,id,TEXT,0,,1
1,1,filename,TEXT,1,,0
2,2,size_bytes,INTEGER,1,,0
3,3,mimetype,TEXT,0,,0
4,4,created_at,TEXT,1,,0


Schema: chunks


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,id,INTEGER,0,,1
1,1,upload_id,TEXT,1,,0
2,2,chunk_index,INTEGER,1,,0
3,3,text,TEXT,1,,0


Schema: queries


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,id,INTEGER,0,,1
1,1,upload_id,TEXT,1,,0
2,2,question,TEXT,1,,0
3,3,answer,TEXT,1,,0
4,4,citations_json,TEXT,0,,0
5,5,created_at,TEXT,1,,0


📄 uploads


Unnamed: 0,id,filename,size_bytes,mimetype,created_at
0,85170031-3ef9-4ade-aa80-f1c8eb22fdc6,sample_test_doc.pdf,2152,application/pdf,2025-09-22T09:06:07.253366


💬 queries (pretty)


Unnamed: 0,id,upload_id,question,answer,created_at,cit1_score,cit1_preview,cit2_score,cit2_preview,filename
0,3,85170031-3ef9-4ade-aa80-f1c8eb22fdc6,why was this document made,to help test the AI-powered Q&A system,2025-09-22T09:06:54.763568,0.153897,Sample Document for Testing Q&A;\nThis documen...,,,sample_test_doc.pdf
1,2,85170031-3ef9-4ade-aa80-f1c8eb22fdc6,what is flutter,UI toolkit,2025-09-22T09:06:39.801854,0.256495,Sample Document for Testing Q&A;\nThis documen...,,,sample_test_doc.pdf
2,1,85170031-3ef9-4ade-aa80-f1c8eb22fdc6,why was this made,I couldn’t find that in the provided document.,2025-09-22T09:06:20.906282,,,,,sample_test_doc.pdf


✅ Exported CSVs:
 - /content/uploads_export.csv
 - /content/queries_export.csv

🔎 Example chunks for upload_id=85170031…


Unnamed: 0,chunk_index,preview
0,0,Sample Document for Testing Q&A;\nThis documen...


In [6]:
# All-in-one Colab cell: Model + Retrieval + SQLite DB + Gradio UI
!pip -q install gradio pdfplumber scikit-learn transformers accelerate

import os, re, json, uuid, sqlite3, pdfplumber, gradio as gr
from datetime import datetime
from typing import List, Tuple, Optional

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Canonical reply used whenever no answer can be produced from the document.
# The prompt in answer_with_model() instructs the model to reply with exactly this text.
NOT_FOUND = "I couldn’t find that in the provided document."

# ---------- LLM (with safe fallback) ----------
# GEN stays None when the model cannot be loaded; answer_with_model() checks
# for None and falls back to quoting snippets from the document.
GEN = None
try:
    from transformers import pipeline
    GEN = pipeline("text2text-generation", model="google/flan-t5-base", device_map="auto")
except Exception as e:
    # Deliberately broad: any import/load failure switches to the fallback
    # instead of aborting the cell. The error is printed so it is not lost.
    print("⚠️ transformers model unavailable; using snippet-only fallback.\n", e)

def answer_with_model(question: str, contexts: List[str]) -> str:
    """Answer ``question`` using only the supplied document excerpts.

    Args:
        question: The user's question.
        contexts: Excerpts retrieved from the document, best match first.

    Returns:
        The generated answer, or ``NOT_FOUND`` when there are no contexts or
        the model produces no text. When no model is loaded (``GEN is None``),
        a quotation built from the first two contexts is returned instead.
    """
    if not contexts:
        return NOT_FOUND
    if GEN is None:
        # Strict snippet-only fallback (quotes from doc): collapse whitespace,
        # cap each snippet at 300 characters and join the first two.
        snips = [re.sub(r"\s+", " ", c).strip() for c in contexts[:2]]
        if not any(snips):
            return NOT_FOUND
        merged = ' " … " '.join([s[:300] + ("…" if len(s) > 300 else "") for s in snips])
        return f'From the document: "{merged}"'
    prompt = (
        "You are a careful assistant that answers using ONLY the provided context from a single document.\n"
        f"If the answer is not in the context, reply exactly: \"{NOT_FOUND}\"\n\n"
        "Context:\n" + "\n\n".join([f"[#{i+1}]\n{c}" for i, c in enumerate(contexts)]) +
        f"\n\nQuestion: {question}\nAnswer:"
    )
    # Request deterministic (greedy) decoding explicitly. ``temperature`` only
    # applies to sampling, and 0.0 is not a valid sampling temperature, so it
    # is the wrong knob for "no randomness".
    out = GEN(prompt, max_new_tokens=256, do_sample=False)
    # Guard against an empty result list before indexing into it.
    text = (out[0].get("generated_text") or "").strip() if out else ""
    return text if text else NOT_FOUND

# ---------- Document processing ----------
def extract_text(path: str) -> str:
    """Read the text content of a ``.txt`` or ``.pdf`` file.

    The extension check is case-insensitive.

    Args:
        path: Filesystem path of the document.

    Returns:
        The extracted text with leading/trailing whitespace removed. For PDFs,
        the per-page texts are joined with a blank line; a page with no
        extractable text contributes an empty string.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        ValueError: the extension is neither ``.txt`` nor ``.pdf``.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    ext = os.path.splitext(path)[1].lower()
    if ext == ".txt":
        with open(path, "rb") as f:
            # "utf-8-sig" decodes like UTF-8 but also drops a leading byte
            # order mark, which str.strip() would not remove. Undecodable
            # bytes are still skipped rather than raising.
            return f.read().decode("utf-8-sig", errors="ignore").strip()
    elif ext == ".pdf":
        with pdfplumber.open(path) as pdf:
            pages = [p.extract_text() or "" for p in pdf.pages]
        return "\n\n".join(pages).strip()
    else:
        raise ValueError("Unsupported file type. Use .txt or .pdf")

def chunk_text(text: str, max_chars: int = 1500, overlap: int = 200) -> List[str]:
    """Split ``text`` into overlapping chunks of at most ``max_chars`` characters.

    Carriage returns are removed first. A chunk that is not the last one is
    cut back to the last ``". "`` it contains when that sentence end lies
    beyond 60% of ``max_chars``. Each following chunk starts ``overlap``
    characters before the end of the previous one. Chunks are stripped, and
    chunks that are empty after stripping are dropped.

    Args:
        text: Text to split; ``None`` or ``""`` yields an empty list.
        max_chars: Maximum chunk length before stripping; must be >= 1.
        overlap: Number of characters shared between consecutive chunks. When
            it is so large that the next chunk would not start after the
            current one, the overlap is skipped for that step.

    Returns:
        The chunks in document order.

    Raises:
        ValueError: ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        # A zero or negative window can never advance through the text.
        raise ValueError("max_chars must be >= 1")

    text = (text or "").replace("\r", "")
    chunks, i, n = [], 0, len(text)
    while i < n:
        end = min(i + max_chars, n)
        piece = text[i:end]
        if end < n:
            # Prefer ending on a sentence boundary, but only when that does
            # not shrink the chunk below 60% of the window.
            cut = piece.rfind(". ")
            if cut > max_chars * 0.6:
                piece = piece[:cut + 1]
                end = i + cut + 1
        piece = piece.strip()
        if piece:
            chunks.append(piece)
        if end >= n:
            break
        # Step back by ``overlap`` for context, but never to a position at or
        # before the current start: that would repeat the same chunk forever.
        next_start = end - overlap
        i = next_start if next_start > i else end
    return chunks

class Retriever:
    """TF-IDF index over a fixed list of chunks, queried by cosine similarity."""

    def __init__(self, chunks: List[str]):
        """Fit a TF-IDF vectorizer on ``chunks`` and keep the resulting matrix.

        Tokens are lowercased runs of ASCII letters and digits.
        """
        self.chunks = chunks
        self.vectorizer = TfidfVectorizer(
            lowercase=True,
            token_pattern=r"[A-Za-z0-9]+",
        )
        self.mat = self.vectorizer.fit_transform(chunks)

    def search(self, query: str, k: int = 4) -> List[Tuple[int, float, str]]:
        """Return up to ``k`` ``(chunk_index, score, chunk_text)`` tuples, best first."""
        query_vec = self.vectorizer.transform([query])
        scores = cosine_similarity(query_vec, self.mat)[0]
        # Ascending argsort reversed gives descending scores; keep the top k.
        ranked = scores.argsort()[::-1][:k]
        results = []
        for position in ranked:
            row = int(position)
            results.append((row, float(scores[row]), self.chunks[row]))
        return results

# ---------- SQLite DB ----------
DB_PATH = "/content/docqa.db"  # set to Drive path if you want persistence across sessions

# One module-level connection shared by all DB helpers below.
# check_same_thread=False lets it be used from threads other than the one
# that created it.
_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
_conn.execute("PRAGMA journal_mode=WAL;")
_conn.execute("PRAGMA synchronous=NORMAL;")
# SQLite only enforces FOREIGN KEY clauses (including the ON DELETE CASCADE
# rules declared in init_db) on connections where this pragma is switched on.
_conn.execute("PRAGMA foreign_keys=ON;")
_conn.row_factory = sqlite3.Row

def init_db():
    """Create the ``uploads``, ``chunks`` and ``queries`` tables if they are missing.

    Uses the module-level connection ``_conn``. Every statement is
    ``CREATE TABLE IF NOT EXISTS``, so calling this on an existing database
    leaves existing tables as they are.

    ``chunks`` and ``queries`` each reference ``uploads(id)`` through
    ``upload_id`` with ``ON DELETE CASCADE``.
    """
    _conn.executescript("""
    CREATE TABLE IF NOT EXISTS uploads (
        id TEXT PRIMARY KEY,
        filename TEXT NOT NULL,
        size_bytes INTEGER NOT NULL,
        mimetype TEXT,
        created_at TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS chunks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        upload_id TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        text TEXT NOT NULL,
        FOREIGN KEY(upload_id) REFERENCES uploads(id) ON DELETE CASCADE
    );
    CREATE TABLE IF NOT EXISTS queries (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        upload_id TEXT NOT NULL,
        question TEXT NOT NULL,
        answer TEXT NOT NULL,
        citations_json TEXT,
        created_at TEXT NOT NULL,
        FOREIGN KEY(upload_id) REFERENCES uploads(id) ON DELETE CASCADE
    );
    """)
    _conn.commit()

def save_upload(filename: str, size_bytes: int, mimetype: Optional[str], chunks: List[str]) -> str:
    """Persist one uploaded document and its chunks; return the new upload id.

    The ``uploads`` row and all ``chunks`` rows are written in a single
    transaction on the module-level connection ``_conn``: either everything
    is committed or, if any insert raises, nothing is.

    Args:
        filename: Name stored with the upload.
        size_bytes: File size; coerced with ``int()``.
        mimetype: MIME type; ``None`` is stored as an empty string.
        chunks: Chunk texts, stored with their list position as ``chunk_index``.

    Returns:
        The generated UUID4 string used as ``uploads.id``.

    Raises:
        sqlite3.Error: if an insert fails (the transaction is rolled back).
    """
    upload_id = str(uuid.uuid4())
    created_at = datetime.utcnow().isoformat()
    chunk_rows = [(upload_id, i, c) for i, c in enumerate(chunks)]
    # As a context manager the connection commits on success and rolls back
    # when the body raises. Without this, a failure in the chunk insert would
    # leave the upload row pending, to be committed by the next unrelated
    # commit() on the shared connection.
    with _conn:
        _conn.execute(
            "INSERT INTO uploads (id, filename, size_bytes, mimetype, created_at) VALUES (?, ?, ?, ?, ?)",
            (upload_id, filename, int(size_bytes), mimetype or "", created_at),
        )
        _conn.executemany(
            "INSERT INTO chunks (upload_id, chunk_index, text) VALUES (?, ?, ?)",
            chunk_rows,
        )
    return upload_id

def save_query(upload_id: str, question: str, answer: str, citations: Optional[List[dict]]):
    """Record one question/answer exchange for ``upload_id`` and commit it.

    ``citations`` is stored as JSON text; ``None`` (or an empty list) is
    stored as ``[]``. The timestamp is the current UTC time in ISO format.
    """
    citations_text = json.dumps(citations or [])
    asked_at = datetime.utcnow().isoformat()
    row = (upload_id, question, answer, citations_text, asked_at)
    _conn.execute(
        "INSERT INTO queries (upload_id, question, answer, citations_json, created_at) VALUES (?, ?, ?, ?, ?)",
        row,
    )
    _conn.commit()

# Create the tables (no-op for tables that already exist) before the UI starts.
init_db()
print(f"✅ DB ready at {DB_PATH}")

# ---------- Gradio UI ----------
# State of the currently loaded document, written by ui_handle_upload() and
# read by submit_and_clear(). All values are None until a file is uploaded.
# NOTE(review): this is a single module-level dict, so it is presumably shared
# by every browser session connected to the app; confirm whether per-session
# state is needed before exposing the public share link.
UI_STATE = {
    "meta": None,        # {"name":..., "size":..., "mime":...}
    "chunks": None,      # list[str]
    "retriever": None,   # Retriever instance
    "upload_id": None,   # DB id
}

def ui_handle_upload(file):
    """Gradio upload handler: validate, index and persist one document.

    Args:
        file: The uploaded file as delivered by ``gr.File``: either a
            filesystem path string or an object exposing the path as ``.name``.
            ``None`` means nothing was selected.

    Returns:
        A 3-tuple ``(status_markdown, chat_messages, chatbot_update)``.
        On success: a summary line, a one-message seed conversation, and an
        update making the chatbot visible. On any failure: an error message,
        an empty list, and an update hiding the chatbot.

    Side effects:
        On success only, stores the document in the database via
        ``save_upload`` and replaces all four entries of ``UI_STATE``. A
        failed upload leaves ``UI_STATE`` untouched.
    """
    def _reject(message: str):
        # Every failure path has the same shape: message, empty chat, hidden chatbot.
        return message, [], gr.update(visible=False)

    if file is None:
        return _reject("❌ No file selected.")

    # Accept a plain path string as well as an object carrying the path in ``.name``.
    path = file if isinstance(file, str) else getattr(file, "name", None)
    if not path or not os.path.exists(path):
        return _reject("❌ Could not read uploaded file.")

    size = os.path.getsize(path)
    if size > 2 * 1024 * 1024:
        return _reject("❌ File too large (> 2MB).")

    ext = os.path.splitext(path.lower())[1]
    if ext not in (".txt", ".pdf"):
        return _reject("❌ Unsupported file type. Use .txt or .pdf.")

    try:
        text = extract_text(path)
    except Exception as e:
        return _reject(f"❌ Failed to read file: {e}")

    chunks = chunk_text(text)
    if not chunks:
        return _reject("❌ No readable text found in file.")

    # Building the index can fail even for non-empty chunks (for example when
    # no chunk contains a token the vectorizer accepts); report it instead of
    # letting the exception escape the handler.
    try:
        retr = Retriever(chunks)
    except ValueError as e:
        return _reject(f"❌ Could not index file: {e}")

    meta = {
        "name": os.path.basename(path),
        "size": size,
        "mime": ("text/plain" if ext == ".txt" else "application/pdf"),
    }

    # Save to DB first; shared state is only switched to the new document
    # once it has been stored, so state and database cannot disagree.
    try:
        upload_id = save_upload(
            filename=meta["name"],
            size_bytes=meta["size"],
            mimetype=meta["mime"],
            chunks=chunks,
        )
    except sqlite3.Error as e:
        return _reject(f"❌ Failed to save upload: {e}")

    UI_STATE["meta"] = meta
    UI_STATE["chunks"] = chunks
    UI_STATE["retriever"] = retr
    UI_STATE["upload_id"] = upload_id

    human = f"{size/1024:.1f} KB" if size < 1024*1024 else f"{size/1024/1024:.2f} MB"
    info = f"✅ Loaded: **{meta['name']}** • {human} • {len(chunks)} chunk(s)"
    seed_msgs = [{"role":"assistant", "content":"File uploaded. Ask me anything about it."}]
    return info, seed_msgs, gr.update(visible=True)

def submit_and_clear(q, history):
    """Gradio chat handler: answer ``q`` and clear the input box.

    Args:
        q: The question typed by the user.
        history: Current chat as a list of ``{"role", "content"}`` dicts;
            ``None`` is treated as an empty conversation.

    Returns:
        ``("", new_history)``: the empty string clears the textbox, and
        ``new_history`` is ``history`` plus the user message and the reply.
        A blank question returns the history unchanged.

    Side effects:
        When a document is loaded (``UI_STATE["upload_id"]`` is set), the
        question, answer and citations are stored via ``save_query``.
    """
    # Minimum cosine similarity the best hit must reach to attempt an answer.
    min_top_score = 0.08

    history = list(history or [])

    # Nothing to answer: do not add empty bubbles or log an empty query.
    if not (q or "").strip():
        return "", history

    exchange = [{"role":"user","content":q}]

    retr = UI_STATE.get("retriever")
    if retr is None:
        ans = "Please upload a PDF/TXT (≤ 2MB) first."
        return "", history + exchange + [{"role":"assistant","content":ans}]

    # Collect hits for citations + answer
    hits = retr.search(q, k=4)
    if not hits or hits[0][1] < min_top_score:
        ans = NOT_FOUND
        cits = []
    else:
        contexts = [h[2] for h in hits]
        ans = answer_with_model(q, contexts)
        # Store only a 200-character preview of each cited chunk.
        cits = [{"score": float(s), "preview": t[:200]} for (_, s, t) in hits]

    # Save Q&A to DB
    if UI_STATE.get("upload_id"):
        save_query(UI_STATE["upload_id"], q, ans, cits)

    return "", history + exchange + [{"role":"assistant","content":ans}]

with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 📄 Document Q&A (Colab + SQLite)")

    with gr.Row():
        file_in = gr.File(label="Upload PDF/TXT (≤ 2MB)")
        info = gr.Markdown()

    # Chat controls are created hidden and revealed only after a successful
    # upload. Visibility is passed to the constructors rather than assigned
    # to the components afterwards.
    chatbot = gr.Chatbot(height=320, type="messages", visible=False)
    question = gr.Textbox(placeholder="Type your question…", container=True, visible=False)
    ask_btn = gr.Button("Ask", variant="primary", visible=False)

    def on_upload(file):
        """Run ui_handle_upload and drive all three chat controls from its result.

        ui_handle_upload returns a non-empty seed conversation only on
        success, so that is used as the success flag. Each component appears
        once in ``outputs``: the chatbot receives its messages and visibility
        in a single update, and the question box and button are shown only
        when the upload actually succeeded.
        """
        status, seed_msgs, _ = ui_handle_upload(file)
        ok = bool(seed_msgs)
        return (
            status,
            gr.update(value=seed_msgs, visible=ok),
            gr.update(visible=ok),
            gr.update(visible=ok),
        )

    file_in.upload(fn=on_upload, inputs=file_in, outputs=[info, chatbot, question, ask_btn])

    # Pressing Enter in the textbox and clicking the button do the same thing.
    question.submit(submit_and_clear, inputs=[question, chatbot], outputs=[question, chatbot])
    ask_btn.click(submit_and_clear, inputs=[question, chatbot], outputs=[question, chatbot])

demo.launch()


Device set to use cpu


✅ DB ready at /content/docqa.db
It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://4d62cb7a78cb75155d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


