<a href="https://colab.research.google.com/github/salimaliraja/salimaliraja/blob/main/SalimAliraja_without_streamlit04112025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
# Class work:
#Base yourself on the multi-agent design principles to build a multi-agent linked-in blog writing system.

#The components/sub-agents will be:

#Web search
#Blog writer (that writes based on web search)
#SEO review and improvement
#You can use a different multi-agent design pattern. Remeber to use context to save data for intermittent steps in the workflow.

!pip install streamlit
import streamlit as st
import sqlite3
import json
import time
from typing import List, Dict, Any, Optional
import requests
import os

# Optional imports for LLMs
try:
    import openai
except Exception:
    openai = None

try:
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
except Exception:
    pipeline = None

# ---------------------------
# Simple persistent context (SQLite)
# ---------------------------
DB_PATH = "rag_agents_context.db"

def init_db():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute(
        """CREATE TABLE IF NOT EXISTS context (
               id TEXT PRIMARY KEY,
               payload TEXT,
               updated_at REAL
           )"""
    )
    conn.commit()
    conn.close()

def save_context(key: str, payload: Dict[str, Any]):
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute(
        "REPLACE INTO context (id, payload, updated_at) VALUES (?, ?, ?)",
        (key, json.dumps(payload), time.time()),
    )
    conn.commit()
    conn.close()

def load_context(key: str) -> Optional[Dict[str, Any]]:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("SELECT payload FROM context WHERE id = ?", (key,))
    row = cur.fetchone()
    conn.close()
    if row:
        return json.loads(row[0])
    return None

# ---------------------------
# Agent base class
# ---------------------------
class Agent:
    def __init__(self, name: str):
        self.name = name

    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        """
        Executes agent's task, updates ctx and returns it.
        Must be overridden.
        """
        raise NotImplementedError

# ---------------------------
# Web Search Agent
# ---------------------------

class WebSearchAgent(Agent):
    """
    WebSearchAgent supports:
      - SerpAPI (if serpapi_key provided)
      - Bing Web Search (if bing_api_key provided)
      - Manual fallback (user-pasted results)
    """
    def __init__(self, serpapi_key: Optional[str]=None, bing_api_key: Optional[str]=None):
        super().__init__("web_search")
        self.serpapi_key = serpapi_key
        self.bing_api_key = bing_api_key

    def serpapi_search(self, query:str, num:int=5) -> List[Dict[str,str]]:
        url = "https://serpapi.com/search.json"
        params = {"q": query, "engine": "google", "num": num, "api_key": self.serpapi_key}
        resp = requests.get(url, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        results = []
        for r in data.get("organic_results", [])[:num]:
            results.append({"title": r.get("title"), "snippet": r.get("snippet"), "link": r.get("link")})
        return results

    def bing_search(self, query:str, num:int=5) -> List[Dict[str,str]]:
        # Uses Bing Web Search v7 endpoint (requires API key and correct endpoint)
        subscription_key = self.bing_api_key
        search_url = "https://api.bing.microsoft.com/v7.0/search"
        headers = {"Ocp-Apim-Subscription-Key": subscription_key}
        params = {"q": query, "count": num}
        resp = requests.get(search_url, headers=headers, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        results = []
        for r in data.get("webPages", {}).get("value", [])[:num]:
            results.append({"title": r.get("name"), "snippet": r.get("snippet"), "link": r.get("url")})
        return results

    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        query = ctx.get("topic", "")
        num = ctx.get("search_num", 5)
        search_results = []
        try:
            if self.serpapi_key:
                search_results = self.serpapi_search(query, num)
            elif self.bing_api_key:
                search_results = self.bing_search(query, num)
            else:
                # No API keys - expect manual input in ctx["manual_search"]
                search_results = ctx.get("manual_search", [])
        except Exception as e:
            ctx.setdefault("logs", []).append(f"WebSearchAgent error: {e}")
            search_results = ctx.get("manual_search", [])

        ctx["search_results"] = search_results
        ctx.setdefault("logs", []).append(f"WebSearchAgent: retrieved {len(search_results)} items.")
        save_context("current_job", ctx)
        return ctx

# ---------------------------
# Blog Writer Agent
# ---------------------------
class BlogWriterAgent(Agent):
    def __init__(self, llm_backend: str="openai", openai_key: Optional[str]=None, hf_model: Optional[str]=None):
        super().__init__("blog_writer")
        self.llm_backend = llm_backend
        self.openai_key = openai_key
        self.hf_model = hf_model
        if self.llm_backend == "openai" and openai and openai_key:
            openai.api_key = openai_key

    def _prompt_from_search(self, topic:str, results:List[Dict[str,str]]) -> str:
        snippets = "\n\n".join([f"- {r.get('title','')} : {r.get('snippet','')}\nLink: {r.get('link','')}" for r in results])
        prompt = (
            f"Write a LinkedIn-style blog post (~500-800 words) about: {topic}.\n\n"
            "Use the following sources and weave them into a clear, engaging narrative with practical takeaways, headings, and a short conclusion.\n\n"
            f"Sources:\n{snippets}\n\n"
            "Style: professional, friendly, slightly conversational. Include an engaging hook, 3-5 short sections (with headings), "
            "practical tips, and a 1-2 line call-to-action at the end. Keep sentences concise for LinkedIn readers."
        )
        return prompt

    def _generate_openai(self, prompt:str, max_tokens:int=700) -> str:
        if not openai:
            raise RuntimeError("OpenAI package not installed.")
        resp = openai.Completion.create(
            engine="text-davinci-003",
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            n=1,
            stop=None
        )
        text = resp.choices[0].text.strip()
        return text

    def _generate_hf(self, prompt:str, max_new_tokens:int=700) -> str:
        if pipeline is None:
            raise RuntimeError("Transformers not available.")
        # Lightweight pipeline (you can change model)
        gen = pipeline("text-generation", model=self.hf_model or "gpt2", device=0 if os.environ.get("CUDA_VISIBLE_DEVICES") else -1)
        out = gen(prompt, max_length=len(prompt.split()) + max_new_tokens, do_sample=True, top_p=0.9, temperature=0.7, num_return_sequences=1)
        return out[0]["generated_text"][len(prompt):].strip() if isinstance(out[0]["generated_text"], str) else str(out)

    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        topic = ctx.get("topic", "Untitled")
        results = ctx.get("search_results", [])
        prompt = self._prompt_from_search(topic, results)
        ctx["blog_prompt"] = prompt
        try:
            if self.llm_backend == "openai" and self.openai_key:
                blog = self._generate_openai(prompt)
            else:
                blog = self._generate_hf(prompt)
        except Exception as e:
            ctx.setdefault("logs", []).append(f"BlogWriterAgent error: {e}")
            blog = ctx.get("manual_draft", "## Draft\n\n(Provide manual draft here)")
        ctx["draft"] = blog
        ctx.setdefault("logs", []).append("BlogWriterAgent: draft generated.")
        save_context("current_job", ctx)
        return ctx

# ---------------------------
# SEO Reviewer / Improver Agent
# ---------------------------
class SEOAgent(Agent):
    def __init__(self, llm_backend: str="openai", openai_key:Optional[str]=None, hf_model:Optional[str]=None):
        super().__init__("seo_reviewer")
        self.llm_backend = llm_backend
        self.openai_key = openai_key
        self.hf_model = hf_model
        if self.llm_backend == "openai" and openai and openai_key:
            openai.api_key = openai_key

    def _seo_prompt(self, draft:str, topic:str, keywords:List[str]=None) -> str:
        keys = ", ".join(keywords) if keywords else "None"
        prompt = (
            "You are an SEO expert and copywriter. Improve the following LinkedIn blog post for SEO and readability.\n\n"
            f"Topic: {topic}\n"
            f"Target keywords: {keys}\n\n"
            "Tasks:\n"
            "1) Suggest a catchy title (<= 12 words).\n"
            "2) Provide 5 meta description options (max 155 chars each).\n"
            "3) Propose 8 short, relevant hashtags.\n"
            "4) Provide an improved blog draft focusing on clarity, headings, and keyword inclusion. Keep LinkedIn tone.\n\n"
            "Original draft:\n" + draft + "\n\n"
            "Output in JSON with keys: title, meta_descriptions (list), hashtags (list), improved_draft."
        )
        return prompt

    def _call_llm(self, prompt:str) -> str:
        # Use same OpenAI/text-davinci-003 pattern; otherwise HF
        if self.llm_backend == "openai" and self.openai_key:
            if not openai:
                raise RuntimeError("OpenAI package missing.")
            resp = openai.Completion.create(engine="text-davinci-003", prompt=prompt, max_tokens=800, temperature=0.6)
            return resp.choices[0].text.strip()
        else:
            if pipeline is None:
                raise RuntimeError("Transformers not available.")
            gen = pipeline("text-generation", model=self.hf_model or "gpt2", device=0 if os.environ.get("CUDA_VISIBLE_DEVICES") else -1)
            out = gen(prompt, max_length=len(prompt.split()) + 800, do_sample=True, top_p=0.9, temperature=0.6, num_return_sequences=1)
            return out[0]["generated_text"]

    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        draft = ctx.get("draft", "")
        topic = ctx.get("topic", "")
        keywords = ctx.get("keywords", [])
        prompt = self._seo_prompt(draft, topic, keywords)
        ctx["seo_prompt"] = prompt
        try:
            raw = self._call_llm(prompt)
            # Try to parse JSON; if parse fails, attempt to find JSON in text
            parsed = None
            try:
                parsed = json.loads(raw)
            except Exception:
                # Try to extract first {...} block
                start = raw.find("{")
                end = raw.rfind("}") + 1
                if start != -1 and end != -1:
                    try:
                        parsed = json.loads(raw[start:end])
                    except Exception:
                        parsed = None
            if parsed:
                ctx["seo"] = parsed
                ctx.setdefault("logs", []).append("SEOAgent: parsed JSON output.")
            else:
                ctx.setdefault("logs", []).append("SEOAgent: couldn't parse JSON; saving raw output.")
                ctx["seo_raw"] = raw
        except Exception as e:
            ctx.setdefault("logs", []).append(f"SEOAgent error: {e}")
        save_context("current_job", ctx)
        return ctx

# ---------------------------
# Orchestrator
# ---------------------------
class Orchestrator:
    def __init__(self, agents: List[Agent]):
        self.agents = agents

    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        for agent in self.agents:
            st.session_state["last_agent"] = agent.name
            st.experimental_rerun() if False else None  # no-op but shows point of extension
            ctx = agent.run(ctx)
        return ctx

# ---------------------------
# Streamlit UI
# ---------------------------
st.set_page_config(page_title="Multi-agent LinkedIn Blog Writer", layout="wide")
st.title("Multi-Agent LinkedIn Blog Writer â€” Prototype")

init_db()

with st.sidebar:
    st.header("Job Config")
    topic = st.text_input("Blog topic / prompt", value="How to build resilient teams in hybrid work")
    search_num = st.number_input("Search results to retrieve", min_value=1, max_value=20, value=5)
    st.markdown("**Web Search API keys (optional)**")
    serpapi_key = st.text_input("SerpAPI key", value="", type="password")
    bing_api_key = st.text_input("Bing API key", value="", type="password")
    st.markdown("**LLM settings**")
    llm_provider = st.selectbox("LLM backend", options=["openai", "huggingface"], index=0)
    openai_key = st.text_input("OpenAI API key", value="", type="password") if llm_provider=="openai" else ""
    hf_model = st.text_input("HF model id (if huggingface)", value="gpt2") if llm_provider=="huggingface" else ""
    st.markdown("**Optional SEO keywords (comma separated)**")
    keywords_text = st.text_input("Keywords", value="resilience,hybrid teams,leadership")
    st.markdown("---")
    st.button("Save config", key="save_conf")

# read manual search or manual draft from UI
st.header("Inputs & Manual Overrides")
col1, col2 = st.columns([1,1])
with col1:
    st.subheader("Manual search results (optional)")
    manual_search_json = st.text_area("Paste JSON list of {title,snippet,link} or leave blank", height=140)
    manual_search = []
    if manual_search_json.strip():
        try:
            manual_search = json.loads(manual_search_json)
        except Exception as e:
            st.error("Manual search JSON parse error: " + str(e))
with col2:
    st.subheader("Manual draft (optional fallback)")
    manual_draft = st.text_area("Paste a manual draft to use if LLM fails", height=140)

# Job control
if "current_job" not in st.session_state:
    st.session_state.current_job = {"topic": topic, "search_num": search_num, "keywords": [k.strip() for k in keywords_text.split(",") if k.strip()]}
else:
    # update topic/search_num/keywords when user changes
    st.session_state.current_job.update({"topic": topic, "search_num": search_num, "keywords": [k.strip() for k in keywords_text.split(",") if k.strip()]})

ctx = st.session_state.current_job
if manual_search:
    ctx["manual_search"] = manual_search
if manual_draft:
    ctx["manual_draft"] = manual_draft

# Show saved context if exists
st.subheader("Current context (persisted)")
saved = load_context("current_job")
st.write(saved if saved else ctx)

# Agent instantiation
web_agent = WebSearchAgent(serpapi_key=serpapi_key or None, bing_api_key=bing_api_key or None)
blog_agent = BlogWriterAgent(llm_backend=llm_provider, openai_key=openai_key or None, hf_model=hf_model or None)
seo_agent = SEOAgent(llm_backend=llm_provider, openai_key=openai_key or None, hf_model=hf_model or None)

orch = Orchestrator([web_agent, blog_agent, seo_agent])

st.markdown("---")
if st.button("Run full pipeline"):
    with st.spinner("Running multi-agent pipeline..."):
        ctx = dict(ctx)  # copy
        ctx = orch.run(ctx)
        st.success("Pipeline finished.")
        save_context("current_job", ctx)
        st.session_state.current_job = ctx

if st.button("Run only web search"):
    with st.spinner("Running web search..."):
        ctx = web_agent.run(ctx)
        st.success("Web search finished.")
        save_context("current_job", ctx)
        st.session_state.current_job = ctx

if st.button("Run blog writer"):
    with st.spinner("Running blog writer..."):
        ctx = blog_agent.run(ctx)
        st.success("Blog writer finished.")
        save_context("current_job", ctx)
        st.session_state.current_job = ctx

if st.button("Run SEO reviewer"):
    with st.spinner("Running SEO reviewer..."):
        ctx = seo_agent.run(ctx)
        st.success("SEO reviewer finished.")
        save_context("current_job", ctx)
        st.session_state.current_job = ctx

# Results display
st.markdown("---")
st.header("Results")
if ctx.get("search_results"):
    st.subheader("Search results")
    for i, r in enumerate(ctx["search_results"], 1):
        st.markdown(f"**{i}. {r.get('title')}**")
        st.write(r.get("snippet"))
        st.write(r.get("link"))

if ctx.get("blog_prompt"):
    st.subheader("Blog prompt used")
    st.code(ctx["blog_prompt"])

if ctx.get("draft"):
    st.subheader("Generated Draft")
    st.write(ctx["draft"])

if ctx.get("seo"):
    st.subheader("SEO Output (parsed)")
    st.write(ctx["seo"])
elif ctx.get("seo_raw"):
    st.subheader("SEO Output (raw)")
    st.write(ctx["seo_raw"])

if ctx.get("logs"):
    st.subheader("Logs")
    for log in ctx["logs"]:
        st.write("-", log)

st.markdown("---")
st.write("Persisted context will be stored in `rag_agents_context.db`. You can extend the agents, replace the LLM and retriever modules, or connect a vector store for better retrieval.")




