In [None]:
import asyncio
import os
import time
from pathlib import Path
from typing import TypedDict, List, Optional, Literal

import gradio as gr
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeoutError

In [None]:
# Chat model client. Requests go to the OpenRouter endpoint below and are
# authenticated with whatever key is stored in the OPENAI_API_KEY variable.
llm = ChatOpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ.get("OPENAI_API_KEY"),
    model="openai/gpt-oss-20b:free",
)

In [None]:
# System prompt for the agent: clarify vague topics first, then draft a post
# that satisfies the formatting requirements listed below.
# The arrows were previously stored as mis-decoded UTF-8 bytes; they are now
# the intended "→" character so the model receives clean instructions.
SYSTEM_PROMPT = """
You are the LinkedIn Post Agent.
Your goal is to help the user create a LinkedIn post based on the given topic.

Steps:
1. If the user's topic is unclear → ask clarifying questions first.
2. Once enough info is collected → write a LinkedIn post.
3. Post must be:
   - At least 200 words
   - In markdown
   - Engaging, insightful, formatted
   - Include a catchy title
   - Include relevant hashtags at the end
"""


In [None]:
class State(TypedDict, total=True):
    """State dictionary shared by the graph nodes.

    Keys:
        messages: the conversation so far, oldest message first.
    """

    messages: list

async def llm_node(state: State):
    """Graph node: send the conversation to the chat model and record its reply.

    Returns a state update whose ``messages`` entry is a new list made of the
    incoming messages followed by the model's response.
    """
    conversation = state["messages"]
    reply = await llm.ainvoke(conversation)
    updated_conversation = conversation + [reply]
    return {"messages": updated_conversation}



In [None]:
# Default directory for the persistent browser profile, inside the user's home.
PROFILE_DIR = Path.home().joinpath(".linkedin_playwright_profile")

def ensure_profile_dir(path: Path):
    """Make sure ``path`` exists as a directory and return it as a string.

    Missing parent directories are created as well; an already existing
    directory is left untouched.
    """
    path.mkdir(exist_ok=True, parents=True)
    profile_location = str(path)
    return profile_location

def _set_contenteditable_text(page, selector: str, text: str):
    page.eval_on_selector(
        selector,
        """(el, value) => {
            el.focus();
            el.innerText = value;
            el.dispatchEvent(new InputEvent('input', {bubbles: true}));
            el.dispatchEvent(new Event('change', {bubbles: true}));
        }""",
        text,
    )

def _try_selectors(page, selectors, timeout=3000):
    for sel in selectors:
        try:
            page.wait_for_selector(sel, timeout=timeout)
            return sel
        except PWTimeoutError:
            continue
    return None

def _find_attached_selector(page, selectors, timeout=3000):
    """Return the first selector present in the DOM (visible or not), else None.

    ``wait_for_selector`` waits for a *visible* element unless told otherwise.
    File inputs are commonly hidden behind a styled button, so they are looked
    up with ``state="attached"`` instead.
    """
    for sel in selectors:
        try:
            page.wait_for_selector(sel, state="attached", timeout=timeout)
            return sel
        except PWTimeoutError:
            continue
    return None


def _attach_files(page, kind: str, paths: List[str], settle_seconds: float) -> bool:
    """Hand ``paths`` to the first matching file input; return True if one was found.

    ``kind`` is the media family ("image" or "video") used in the ``accept``
    attribute filters; a generic file input is the last fallback.
    """
    selector = _find_attached_selector(page, [
        f"input[type='file'][accept*='{kind}']",
        f"input[type='file'][accept*='{kind}/*']",
        "input[type='file']",
    ], timeout=4000)
    if not selector:
        # Best effort, as before: missing upload controls do not abort the post.
        return False
    page.set_input_files(selector, paths)
    time.sleep(settle_seconds)  # fixed pause after handing over the files
    return True


def post_to_linkedin(
    post_text: str,
    image_paths: Optional[List[str]] = None,
    video_paths: Optional[List[str]] = None,
    profile_dir: Optional[str] = None,
    headless: bool = False,
    wait_for_manual_login: bool = False,
    navigation_timeout: int = 15000,
) -> dict:
    """Publish a post on LinkedIn by driving Chromium with a persistent profile.

    Args:
        post_text: Text written into the post editor.
        image_paths: Optional image files to attach.
        video_paths: Optional video files to attach.
        profile_dir: Browser profile directory; defaults to ``PROFILE_DIR``.
        headless: Launch the browser without a visible window.
        wait_for_manual_login: When True, only open LinkedIn, wait until the
            user presses Enter after logging in, and return without posting.
        navigation_timeout: Default Playwright timeout for the page (ms).

    Returns:
        ``{"success": bool, "message": str}``. Errors raised while driving the
        page are reported through this dict instead of propagating.
    """
    profile_dir = ensure_profile_dir(Path(profile_dir or PROFILE_DIR))
    image_paths = image_paths or []
    video_paths = video_paths or []

    # Refuse to open a browser when there is nothing at all to publish.
    if (
        not wait_for_manual_login
        and not (post_text or "").strip()
        and not image_paths
        and not video_paths
    ):
        return {"success": False, "message": "Nothing to post: post_text is empty."}

    with sync_playwright() as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=profile_dir,
            headless=headless,
            args=["--disable-dev-shm-usage"],
        )
        try:
            page = context.new_page()
            page.set_default_timeout(navigation_timeout)

            # "networkidle" is discouraged by Playwright and may never be
            # reached on pages that keep connections open; the explicit
            # selector waits below decide when the page is ready.
            if wait_for_manual_login:
                page.goto("https://www.linkedin.com", wait_until="domcontentloaded")
                print("Please log in to LinkedIn in the opened browser window.")
                input("Press Enter after login is complete...")
                return {"success": True, "message": "Manual login completed."}

            page.goto("https://www.linkedin.com/feed/", wait_until="domcontentloaded")

            # Start post
            start_sel = _try_selectors(page, [
                "button[aria-label='Start a post']",
                "div.share-box__open",
                "button.share-box__open",
                "button[aria-label='Create a post']"
            ], timeout=8000)
            if not start_sel:
                return {"success": False, "message": "Cannot find 'Start a post' button."}
            page.click(start_sel)

            # Post textbox
            textbox_selector = _try_selectors(page, [
                "div[role='textbox']",
                "div.editor-content[contenteditable='true']"
            ], timeout=8000)
            if not textbox_selector:
                return {"success": False, "message": "Cannot find post textbox."}
            _set_contenteditable_text(page, textbox_selector, post_text)

            # Upload media (images first, then videos)
            if image_paths:
                _attach_files(page, "image", image_paths, settle_seconds=1.5)
            if video_paths:
                _attach_files(page, "video", video_paths, settle_seconds=2.5)

            # Post button
            post_btn = _try_selectors(page, [
                "button[aria-label='Post']",
                "button[aria-label='Share']",
                "button[data-control-name='share.post']",
                "button[data-test-post-button]",
                "div.share-box-actions button"
            ], timeout=8000)
            if not post_btn:
                return {"success": False, "message": "Cannot find Post button."}
            page.click(post_btn)
            time.sleep(2)
            return {"success": True, "message": "Post submitted."}

        except Exception as e:
            return {"success": False, "message": f"Exception: {e}"}
        finally:
            # Single cleanup point for every exit path. A failure to close must
            # not mask the result computed above, so it is only reported.
            try:
                context.close()
            except Exception as close_error:
                print(f"Warning: could not close browser context: {close_error}")


In [None]:
# ----------------------
async def linkedin_post_tool(state: State, post_text: Optional[str] = None, image_paths=None, video_paths=None, do_post=False):
    """Graph node that optionally publishes ``post_text`` to LinkedIn.

    Args:
        state: Current graph state; its ``messages`` list is never mutated.
        post_text: Text to publish. Optional so the function can be invoked
            with the state alone, which is how a graph node is called.
        image_paths: Optional image files forwarded to ``post_to_linkedin``.
        video_paths: Optional video files forwarded to ``post_to_linkedin``.
        do_post: Safety switch; nothing is published unless it is True.

    Returns:
        A state update whose ``messages`` is a new list ending with a message
        object (exposing ``.content``) that describes the outcome.
    """
    history = list(state["messages"])

    if not do_post:
        return {"messages": history + [AIMessage(content="Post skipped (do_post=False).")]}
    if not post_text:
        return {"messages": history + [AIMessage(content="Post skipped (no post text provided).")]}

    # post_to_linkedin uses Playwright's sync API, which refuses to start
    # inside a running asyncio loop; run it in a worker thread instead. This
    # also keeps the event loop free while the browser is being driven.
    result = await asyncio.to_thread(
        post_to_linkedin,
        post_text=post_text,
        image_paths=image_paths,
        video_paths=video_paths,
        wait_for_manual_login=False,
    )
    outcome = AIMessage(content=f"LinkedIn posting result: {result['message']}")
    return {"messages": history + [outcome]}


In [None]:
# Graph layout: "llm" is the entry node; after it runs, route_to_post decides
# whether to continue to "linkedin_post" or stop. "linkedin_post" always ends.
builder = StateGraph(State)
builder.add_node("llm", llm_node)
builder.add_node("linkedin_post", linkedin_post_tool)

def route_to_post(state: State) -> Literal["linkedin_post", END]:
    """Pick the node that follows "llm".

    Only the most recent human message is inspected. Scanning the whole
    history would also match the model's own output and approvals from
    earlier turns that are replayed with every request.

    Returns:
        "linkedin_post" when that message contains APPROVE_POST, otherwise END.
    """
    for msg in reversed(state["messages"]):
        if isinstance(msg, HumanMessage):
            return "linkedin_post" if "APPROVE_POST" in msg.content else END
    return END

builder.set_entry_point("llm")
builder.add_conditional_edges("llm", route_to_post)
builder.add_edge("linkedin_post", END)

graph = builder.compile()


In [None]:
from typing import Literal

# NOTE: this rebinds the name route_to_post after `graph` has been compiled.
# The compiled graph was given the earlier function object, so this
# definition only affects code that looks the name up afterwards.
def route_to_post(state: State):
    """Return "linkedin_post" if the latest human message approves posting, else END.

    Only the most recent human message is checked, so text produced by the
    model or approvals given in earlier turns cannot trigger a post.
    """
    for msg in reversed(state["messages"]):
        if isinstance(msg, HumanMessage):
            # Message objects expose their text through the `content` attribute.
            return "linkedin_post" if "APPROVE_POST" in msg.content else END
    return END




In [None]:
# Gradio front end: a chat panel with an image upload on the left and a
# preview of the uploaded image on the right.
with gr.Blocks(title="LinkedIn Post Agent") as ui:
    gr.Markdown("## 📢 LinkedIn Post Agent\nChat with the AI and upload content for your post.")

    with gr.Row():
        # LEFT PANEL
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation", height=450)
            user_input = gr.Textbox(placeholder="Type topic/feedback...", label="Your Message")
            image_file = gr.File(label="Upload Image", file_types=["image"])
            submit_btn = gr.Button("Send Message", variant="primary")
            done_btn = gr.Button("Done (Preview Upload)", variant="secondary")
        # RIGHT PANEL
        with gr.Column(scale=1):
            gr.Markdown("### 📎 Uploaded Media Preview")
            image_preview = gr.Image(label="Image Preview", height=250)

    # -----------------------------
    # Chat + AI handler
    # -----------------------------
    def on_user_message(user_message, history, img_file):
        """Run one chat turn through the graph and append it to the history.

        Args:
            user_message: Text typed by the user.
            history: Chat history as (user, assistant) pairs; may be None.
            img_file: Currently uploaded file, passed through to the preview.

        Returns:
            The updated history and the value for the image preview.
        """
        history = list(history or [])
        if not (user_message or "").strip():
            # Nothing to send; leave the conversation untouched.
            return history, img_file

        # Only the user's side of earlier turns is replayed to the model.
        # NOTE(review): assistant replies are not included — confirm intended.
        messages = [SystemMessage(content=SYSTEM_PROMPT)]
        messages.extend(HumanMessage(content=turn[0]) for turn in history if turn[0])
        messages.append(HumanMessage(content=user_message))

        result = asyncio.run(graph.ainvoke({"messages": messages}))

        # The last entry is normally a message object, but tolerate a plain
        # dict carrying a "content" key as well.
        last_message = result["messages"][-1]
        if isinstance(last_message, dict):
            ai_response = last_message.get("content", "")
        else:
            ai_response = last_message.content

        history.append((user_message, ai_response))
        return history, img_file

    submit_btn.click(
        on_user_message,
        inputs=[user_input, chatbot, image_file],
        outputs=[chatbot, image_preview]
    )

    # -----------------------------
    # File preview
    # -----------------------------
    def process_uploaded_files(img_file):
        """Return the filesystem path of the uploaded file, or None if absent."""
        # The upload may be an object with a `.name` path or already a path
        # string — presumably depends on the Gradio version; handle both.
        path = getattr(img_file, "name", img_file) if img_file else None
        print("Image path:", path)
        return path

    def preview_files(img_file):
        """Resolve the uploaded file to a path and show it in the preview."""
        return process_uploaded_files(img_file)

    done_btn.click(preview_files, inputs=[image_file], outputs=[image_preview])

# Launch UI
ui.launch()
