SETUP

In [None]:
# Install the notebook's dependencies into the Colab runtime.
# If Colab asks to restart after installing, accept it.
# NOTE(review): the "openai" backend does `from openai import OpenAI`, but no
# `openai` package is installed by this cell - confirm the runtime provides it.
!pip -q install --upgrade pip
!pip -q install "transformers>=4.45.0" "accelerate>=0.34.0" "sentence-transformers>=3.0.1" \
                 "faiss-cpu>=1.8.0" "langchain-text-splitters>=0.3.0" "pypdf>=4.2.0" \
                 "gradio>=4.44.0"
# bitsandbytes is optional (for 8-bit loading if a GPU is available). It may fail on CPU-only.
# The `|| echo` keeps a failed install from looking like a hard error.
!pip -q install bitsandbytes==0.44.1 || echo "bitsandbytes optional install skipped"
!pip -q install -U google-generativeai

In [None]:
import os, torch, faiss
from sentence_transformers import SentenceTransformer
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Report the runtime, then pick the device string that RAGIndex later hands to
# SentenceTransformer: "cuda" when a GPU is visible, otherwise "cpu".
print("PyTorch:", torch.__version__, "| CUDA available:", torch.cuda.is_available())
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
from google.colab import userdata
import os

# Retrieve the Gemini key from Colab's secret store. A lookup failure (secret
# absent, or the notebook not granted access) is treated as "no key" so the
# cell can still report the status instead of aborting.
try:
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
except Exception as e:
    print(f"[WARN] Could not read GOOGLE_API_KEY from Colab secrets: {e}")
    GOOGLE_API_KEY = None

# Export it so the rest of the notebook sees it. os.environ only accepts
# strings, so assigning a missing (None) key would raise TypeError.
if GOOGLE_API_KEY:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

print("Gemini key loaded:", "✅" if GOOGLE_API_KEY else "❌ Missing")

CONFIGURE MODEL BACKEND

In [None]:
# === Choose your generator backend ===
# "local" uses a small open-source chat model via Hugging Face Transformers.
# "openai" uses OpenAI's API (set OPENAI_API_KEY).
# "gemini" uses Google Generative AI (set GOOGLE_API_KEY).
GEN_BACKEND = "gemini"  # options: "local", "openai", "gemini"

# Local model config:
LOCAL_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"  # light model for CPU/GPU demos

# Retrieval config
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
CHUNK_SIZE = 800      # passed to the text splitter as chunk_size (measured with len)
CHUNK_OVERLAP = 120   # passed to the text splitter as chunk_overlap
TOP_K = 4             # number of chunks retrieved per query

# Prompt template
SYSTEM_PROMPT = (
    "You are a helpful assistant. Use the provided context to answer the user's question.\n"
    "If the answer is not in the context, say you don't know.\n"
)

# Filled with str.format; the placeholders are {system}, {context} and {question}.
ANSWER_TEMPLATE = """[System]
{system}

[Context]
{context}

[User Question]
{question}

[Instructions]
- Cite the most relevant chunks briefly (e.g., 'From chunk 2').
- If unsure, say 'I don't know from the provided docs.'
- Keep answers concise and factual.
"""

# Resolve API keys: prefer a value already defined in the notebook, otherwise
# the environment. Using `or` (instead of a .get default) also covers a
# notebook variable that exists but is None/empty, so both names always end up
# as strings and the environment fallback is never silently skipped.
OPENAI_API_KEY = globals().get("OPENAI_API_KEY") or os.getenv("OPENAI_API_KEY", "")
GOOGLE_API_KEY = globals().get("GOOGLE_API_KEY") or os.getenv("GOOGLE_API_KEY", "")

LOAD FILES, CHUNK, AND EMBED

In [None]:
import os
from typing import List, Dict
from pypdf import PdfReader

def load_texts_from_paths(paths: List[str]) -> List[Dict]:
    """Read every supported file in ``paths`` and return its text.

    ``.pdf`` files are parsed with ``PdfReader``; ``.txt`` and ``.md`` files are
    read as UTF-8 with undecodable bytes ignored. The extension check is
    case-insensitive. Files that cannot be read are reported with a ``[WARN]``
    line and skipped; other extensions are reported with ``[SKIP]``.

    Args:
        paths: File paths to load.

    Returns:
        One ``{"path": <path>, "text": <contents>}`` dict per file that was
        read successfully, in input order.
    """
    docs = []
    for p in paths:
        lowered = p.lower()
        if lowered.endswith(".pdf"):
            try:
                reader = PdfReader(p)
                # Join pages with a newline so the last word of one page does
                # not fuse with the first word of the next.
                text = "\n".join(page.extract_text() or "" for page in reader.pages)
            except Exception as e:
                print(f"[WARN] Failed to parse PDF {p}: {e}")
                continue
        elif lowered.endswith((".txt", ".md")):
            # Same policy as PDFs: one unreadable file must not abort the batch.
            try:
                with open(p, "r", encoding="utf-8", errors="ignore") as f:
                    text = f.read()
            except OSError as e:
                print(f"[WARN] Failed to read {p}: {e}")
                continue
        else:
            print(f"[SKIP] Unsupported file type: {p}")
            continue
        docs.append({"path": p, "text": text})
    return docs

# Shared text splitter used by chunk_docs. Sizes come from the config cell and
# are measured with len, i.e. in characters.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, length_function=len
)

def chunk_docs(docs: List[Dict]) -> List[Dict]:
    """Split each document's text with the shared ``splitter``.

    Args:
        docs: Dicts carrying ``"path"`` and ``"text"`` keys.

    Returns:
        A flat list of ``{"source", "chunk_id", "text"}`` records, where
        ``chunk_id`` restarts at 0 for every document.
    """
    return [
        {"source": doc["path"], "chunk_id": position, "text": piece}
        for doc in docs
        for position, piece in enumerate(splitter.split_text(doc["text"]))
    ]

class RAGIndex:
    """In-memory semantic index over text chunks.

    Chunks are embedded with a SentenceTransformer model and stored in a FAISS
    inner-product index. Vectors are L2-normalized before being added and
    before being searched, so the returned scores are cosine similarities.
    """

    def __init__(self, embedding_model_name: str):
        """Load the embedding model onto the notebook-level ``device``."""
        self.model = SentenceTransformer(embedding_model_name, device=device)
        self.index = None  # FAISS index; None until build() has indexed something
        self.chunks: List[Dict] = []

    def build(self, chunks: List[Dict]):
        """Embed ``chunks`` and replace the current index with a new one.

        Args:
            chunks: Records with at least a ``"text"`` key. An empty list
                clears the index instead of failing.
        """
        if not chunks:
            # Nothing to embed: there is no embedding matrix to read a
            # dimension from, so fall back to the "nothing indexed" state.
            self.chunks = []
            self.index = None
            print("No chunks to index; the index is empty.")
            return

        embs = self.model.encode([c["text"] for c in chunks], convert_to_numpy=True, show_progress_bar=True)
        index = faiss.IndexFlatIP(embs.shape[1])
        faiss.normalize_L2(embs)
        index.add(embs)
        # Publish chunks and index together, only after everything succeeded,
        # so a failure above cannot leave them describing different data.
        self.chunks = chunks
        self.index = index
        print(f"Built index with {len(chunks)} chunks.")

    def search(self, query: str, k: int = 4):
        """Return up to ``k`` best matches for ``query``.

        Returns:
            A list of ``(score, chunk)`` tuples in the order FAISS reports
            them; empty when nothing is indexed or ``k`` is not positive.
        """
        if self.index is None or not self.chunks:
            return []
        # Never ask for more neighbours than there are chunks, and do not hand
        # a non-positive k to FAISS.
        k = min(k, len(self.chunks))
        if k <= 0:
            return []
        q = self.model.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(q)
        scores, idxs = self.index.search(q, k)
        # FAISS uses -1 for "no result" slots.
        return [
            (float(score), self.chunks[idx])
            for score, idx in zip(scores[0], idxs[0])
            if idx != -1
        ]

# Notebook-wide index instance; later cells call rag.build / rag.search on it.
rag = RAGIndex(EMBEDDING_MODEL)

GENERATION BACKENDS

In [None]:
def render_context(snippets):
    """Format retrieval hits as a context string for the prompt.

    Args:
        snippets: Iterable of ``(score, chunk)`` pairs, where ``chunk`` has
            ``"source"``, ``"chunk_id"`` and ``"text"`` keys.

    Returns:
        One block per hit (a header line followed by the chunk text), with
        blocks separated by a blank line. Ranks start at 1.
    """
    blocks = [
        f"[Chunk {position}] (score={similarity:.3f}) source={os.path.basename(chunk['source'])} id={chunk['chunk_id']}"
        + "\n"
        + chunk["text"]
        for position, (similarity, chunk) in enumerate(snippets, start=1)
    ]
    return "\n\n".join(blocks)

def build_prompt(question, context_blocks):
    """Fill ANSWER_TEMPLATE with the system prompt, context and question.

    Each of the three pieces is stripped of surrounding whitespace before it
    is substituted into the template.
    """
    fields = {
        "system": SYSTEM_PROMPT.strip(),
        "context": context_blocks.strip(),
        "question": question.strip(),
    }
    return ANSWER_TEMPLATE.format(**fields)

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import torch

_local_pipe = None  # lazily created text-generation pipeline, shared by all calls

def get_local_pipe():
    """Return the cached local text-generation pipeline, creating it on first use.

    On a CUDA machine the model is loaded in float16 with ``device_map="auto"``;
    otherwise it is loaded in float32 and the pipeline is pinned to the CPU.
    """
    global _local_pipe
    if _local_pipe is not None:
        return _local_pipe

    use_cuda = torch.cuda.is_available()
    tok = AutoTokenizer.from_pretrained(LOCAL_MODEL, use_fast=True)
    model = AutoModelForCausalLM.from_pretrained(
        LOCAL_MODEL,
        device_map="auto" if use_cuda else None,
        torch_dtype=torch.float16 if use_cuda else torch.float32,
        low_cpu_mem_usage=True
    )
    # A model loaded with device_map="auto" has already been placed by
    # accelerate; also passing `device` to the pipeline conflicts with that
    # placement, so only pin the device on the CPU path.
    placement = {} if use_cuda else {"device": -1}
    _local_pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tok,
        max_new_tokens=384,
        do_sample=True,
        temperature=0.3,
        top_p=0.9,
        repetition_penalty=1.05,
        **placement
    )
    return _local_pipe

def generate_local(prompt: str) -> str:
    """Generate a completion for ``prompt`` with the local pipeline.

    Returns:
        The generated continuation only (without the prompt), stripped of
        surrounding whitespace.
    """
    p = get_local_pipe()
    # return_full_text=False asks the pipeline for the continuation alone.
    # Slicing len(prompt) characters off the full text is fragile because the
    # decoded prompt need not match the input string character for character.
    out = p(prompt, pad_token_id=p.tokenizer.eos_token_id, return_full_text=False)
    return out[0]["generated_text"].strip()

In [None]:
def generate_openai(prompt: str, model: str = "gpt-4o-mini") -> str:
    """Generate an answer with OpenAI's chat completions API.

    Args:
        prompt: Fully assembled prompt, sent as the user message (the
            notebook's SYSTEM_PROMPT is sent as the system message).
        model: Chat model name; defaults to the model this notebook used
            originally.

    Returns:
        The reply text (``""`` if the API returns no content), or a
        human-readable message when the key is missing or the call fails.
        Errors are reported in the returned string rather than raised so the
        UI always has something to show.
    """
    if not OPENAI_API_KEY:
        return "OPENAI_API_KEY not set. Switch GEN_BACKEND to 'local' or set your key."
    try:
        from openai import OpenAI
        client = OpenAI(api_key=OPENAI_API_KEY)
        r = client.chat.completions.create(
            model=model,
            messages=[{"role":"system","content":SYSTEM_PROMPT},
                      {"role":"user","content":prompt}],
            temperature=0.2,
            max_tokens=400
        )
        # content can be None; keep the declared str return type.
        return r.choices[0].message.content or ""
    except Exception as e:
        return f"[OpenAI error] {e}"

In [None]:
def generate_gemini(prompt: str, model_name: str = "gemini-2.5-flash-lite") -> str:
    """Generate an answer with Google Generative AI.

    Args:
        prompt: Fully assembled prompt text.
        model_name: Gemini model to call; defaults to the model this notebook
            used originally.

    Returns:
        The response text, or a human-readable message when the key is missing
        or the call fails. Errors are reported in the returned string rather
        than raised so the UI always has something to show.
    """
    if not GOOGLE_API_KEY:
        return "GOOGLE_API_KEY not set. Switch GEN_BACKEND to 'local' or set your key."
    try:
        import google.generativeai as genai
        genai.configure(api_key=GOOGLE_API_KEY)
        model = genai.GenerativeModel(model_name)
        r = model.generate_content(prompt)
        return r.text
    except Exception as e:
        return f"[Gemini error] {e}"

ASK QUESTIONS

In [None]:
def answer_question(question: str, top_k: int = TOP_K):
    """Retrieve context for ``question`` and answer it with the active backend.

    Args:
        question: The user's query; also used as the retrieval query.
        top_k: Number of chunks to request from the index.

    Returns:
        A dict with ``"question"``, ``"answer"`` and ``"top_chunks"`` (the
        ``(score, chunk)`` hits used as context).
    """
    hits = rag.search(question, k=top_k)
    prompt = build_prompt(question, render_context(hits))

    # Lambdas keep each generate_* lookup lazy, so only the function for the
    # selected backend has to exist.
    backends = {
        "local": lambda: generate_local(prompt),
        "openai": lambda: generate_openai(prompt),
        "gemini": lambda: generate_gemini(prompt),
    }
    run_backend = backends.get(GEN_BACKEND)
    if run_backend is None:
        answer = f"Unknown backend: {GEN_BACKEND}"
    else:
        answer = run_backend()

    return {"question": question, "answer": answer, "top_chunks": hits}

print("RAG ready. After indexing, call: answer_question('Your query')")

In [None]:
# Diagnostic cell: print the names of the models returned by genai.list_models()
# whose supported_generation_methods include "generateContent".
import google.generativeai as genai, os
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

# getattr with a default: entries without supported_generation_methods are
# simply filtered out instead of raising.
available = [m.name for m in genai.list_models()
             if "generateContent" in getattr(m, "supported_generation_methods", [])]
for name in available:
    print(name)

In [None]:
# Smoke test: send one tiny prompt to the Gemini model and print the reply
# together with the elapsed wall-clock time in seconds.
import google.generativeai as genai, time, os
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
m = genai.GenerativeModel("gemini-2.5-flash-lite")
t=time.time()  # start timestamp for the round trip
print(m.generate_content("Say only: ready").text, "‚è±", round(time.time()-t,2), "s")

UPLOAD FILES AND BUILD INDEX

In [None]:
from google.colab import files

def upload_files(dest_dir: str = "/content"):
    """Open Colab's upload widget and save every selected file under ``dest_dir``.

    Args:
        dest_dir: Directory the uploads are written to; created if it does not
            exist. Defaults to ``/content``, the location this notebook used
            originally.

    Returns:
        The saved file paths, in the order the upload widget reports them.
    """
    print("Select PDFs/TXT/MD files...")
    uploaded = files.upload()
    paths = []
    if uploaded:
        os.makedirs(dest_dir, exist_ok=True)
    for name, data in uploaded.items():
        # basename keeps the write inside dest_dir even if the reported name
        # carries directory components.
        path = os.path.join(dest_dir, os.path.basename(name))
        with open(path, "wb") as f:
            f.write(data)
        paths.append(path)
    return paths

def build_index_from_paths(paths):
    """Load the files at ``paths``, chunk them, and rebuild the shared ``rag`` index.

    Prints how many chunks were indexed and from how many loaded files.
    """
    loaded = load_texts_from_paths(paths)
    pieces = chunk_docs(loaded)
    rag.build(pieces)
    summary = f"Indexed {len(pieces)} chunks from {len(loaded)} files."
    print(summary)

In [None]:
# Driver cell: upload documents, build the index, then print a quick summary.
# 1) Pick your PDFs / TXT / MD from your computer
paths = upload_files()                 # opens a file picker

# 2) Build the index (chunk + embed + FAISS)
build_index_from_paths(paths)

# 3) Sanity checks
print("Files:", paths)
print("Chunks indexed:", len(rag.chunks))
# Preview is limited to the first 500 characters of the first chunk.
print("First chunk preview:\n", rag.chunks[0]["text"][:500] if rag.chunks else "No chunks")

GRADIO CHAT OVERLAY

In [None]:
import os
import gradio as gr

def read_text_file_to_string(file_path: str) -> str:
    """
    Return the contents of the text file at ``file_path``.

    The file is decoded as UTF-8 with undecodable bytes replaced. A falsy
    path (``None`` or ``""``) yields an empty string.
    """
    if file_path:
        with open(file_path, "r", encoding="utf-8", errors="replace") as handle:
            contents = handle.read()
        return contents
    return ""

def _gradio_story_eval(story_text: str, mission: str = ""):
    """
    Runs an Impact-Studios-style evaluation, but grounded in your RAG index
    when available.
    """
    if not story_text.strip():
        return "‚ö†Ô∏è Please enter a valid story, concept, or idea."

    # --- Build the creative directive prompt ---
    base_prompt = """
    You are an editorial advisor focused on community engagement and ethical publication. Read the following piece of writing
    and identify the communities, audiences, or individuals who may be directly affected by its themes.Then provide specific,
    actionable suggestions for how the author can responsibly and meaningfully reach out to or
    support those communities through or alongside the text.

    Your feedback must:
    -Be concrete (e.g., exact additions, placements, wording ideas, or resources to include).
    -Explain why each suggestion is appropriate for the content.
    -Address ethical considerations such as harm reduction, accessibility, and care for vulnerable readers when relevant.

    Avoid generic advice like ‚Äúbe sensitive‚Äù or ‚Äúraise awareness.‚Äù

    When recommending a resource, make sure to provide a way to access it such as a phone number or website.

    Do not suggest any changes to the origional content like "consider altering the ending" or "elaborate on this".

    When users share a script, concept, or idea, your job is to:
    1. Analyze the submission‚Äôs tone, themes, and emotional depth.
    2. Determine the topics that are discussed in the submission.
    3. Provide clear reasoning for your evaluation.
    4. Provide specific, actionable suggestions for how the author can responsibly and meaningfully reach out to or support those communities through or alongside the text.
    5. If any information is provided, integrate it naturally into your evaluation.

    Respond in this format:

    Topics identified:
    (List of topics identified in the submission. Each topic should be listed with at least one quote from the submission that exemplifies the listed topic and at least one specific, actionable recommendations for outreach and support actions)
    """

    # --- Combine with the optional mission ---
    if mission and mission.strip():
        user_prompt = f"{base_prompt}\n\nCurrent Studio Mission:\n{mission}\n\nUser Submission:\n{story_text}"
    else:
        user_prompt = f"{base_prompt}\n\nUser Submission:\n{story_text}"

    print("Using backend:", GEN_BACKEND)

    # --- If you have RAG docs, retrieve and inject context ---
    context_text = ""
    if "rag" in globals() and getattr(rag, "chunks", []):
        hits = rag.search(story_text, k=TOP_K)
        context_text = render_context(hits)
        print(f"Injected {len(hits)} RAG chunks as grounding context.")
    else:
        print("No RAG index loaded ‚Äî using prompt only.")

    # --- Build full combined prompt for Gemini/OpenAI/local ---
    full_prompt = (
        ANSWER_TEMPLATE.format(
            system=SYSTEM_PROMPT,
            context=context_text or "[No extra context]",
            question=user_prompt,
        )
    )

    out = answer_question(full_prompt)
    answer = out["answer"]

    # --- Append citation summary if RAG used ---
    cites = []
    if context_text:
        for rank, (_, ch) in enumerate(out["top_chunks"], start=1):
            cites.append(f"Chunk {rank} ‚Äî {os.path.basename(ch['source'])}#{ch['chunk_id']}")
    suffix = ("\n\n---\nSources: " + "; ".join(cites)) if cites else ""

    return (answer or "[Empty answer]") + suffix


# Page-wide background colour for the evaluator UI.
CUSTOM_CSS = """
body, .gradio-container {
    background-color: rgb(106, 179, 81) !important;
}
"""

# === Gradio UI ===
with gr.Blocks(css=CUSTOM_CSS) as demo:
    gr.Markdown("## 🎬 Impact Studios RAG Evaluator")
    gr.Markdown(
        "Analyze stories or concepts to see how well they uplift humanity — "
        "grounded in your uploaded reference docs if available."
    )

    with gr.Row():
        # LEFT COLUMN — uploader
        with gr.Column(scale=4):
            uploaded_txt = gr.File(
                label="Script Upload",
                file_types=[".txt"],
                type="filepath",
                height=160,
            )

        # RIGHT COLUMN — input
        with gr.Column(scale=4):
            story = gr.Textbox(
                label="Enter your story, script, or concept",
                placeholder="Example: A young girl follows a rabbit into a strange world that challenges her perception of reality...",
                lines=8,
            )

    output = gr.Textbox(label="Impact Evaluation", lines=12)
    btn = gr.Button("Evaluate")

    # Uploading (or clearing) a file copies its text into the story box.
    uploaded_txt.change(
        fn=read_text_file_to_string,
        inputs=uploaded_txt,
        outputs=story,
    )

    # Event listeners must be registered inside the Blocks context, so this is
    # the only place the button is wired; a second btn.click after launch()
    # would run outside the context.
    btn.click(_gradio_story_eval, inputs=[story], outputs=output)

print("Launching Impact Studios Evaluator…")
demo.launch(share=True, debug=True)


In [None]:
#Use this if first code block fails
import os, gradio as gr

def _gradio_ask(q: str):
    if not q.strip():
        return "Please enter a question."
    print("Using backend:", GEN_BACKEND)  # shows in Colab logs
    out = answer_question(q)
    cites = []
    for rank, (_, ch) in enumerate(out["top_chunks"], start=1):
        cites.append(f"Chunk {rank} ‚Äî {os.path.basename(ch['source'])}#{ch['chunk_id']}")
    suffix = ("\n\n---\nSources: " + "; ".join(cites)) if cites else ""
    return (out["answer"] or "[Empty answer]") + suffix

# Minimal fallback UI: one question box wired to _gradio_ask, answer rendered
# as Markdown. Rebinds the notebook-level names demo, q, a and btn.
with gr.Blocks() as demo:
    gr.Markdown("### RAG Chat ‚Äî ask questions grounded in your uploaded docs")
    q = gr.Textbox(label="Your question")
    a = gr.Markdown(label="Answer")
    btn = gr.Button("Ask")
    # Registered inside the Blocks context.
    btn.click(_gradio_ask, inputs=q, outputs=a)

print("Launching RAG chat‚Ä¶")
demo.launch(share=True, debug=True)