diff --git a/README.md b/README.md
index ea839f1..b4038ac 100644
--- a/README.md
+++ b/README.md
@@ -84,6 +84,31 @@
---
+
+
+
π News
+We have published RunAgent Pulse
+
+
+
+
+- **[2025.12] π― [New Product]** Published **RunAgent Pulse β Scheduling & Orchestration**, a self-hosted βGoogle Calendar for your AI agentsβ.
+- **[2025.12] π― [Integration]** Integrated the **PaperFlow** arXiv research agent with **RunAgent Serverless** and **RunAgent Pulse** for end-to-end scheduled arXiv monitoring and email notifications.
+
+## What is RunAgent-Pulse?
+
+### RunAgent Pulse is a Google Calendar for Your AI Agents
+
+A lightweight, **self-hosted scheduling service** designed for AI agents and developers.
+Schedule agent executions with **second-level precision**, **natural language scheduling**, and **seamless integration with RunAgent Serverless**.
+Weβve unveiled this as a companion project:
+
+- GitHub: [RunAgent-Pulse](https://github.com/runagent-dev/runagent-pulse)
+
+Use it together with this repo to deploy agents (RunAgent) in our serverless cloud and then **orchestrate/schedule** them (Pulse).
+
+---
+
## What is RunAgent?
RunAgent is an agentic ecosystem that enables developers to build AI agents once in Python using any python agentic frameworks like LangGraph, CrewAI, Letta, LlamaIndex, then access them natively from any programming language. The platform features stateful self-learning capabilities with RunAgent Memory (coming soon), allowing agents to retain context and improve it's action memory over time.
diff --git a/examples/paper-flow-aritra/README.md b/examples/paper-flow-aritra/README.md
new file mode 100644
index 0000000..5ae387e
--- /dev/null
+++ b/examples/paper-flow-aritra/README.md
@@ -0,0 +1,239 @@
+# PaperFlow β ArXiv Paper Monitor (RunAgent Serverless)
+
+PaperFlow is a RunAgent serverless-compatible agent that:
+
+- Monitors **arXiv** for papers matching your research topics
+- Uses **OpenAI** to filter for **highly relevant** papers
+- Tracks already-seen papers in a **persistent cache**
+- Optionally sends you **email digests** of new relevant papers
+- Can be **scheduled** via RunAgent Pulse to run automatically (e.g. daily)
+
+---
+
+## What the agent does
+
+- **Query arXiv** for each topic you provide (`topics`, `max_results`, `days_back`)
+- For each paper:
+ - Filter by **date range**
+ - Run an **LLM relevance check** (YES/NO only) via OpenAI (async/parallel)
+ - Maintain a **cache** of relevant paper IDs in `paper_cache/relevant_papers.txt`
+- **Email notifications**:
+ - When new relevant papers are found (not previously in the cache)
+ - Sends you a nicely formatted email (title, date, link per paper)
+
+The agent returns a result dict like:
+
+- `status`
+- `total_processed`
+- `total_relevant`
+- `new_papers`
+- `cached_hits`
+- `llm_calls`
+- `email_sent`
+- `papers` (list of formatted strings)
+
+---
+
+## Local testing (without RunAgent serverless)
+
+From this folder:
+
+```bash
+cd /home/azureuser/runagent/examples/paper-flow-aritra
+pip install -r requirements.txt
+
+# Make sure these env vars are set (e.g. via .env or shell)
+export OPENAI_API_KEY=...
+export USER_EMAIL=you@example.com
+export SMTP_USERNAME=your_smtp_user
+export SMTP_PASSWORD=your_smtp_password
+```
+
+Run the built-in test script:
+
+```bash
+python test_agent.py basic # Basic functionality test
+python test_agent.py sdk # Test entrypoints (check_papers, check_papers_custom_topics)
+python test_agent.py openai # Test OpenAI connectivity
+python test_agent.py email # Test email config detection
+python test_agent.py all # Run the full test suite (interactive)
+```
+
+This verifies:
+
+- OpenAI connectivity
+- Email configuration
+- Cache read/write
+- Async entrypoint `check_papers_async`
+
+---
+
+## Deploying to RunAgent Serverless
+
+From the **runagent repo root**:
+
+```bash
+cd /home/azureuser/runagent
+runagent deploy /home/azureuser/runagent/examples/paper-flow-aritra
+```
+
+This will:
+
+- Build the agent bundle from `agent.py` + `runagent.config.json`
+- Upload it to RunAgent Cloud
+- Start a **micro VM** for this agent when executed
+
+The deployed agent id is stored in:
+
+- `.deployments/.json`
+- `runagent.config.json` β `agent_id`
+
+Make sure your `.env` / cloud env includes:
+
+- `OPENAI_API_KEY`
+- `USER_EMAIL`
+- `SMTP_SERVER` / `SMTP_PORT`
+- `SMTP_USERNAME`
+- `SMTP_PASSWORD`
+
+### π§ Gmail SMTP β App Password Setup
+
+If you use **Gmail**, you **cannot** use your normal Gmail password for SMTP.
+You must create a **Gmail App Password**:
+
+1. Open your Google Account: `https://myaccount.google.com`
+2. Go to **Security β 2-Step Verification** and **enable 2FA** (if not already enabled)
+3. After 2FA is enabled, go to **Security β 2-Step Verification β App passwords**
+4. Click **App passwords** (usually at the bottom of the page)
+5. In the dialog:
+ - Select app: **Mail**
+ - Select device: **Other**, type something like `"PaperFlow"`
+ - Click **Generate**
+6. Copy the **16-character** app password (looks like: `xxxx xxxx xxxx xxxx`)
+7. Use this value as:
+ - `SMTP_USERNAME` = your Gmail address (e.g. `you@gmail.com`)
+ - `SMTP_PASSWORD` = the **16-character app password**
+
+Do **not** commit these values to git. Prefer environment variables or a `.env` file mounted into the VM.
+
+---
+
+## Running via Python SDK (`client_test_paperflow.py`)
+
+The agent exposes **only one serverless entrypoint**:
+- **`check_papers_async`** β async + parallel processing (fast, production mode)
+
+Example async/parallel invocation (from `runagent/test_scripts/python/client_test_paperflow.py`):
+
+```python
+from runagent import RunAgentClient
+
+client_async = RunAgentClient(
+ agent_id="62f7a781-71bb-4d62-a68f-24dc4f2bfd0b", # update to your agent_id
+ entrypoint_tag="check_papers_async", # async/parallel entrypoint
+ local=False,
+ user_id="prova4",
+ persistent_memory=True
+)
+
+result = client_async.run(
+ topics=["LLM finetuning"],
+ max_results=20,
+ days_back=100
+)
+
+print(f"Found {result['total_relevant']} relevant papers (async mode)")
+print(f"Email sent: {result['email_sent']}")
+```
+
+Use `local=False` to hit the deployed serverless agent, and always use:
+
+- **`entrypoint_tag="check_papers_async"`**
+
+---
+
+## (Optional) Local streaming experiments
+
+The codebase still contains some streaming helpers and a `client_test_paperflow_stream.py` example, but in **serverless** the supported entrypoint is only `check_papers_async`.
+You can still experiment with streaming locally if you want, but for deployed usage stick to the async entrypoint.
+
+---
+
+## Scheduling with RunAgent Pulse (`test_paperflow.py`)
+
+In `runagent-pulse/examples/test_paperflow.py` youβll find examples of scheduling this agent with **RunAgent Pulse**:
+
+- Configure:
+
+```python
+PULSE_SERVER_URL = "http://localhost:8000" # or your Pulse server
+AGENT_ID = "62f7a781-71bb-4d62-a68f-34dc4f2bfd0b" # the deployed PaperFlow agent id
+
+TOPICS = [
+ "fine-tuning vision language models"
+]
+```
+
+- Daily schedule:
+
+```python
+task = pulse.schedule_agent(
+ agent_id=AGENT_ID,
+ entrypoint_tag="check_papers_async",
+ when="daily at 9am",
+ params={
+ "topics": TOPICS,
+ "max_results": 20,
+ "days_back": 7,
+ "verbose": True,
+ },
+ executor_type="serverless",
+ user_id="paperflow_daily",
+ persistent_memory=True,
+)
+```
+
+- Recurring schedule:
+
+```python
+task = pulse.schedule_agent(
+ agent_id=AGENT_ID,
+ entrypoint_tag="check_papers_async",
+ when="in 3 minute",
+ params={
+ "topics": TOPICS,
+ "max_results": 20,
+ "days_back": 100,
+ "verbose": True,
+ },
+ executor_type="serverless",
+ user_id="paperflow_recurring",
+ persistent_memory=True,
+ repeat={
+ "interval": "10m", # e.g. every 10 minutes
+ "times": 1, # None = infinite
+ },
+)
+```
+
+This lets you:
+
+- Run PaperFlow **daily** as a digest
+- Or run **recurring checks** (e.g. every few hours) for fresh arXiv papers
+
+---
+
+## Summary
+
+- `agent.py` β core logic (arXiv queries, LLM filtering, caching, email)
+- `runagent.config.json` β RunAgent metadata, **async-only** entrypoint, agent id
+- `test_agent.py` β local testing harness (no serverless needed)
+- `client_test_paperflow.py` β Python SDK example (**async only**)
+- `runagent-pulse/examples/test_paperflow.py` β Pulse scheduling examples
+
+Once deployed with `runagent deploy`, you can:
+
+- Call the agent from Python or JS via the SDK using `check_papers_async`
+- Schedule periodic runs via Pulse for continuous paper monitoring
+
+
diff --git a/examples/paper-flow-aritra/agent.py b/examples/paper-flow-aritra/agent.py
new file mode 100644
index 0000000..1a916f4
--- /dev/null
+++ b/examples/paper-flow-aritra/agent.py
@@ -0,0 +1,835 @@
+"""
+PaperFlow Agent - RunAgent Serverless Compatible
+Monitors arXiv for relevant papers and sends email notifications
+Uses OpenAI for LLM filtering
+Supports async/parallel processing and streaming
+"""
+import requests
+import feedparser
+from datetime import datetime, timezone, timedelta
+import time
+import os
+import re
+import smtplib
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from typing import List, Dict, Tuple, Set, Optional, AsyncIterator
+import asyncio
+from openai import OpenAI, AsyncOpenAI
+
+
+class ArxivAgent:
+ """ArXiv paper monitoring agent with OpenAI-based LLM filtering"""
+
+ def __init__(
+ self,
+ topics: List[str],
+ max_results: int = 10,
+ model: str = "gpt-4o-mini",
+ days_back: int = 7,
+ verbose: bool = False,
+ cache_dir: str = "paper_cache"
+ ):
+ self.topics = topics
+ self.max_results = max_results
+ self.model = model
+ self.days_back = days_back
+ self.verbose = verbose
+ self.cache_dir = cache_dir
+
+ # OpenAI configuration
+ self.openai_api_key = os.getenv("OPENAI_API_KEY", "")
+ if not self.openai_api_key:
+ raise ValueError("OPENAI_API_KEY environment variable is required")
+
+ self.client = OpenAI(api_key=self.openai_api_key)
+ self.async_client = AsyncOpenAI(api_key=self.openai_api_key)
+
+ # Email configuration from environment
+ self.user_email = os.getenv("USER_EMAIL", "")
+ self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
+ self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
+ self.smtp_username = os.getenv("SMTP_USERNAME", "")
+ self.smtp_password = os.getenv("SMTP_PASSWORD", "")
+
+ # Cache file path
+ os.makedirs(self.cache_dir, exist_ok=True)
+ self.cache_file = os.path.join(self.cache_dir, "relevant_papers.txt")
+
+ def extract_paper_id(self, entry) -> Optional[str]:
+ """Extract arXiv paper ID from entry"""
+ id_str = getattr(entry, 'id', getattr(entry, 'link', ''))
+ match = re.search(r'/(\d{4}\.\d{4,5})', id_str)
+ if match:
+ return match.group(1)
+ match = re.search(r'(\d{4}\.\d{4,5})', id_str)
+ if match:
+ return match.group(1)
+ return None
+
+ def load_cached_papers(self) -> Set[str]:
+ """Load existing relevant paper IDs from cache"""
+ if not os.path.exists(self.cache_file):
+ if self.verbose:
+ print(f"[DEBUG] Cache file '{self.cache_file}' does not exist, starting with empty cache")
+ return set()
+
+ paper_ids = set()
+ try:
+ with open(self.cache_file, 'r', encoding='utf-8') as f:
+ for line in f:
+ paper_id = line.strip()
+ if paper_id:
+ paper_ids.add(paper_id)
+ if self.verbose:
+ print(f"[DEBUG] Loaded {len(paper_ids)} paper IDs from cache")
+ except Exception as e:
+ print(f"[WARN] Failed to load cache: {e}")
+
+ return paper_ids
+
+ def save_paper_id(self, paper_id: str):
+ """Append a paper ID to cache"""
+ try:
+ with open(self.cache_file, 'a', encoding='utf-8') as f:
+ f.write(f"{paper_id}\n")
+ if self.verbose:
+ print(f"[DEBUG] Saved paper ID '{paper_id}' to cache")
+ except Exception as e:
+ print(f"[WARN] Failed to save paper ID: {e}")
+
+ def query_arxiv(self, keyword: str) -> List:
+ """Query arXiv API for papers"""
+ if self.verbose:
+ print(f"[DEBUG] Querying arXiv for: '{keyword}'")
+
+ try:
+ url = (
+ f"http://export.arxiv.org/api/query?"
+ f"search_query=all:{keyword.replace(' ', '+')}"
+ f"&start=0&max_results={self.max_results}"
+ f"&sortBy=submittedDate&sortOrder=descending"
+ )
+
+ resp = requests.get(url, timeout=15)
+ resp.raise_for_status() # Raise exception for bad status codes
+ feed = feedparser.parse(resp.text)
+
+ if self.verbose:
+ print(f"[DEBUG] Found {len(feed.entries)} entries")
+
+ return feed.entries
+ except requests.RequestException as e:
+ print(f"[WARN] Failed to query arXiv: {e}")
+ if self.verbose:
+ print(f"[DEBUG] Network error details: {type(e).__name__}: {str(e)}")
+ return []
+ except Exception as e:
+ print(f"[WARN] Error parsing arXiv feed: {e}")
+ if self.verbose:
+ print(f"[DEBUG] Parse error details: {type(e).__name__}: {str(e)}")
+ return []
+
+ def parse_llm_response(self, output: str) -> Tuple[bool, str]:
+ """Parse LLM response to determine relevance"""
+ if not output:
+ return False, "(empty)"
+
+ words = output.strip().split()
+ if not words:
+ return False, "(empty)"
+
+ first_word = words[0].strip().upper().rstrip('.,!?:;')
+
+ if first_word == "YES":
+ return True, first_word
+ elif first_word == "NO":
+ return False, first_word
+ else:
+ if self.verbose:
+ print(f"[WARN] Unexpected answer: '{first_word}' - treating as NOT RELEVANT")
+ return False, first_word
+
+ async def llm_filter_async(self, title: str, abstract: str) -> bool:
+ """Use OpenAI to filter papers for relevance (async for parallel processing)"""
+ if self.verbose:
+ print(f"[DEBUG] Filtering paper: '{title[:60]}...'")
+
+ # Create topics list for the prompt
+ topics_text = "\n".join([f"- {topic}" for topic in self.topics])
+
+ prompt = f"""You are filtering academic papers for relevance to specific research topics.
+
+The paper must be DIRECTLY and SUBSTANTIALLY related to one or more of these topics:
+{topics_text}
+
+INCLUDE papers that:
+- Directly address the topic or subtopics
+- Present new methods, techniques, or approaches for the topic
+- Provide experimental results, benchmarks, or evaluations for the topic
+- Survey or review work related to the topic
+
+EXCLUDE papers that:
+- Only mention the topic tangentially or in passing
+- Use the topic as a minor tool but focus on something else
+- Are about completely different fields
+
+Title: {title}
+Abstract: {abstract}
+
+Is this paper DIRECTLY and SUBSTANTIALLY related to the topics listed above?
+
+Respond with ONLY one word: YES or NO. Do not explain."""
+
+ try:
+ if self.verbose:
+ print(f"[DEBUG] Calling OpenAI (async) with model: {self.model}")
+
+ start_time = time.time()
+ response = await self.async_client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "system",
+ "content": "You are a paper relevance filter. Respond with ONLY 'YES' or 'NO'."
+ },
+ {
+ "role": "user",
+ "content": prompt
+ }
+ ],
+ max_tokens=5,
+ temperature=0
+ )
+
+ elapsed = time.time() - start_time
+ output = response.choices[0].message.content.strip()
+
+ if self.verbose:
+ print(f"[DEBUG] OpenAI response in {elapsed:.2f}s: {output}")
+
+ is_relevant, extracted_answer = self.parse_llm_response(output)
+ if self.verbose:
+ print(f"[DEBUG] Decision: {'RELEVANT' if is_relevant else 'NOT RELEVANT'}")
+
+ return is_relevant
+ except Exception as e:
+ print(f"[WARN] OpenAI filter failed: {e}")
+ if self.verbose:
+ print(f"[DEBUG] Exception details: {type(e).__name__}: {str(e)}")
+ return False
+
+ def format_entry(self, entry) -> str:
+ """Format an arXiv entry for display"""
+ # Safely get published date
+ published_date_str = "Unknown"
+ if hasattr(entry, 'published') and entry.published:
+ try:
+ published_date_str = datetime.strptime(
+ entry.published, "%Y-%m-%dT%H:%M:%SZ"
+ ).strftime("%Y-%m-%d")
+ except (ValueError, AttributeError):
+ published_date_str = getattr(entry, 'published', 'Unknown')
+
+ title = getattr(entry, 'title', 'No title').strip()
+ link = getattr(entry, 'link', 'No link')
+
+ return (
+ f"Title: {title}\n"
+ f"Date: {published_date_str}\n"
+ f"Link: {link}\n"
+ f"{'-'*80}\n"
+ )
+
+ def send_email_notification(self, papers_list: List[Tuple]) -> bool:
+ """Send email notification with new relevant papers"""
+ if not papers_list:
+ return False
+
+ if not self.user_email or not self.smtp_username or not self.smtp_password:
+ if self.verbose:
+ print(f"[WARN] Email configuration incomplete. Skipping notification.")
+ return False
+
+ try:
+ msg = MIMEMultipart()
+ msg['From'] = self.smtp_username
+ msg['To'] = self.user_email
+
+ paper_count = len(papers_list)
+ if paper_count == 1:
+ msg['Subject'] = f"New Relevant arXiv Paper: {papers_list[0][0].title.strip()[:60]}"
+ else:
+ msg['Subject'] = f"New Relevant arXiv Papers: {paper_count} papers"
+
+ body = f"""Found {paper_count} new relevant paper{'s' if paper_count > 1 else ''} on arXiv!
+
+"""
+ for entry, paper_id in papers_list:
+ body += self.format_entry(entry)
+
+ msg.attach(MIMEText(body, 'plain'))
+
+ if self.verbose:
+ print(f"[DEBUG] Sending email to {self.user_email}...")
+
+ with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
+ server.starttls()
+ server.login(self.smtp_username, self.smtp_password)
+ server.sendmail(self.smtp_username, self.user_email, msg.as_string())
+
+ if self.verbose:
+ print(f"[INFO] Email sent successfully")
+ return True
+
+ except Exception as e:
+ print(f"[WARN] Failed to send email: {e}")
+ return False
+
+ async def run_async(self) -> Dict:
+ """Main execution logic with async/parallel processing"""
+ try:
+ if self.verbose:
+ print("[DEBUG] Starting arXiv agent (async mode)...")
+
+ cached_paper_ids = self.load_cached_papers()
+
+ now = datetime.now(timezone.utc)
+ week_ago = now - timedelta(days=self.days_back)
+
+ if self.verbose:
+ print(f"[DEBUG] Date range: {week_ago.date()} to {now.date()}")
+
+ relevant_papers = []
+ new_relevant_papers = []
+ total_processed = 0
+ total_llm_calls = 0
+ total_cached_hits = 0
+
+ # Collect all entries from all topics first
+ all_entries = []
+ for idx, topic in enumerate(self.topics, 1):
+ if self.verbose:
+ print(f"\n[INFO] Querying topic {idx}/{len(self.topics)}: '{topic}'")
+
+ entries = self.query_arxiv(topic)
+ all_entries.extend([(entry, topic) for entry in entries])
+ total_processed += len(entries)
+
+ # Filter entries by date and validate
+ valid_entries = []
+ for entry, topic in all_entries:
+ if not hasattr(entry, 'published') or not entry.published:
+ continue
+ if not hasattr(entry, 'title') or not entry.title:
+ continue
+ if not hasattr(entry, 'summary') or not entry.summary:
+ continue
+
+ try:
+ published_date = datetime.strptime(
+ entry.published, "%Y-%m-%dT%H:%M:%SZ"
+ ).replace(tzinfo=timezone.utc)
+ except (ValueError, AttributeError):
+ continue
+
+ if week_ago <= published_date <= now:
+ valid_entries.append((entry, topic, published_date))
+
+ if self.verbose:
+ print(f"\n[INFO] Processing {len(valid_entries)} valid entries in parallel...")
+
+ # Process all entries in parallel with semaphore to limit concurrency
+ semaphore = asyncio.Semaphore(10) # Max 10 concurrent LLM calls
+
+ async def process_entry(entry, topic, published_date):
+ nonlocal total_llm_calls, total_cached_hits
+
+ title = entry.title
+ abstract = entry.summary
+ paper_id = self.extract_paper_id(entry)
+
+ is_relevant = False
+ is_new_paper = False
+
+ if paper_id and paper_id in cached_paper_ids:
+ is_relevant = True
+ total_cached_hits += 1
+ if self.verbose:
+ print(f"[DEBUG] Paper '{paper_id}' in cache, skipping LLM")
+ else:
+ async with semaphore:
+ total_llm_calls += 1
+ is_relevant = await self.llm_filter_async(title, abstract)
+
+ if is_relevant:
+ # Check if it's a new paper BEFORE adding to cache
+ is_new_paper = not paper_id or paper_id not in cached_paper_ids
+
+ if paper_id:
+ cached_paper_ids.add(paper_id)
+ self.save_paper_id(paper_id)
+
+ if is_new_paper:
+ new_relevant_papers.append((entry, paper_id))
+
+ if is_relevant:
+ return (published_date.date(), self.format_entry(entry))
+ return None
+
+ # Run all processing tasks in parallel
+ tasks = [process_entry(entry, topic, pub_date) for entry, topic, pub_date in valid_entries]
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+
+ # Collect valid results
+ for result in results:
+ if result and not isinstance(result, Exception):
+ relevant_papers.append(result)
+
+ # Send email notification for new papers
+ email_sent = False
+ if new_relevant_papers:
+ if self.verbose:
+ print(f"\n[INFO] Sending email for {len(new_relevant_papers)} new papers...")
+ email_sent = self.send_email_notification(new_relevant_papers)
+
+ result = {
+ "status": "success",
+ "total_processed": total_processed,
+ "total_relevant": len(relevant_papers),
+ "new_papers": len(new_relevant_papers),
+ "cached_hits": total_cached_hits,
+ "llm_calls": total_llm_calls,
+ "email_sent": email_sent,
+ "papers": [paper for _, paper in sorted(relevant_papers, reverse=True)]
+ }
+
+ if self.verbose:
+ print(f"\n[INFO] Completed: {result['total_relevant']} relevant papers found")
+
+ return result
+ except Exception as e:
+ error_msg = f"Agent execution failed: {type(e).__name__}: {str(e)}"
+ print(f"[ERROR] {error_msg}")
+ if self.verbose:
+ import traceback
+ traceback.print_exc()
+ return {
+ "status": "error",
+ "error": error_msg,
+ "total_processed": 0,
+ "total_relevant": 0,
+ "new_papers": 0,
+ "cached_hits": 0,
+ "llm_calls": 0,
+ "email_sent": False,
+ "papers": []
+ }
+
+ async def run_stream(self) -> AsyncIterator[Dict]:
+ """Streaming version that yields progress updates in real-time"""
+ try:
+ yield {"type": "status", "message": "Starting arXiv agent (streaming mode)...", "progress": 0}
+
+ cached_paper_ids = self.load_cached_papers()
+
+ now = datetime.now(timezone.utc)
+ week_ago = now - timedelta(days=self.days_back)
+
+ yield {"type": "status", "message": f"Date range: {week_ago.date()} to {now.date()}", "progress": 5}
+
+ relevant_papers = []
+ new_relevant_papers = []
+ total_processed = 0
+ total_llm_calls = 0
+ total_cached_hits = 0
+
+ # Collect all entries from all topics
+ all_entries = []
+ for idx, topic in enumerate(self.topics, 1):
+ yield {
+ "type": "status",
+ "message": f"Querying topic {idx}/{len(self.topics)}: '{topic}'",
+ "progress": 10 + (idx * 10 // len(self.topics))
+ }
+
+ entries = self.query_arxiv(topic)
+ all_entries.extend([(entry, topic) for entry in entries])
+ total_processed += len(entries)
+
+ yield {
+ "type": "status",
+ "message": f"Found {len(entries)} papers for '{topic}'",
+ "progress": 10 + (idx * 10 // len(self.topics))
+ }
+
+ # Filter entries by date and validate
+ valid_entries = []
+ for entry, topic in all_entries:
+ if not hasattr(entry, 'published') or not entry.published:
+ continue
+ if not hasattr(entry, 'title') or not entry.title:
+ continue
+ if not hasattr(entry, 'summary') or not entry.summary:
+ continue
+
+ try:
+ published_date = datetime.strptime(
+ entry.published, "%Y-%m-%dT%H:%M:%SZ"
+ ).replace(tzinfo=timezone.utc)
+ except (ValueError, AttributeError):
+ continue
+
+ if week_ago <= published_date <= now:
+ valid_entries.append((entry, topic, published_date))
+
+ yield {
+ "type": "status",
+ "message": f"Processing {len(valid_entries)} valid entries in parallel...",
+ "progress": 30
+ }
+
+ # Process entries in parallel with progress updates
+ semaphore = asyncio.Semaphore(10)
+ processed_count = 0
+ lock = asyncio.Lock()
+
+ async def process_entry(entry, topic, published_date):
+ nonlocal total_llm_calls, total_cached_hits, processed_count
+
+ title = entry.title
+ abstract = entry.summary
+ paper_id = self.extract_paper_id(entry)
+
+ is_relevant = False
+ is_new_paper = False
+
+ if paper_id and paper_id in cached_paper_ids:
+ is_relevant = True
+ async with lock:
+ total_cached_hits += 1
+ else:
+ async with semaphore:
+ async with lock:
+ total_llm_calls += 1
+ is_relevant = await self.llm_filter_async(title, abstract)
+
+ if is_relevant:
+ # Check if it's a new paper BEFORE adding to cache
+ is_new_paper = not paper_id or paper_id not in cached_paper_ids
+
+ if paper_id:
+ cached_paper_ids.add(paper_id)
+ self.save_paper_id(paper_id)
+
+ if is_new_paper:
+ new_relevant_papers.append((entry, paper_id))
+
+ async with lock:
+ processed_count += 1
+ current_progress = processed_count
+
+ if is_relevant:
+ paper_info = self.format_entry(entry)
+ return {
+ "type": "paper",
+ "paper": paper_info,
+ "is_new": paper_id not in cached_paper_ids if paper_id else True,
+ "progress": 30 + int((current_progress / len(valid_entries)) * 60)
+ }, (published_date.date(), paper_info)
+
+ return {
+ "type": "progress",
+ "message": f"Processed {current_progress}/{len(valid_entries)} papers",
+ "progress": 30 + int((current_progress / len(valid_entries)) * 60)
+ }, None
+
+ # Process in batches to yield progress
+ batch_size = 20
+ for i in range(0, len(valid_entries), batch_size):
+ batch = valid_entries[i:i+batch_size]
+ tasks = [process_entry(entry, topic, pub_date) for entry, topic, pub_date in batch]
+
+ for coro in asyncio.as_completed(tasks):
+ try:
+ update, result = await coro
+ yield update
+ if result:
+ relevant_papers.append(result)
+ except Exception as e:
+ if self.verbose:
+ print(f"[WARN] Error processing entry: {e}")
+ yield {
+ "type": "progress",
+ "message": f"Error processing entry: {str(e)}",
+ "progress": 30 + int((processed_count / len(valid_entries)) * 60)
+ }
+
+ yield {
+ "type": "status",
+ "message": f"Found {len(relevant_papers)} relevant papers",
+ "progress": 90
+ }
+
+ # Send email notification
+ email_sent = False
+ if new_relevant_papers:
+ yield {
+ "type": "status",
+ "message": f"Sending email for {len(new_relevant_papers)} new papers...",
+ "progress": 95
+ }
+ email_sent = self.send_email_notification(new_relevant_papers)
+
+ # Final result
+ yield {
+ "type": "complete",
+ "status": "success",
+ "total_processed": total_processed,
+ "total_relevant": len(relevant_papers),
+ "new_papers": len(new_relevant_papers),
+ "cached_hits": total_cached_hits,
+ "llm_calls": total_llm_calls,
+ "email_sent": email_sent,
+ "papers": [paper for _, paper in sorted(relevant_papers, reverse=True)],
+ "progress": 100
+ }
+ except Exception as e:
+ error_msg = f"Agent execution failed: {type(e).__name__}: {str(e)}"
+ print(f"[ERROR] {error_msg}")
+ yield {
+ "type": "error",
+ "error": error_msg,
+ "status": "error",
+ "total_processed": 0,
+ "total_relevant": 0,
+ "new_papers": 0,
+ "cached_hits": 0,
+ "llm_calls": 0,
+ "email_sent": False,
+ "papers": [],
+ "progress": 0
+ }
+
+
+# ============================================================
+# RunAgent Entrypoints
+# ============================================================
+
+def check_papers(
+ topics: List[str] = None,
+ max_results: int = 10,
+ days_back: int = 7,
+ verbose: bool = True,
+ **kwargs
+) -> Dict:
+ """
+ Main entrypoint for checking arXiv papers
+
+ Args:
+ topics: List of topics to search for (optional, uses defaults if not provided)
+ max_results: Maximum results per topic
+ days_back: How many days back to search
+ verbose: Enable verbose logging
+
+ Returns:
+ Dictionary with results including papers found and email status
+ """
+ try:
+ # Default topics if not provided
+ if topics is None:
+ topics = [
+ "unstructured data analysis",
+ "querying unstructured data",
+ "semi structured data",
+ "text to table",
+ "text to relational schema",
+ ]
+
+ agent = ArxivAgent(
+ topics=topics,
+ max_results=max_results,
+ days_back=days_back,
+ verbose=verbose,
+ cache_dir="paper_cache" # Will be in persistent folder
+ )
+
+ return agent.run()
+ except Exception as e:
+ error_msg = f"Entrypoint execution failed: {type(e).__name__}: {str(e)}"
+ print(f"[ERROR] {error_msg}")
+ return {
+ "status": "error",
+ "error": error_msg,
+ "total_processed": 0,
+ "total_relevant": 0,
+ "new_papers": 0,
+ "cached_hits": 0,
+ "llm_calls": 0,
+ "email_sent": False,
+ "papers": []
+ }
+
+
+def check_papers_custom_topics(
+ topic1: str = "",
+ topic2: str = "",
+ topic3: str = "",
+ topic4: str = "",
+ topic5: str = "",
+ max_results: int = 10,
+ days_back: int = 7,
+ **kwargs
+) -> Dict:
+ """
+ Entrypoint for checking papers with custom topics (easier for SDK calls)
+
+ Args:
+ topic1-5: Individual topic strings (easier than passing lists via SDK)
+ max_results: Maximum results per topic
+ days_back: How many days back to search
+
+ Returns:
+ Dictionary with results
+ """
+ try:
+ topics = [t for t in [topic1, topic2, topic3, topic4, topic5] if t]
+
+ if not topics:
+ topics = ["unstructured data analysis"]
+
+ agent = ArxivAgent(
+ topics=topics,
+ max_results=max_results,
+ days_back=days_back,
+ verbose=True,
+ cache_dir="paper_cache"
+ )
+
+ return agent.run()
+ except Exception as e:
+ error_msg = f"Entrypoint execution failed: {type(e).__name__}: {str(e)}"
+ print(f"[ERROR] {error_msg}")
+ return {
+ "status": "error",
+ "error": error_msg,
+ "total_processed": 0,
+ "total_relevant": 0,
+ "new_papers": 0,
+ "cached_hits": 0,
+ "llm_calls": 0,
+ "email_sent": False,
+ "papers": []
+ }
+
+async def check_papers_async(
+ topics: List[str] = None,
+ max_results: int = 10,
+ days_back: int = 7,
+ verbose: bool = True,
+ **kwargs
+) -> Dict:
+ """
+ Async entrypoint for checking arXiv papers with parallel processing
+
+ Args:
+ topics: List of topics to search for (optional, uses defaults if not provided)
+ max_results: Maximum results per topic
+ days_back: How many days back to search
+ verbose: Enable verbose logging
+
+ Returns:
+ Dictionary with results including papers found and email status
+ """
+ try:
+ # Default topics if not provided
+ if topics is None:
+ topics = [
+ "unstructured data analysis",
+ "querying unstructured data",
+ "semi structured data",
+ "text to table",
+ "text to relational schema",
+ ]
+
+ agent = ArxivAgent(
+ topics=topics,
+ max_results=max_results,
+ days_back=days_back,
+ verbose=verbose,
+ cache_dir="paper_cache"
+ )
+
+ return await agent.run_async()
+ except Exception as e:
+ error_msg = f"Entrypoint execution failed: {type(e).__name__}: {str(e)}"
+ print(f"[ERROR] {error_msg}")
+ return {
+ "status": "error",
+ "error": error_msg,
+ "total_processed": 0,
+ "total_relevant": 0,
+ "new_papers": 0,
+ "cached_hits": 0,
+ "llm_calls": 0,
+ "email_sent": False,
+ "papers": []
+ }
+
+
+async def check_papers_stream(
+ topics: List[str] = None,
+ max_results: int = 10,
+ days_back: int = 7,
+ verbose: bool = True,
+ **kwargs
+) -> AsyncIterator[Dict]:
+ """
+ Streaming entrypoint for checking arXiv papers with real-time progress updates
+
+ Args:
+ topics: List of topics to search for (optional, uses defaults if not provided)
+ max_results: Maximum results per topic
+ days_back: How many days back to search
+ verbose: Enable verbose logging
+
+ Yields:
+ Dictionary updates with type: "status", "paper", "progress", "complete", or "error"
+ """
+ try:
+ # Default topics if not provided
+ if topics is None:
+ topics = [
+ "unstructured data analysis",
+ "querying unstructured data",
+ "semi structured data",
+ "text to table",
+ "text to relational schema",
+ ]
+
+ agent = ArxivAgent(
+ topics=topics,
+ max_results=max_results,
+ days_back=days_back,
+ verbose=verbose,
+ cache_dir="paper_cache"
+ )
+
+ async for update in agent.run_stream():
+ yield update
+ except Exception as e:
+ error_msg = f"Entrypoint execution failed: {type(e).__name__}: {str(e)}"
+ print(f"[ERROR] {error_msg}")
+ yield {
+ "type": "error",
+ "error": error_msg,
+ "status": "error",
+ "total_processed": 0,
+ "total_relevant": 0,
+ "new_papers": 0,
+ "cached_hits": 0,
+ "llm_calls": 0,
+ "email_sent": False,
+ "papers": [],
+ "progress": 0
+ }
diff --git a/examples/paper-flow-aritra/requirements.txt b/examples/paper-flow-aritra/requirements.txt
new file mode 100644
index 0000000..018c235
--- /dev/null
+++ b/examples/paper-flow-aritra/requirements.txt
@@ -0,0 +1,3 @@
+feedparser
+requests
+openai
\ No newline at end of file
diff --git a/examples/paper-flow-aritra/runagent.config.json b/examples/paper-flow-aritra/runagent.config.json
new file mode 100644
index 0000000..cade110
--- /dev/null
+++ b/examples/paper-flow-aritra/runagent.config.json
@@ -0,0 +1,35 @@
+{
+ "agent_name": "your research paper finder",
+ "description": "find related papers from arvix",
+ "framework": "default",
+ "template": "",
+ "version": "1.0.0",
+ "created_at": "2025-12-13T17:17:05.801933",
+ "template_source": {
+ "repo_url": "https://github.com/runagent-dev/runagent.git",
+ "author": "runagent-cli",
+ "path": "/home/azureuser/runagent/examples/paper-flow-aritra"
+ },
+ "agent_architecture": {
+ "entrypoints": [
+ {
+ "file": "agent.py",
+ "module": "check_papers_async",
+ "tag": "check_papers_async"
+ }
+ ]
+ },
+ "env_vars": {
+ "OPENAI_API_KEY": "",
+ "USER_EMAIL": "",
+ "SMTP_SERVER": "smtp.gmail.com",
+ "SMTP_PORT": "587",
+ "SMTP_USERNAME": "",
+ "SMTP_PASSWORD": ""
+ },
+ "agent_id": "62f7a781-71bb-4d62-a68f-34dc4f2bfd0b",
+ "auth_settings": {
+ "type": "api_key"
+ },
+ "persistent_folders": ["paper_cache"]
+}
\ No newline at end of file
diff --git a/examples/paper-flow-aritra/test_agent.py b/examples/paper-flow-aritra/test_agent.py
new file mode 100644
index 0000000..ba4d0a5
--- /dev/null
+++ b/examples/paper-flow-aritra/test_agent.py
@@ -0,0 +1,295 @@
+"""
+Test PaperFlow agent locally before deploying to RunAgent Serverless
+
+This script tests the agent functionality without requiring full deployment.
+"""
+import os
+import sys
+
+# Add current directory to path to import agent
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from agent import ArxivAgent, check_papers, check_papers_custom_topics
+
+
+def test_basic_functionality():
+ """Test basic paper search and filtering"""
+
+ print("=" * 70)
+ print("PaperFlow Local Test")
+ print("=" * 70)
+
+ # Create test agent
+ agent = ArxivAgent(
+ topics=[
+ "neural networks",
+ "transformers"
+ ],
+ max_results=5,
+ days_back=7,
+ verbose=True,
+ cache_dir="test_cache"
+ )
+
+ print("\nπ§ͺ Running test search...")
+ result = agent.run()
+
+ print("\n" + "=" * 70)
+ print("Test Results")
+ print("=" * 70)
+ print(f"Status: {result.get('status', 'unknown')}")
+ print(f"Total processed: {result.get('total_processed', 0)}")
+ print(f"Total relevant: {result.get('total_relevant', 0)}")
+ print(f"New papers: {result.get('new_papers', 0)}")
+ print(f"Cached hits: {result.get('cached_hits', 0)}")
+ print(f"LLM calls: {result.get('llm_calls', 0)}")
+ print(f"Email sent: {result.get('email_sent', False)}")
+
+ if result.get('papers'):
+ print(f"\nπ Found {len(result['papers'])} relevant paper(s):")
+ for i, paper in enumerate(result['papers'][:3], 1): # Show first 3
+ print(f"\n{i}. {paper[:200]}...")
+ else:
+ print("\nπ No relevant papers found")
+
+ if result.get('error'):
+ print(f"\nβ οΈ Error: {result['error']}")
+
+ return result
+
+
+def test_sdk_simulation():
+ """Simulate SDK calls to test entrypoints"""
+
+ print("\n" + "=" * 70)
+ print("SDK Call Simulation Test")
+ print("=" * 70)
+
+ # Test 1: Default entrypoint
+ print("\nπ§ͺ Test 1: check_papers() entrypoint")
+ try:
+ result1 = check_papers(
+ topics=["machine learning"],
+ max_results=3,
+ days_back=7,
+ verbose=True
+ )
+ status = result1.get('status', 'unknown')
+ total = result1.get('total_relevant', 0)
+ print(f"β
Result: {status}, Found {total} papers")
+ if result1.get('error'):
+ print(f" β οΈ Error: {result1['error']}")
+ except Exception as e:
+ print(f"β Test failed: {e}")
+ result1 = None
+
+ # Test 2: Custom topics entrypoint
+ print("\nπ§ͺ Test 2: check_papers_custom_topics() entrypoint")
+ try:
+ result2 = check_papers_custom_topics(
+ topic1="quantum computing",
+ topic2="neural networks",
+ max_results=3,
+ days_back=7
+ )
+ status = result2.get('status', 'unknown')
+ total = result2.get('total_relevant', 0)
+ print(f"β
Result: {status}, Found {total} papers")
+ if result2.get('error'):
+ print(f" β οΈ Error: {result2['error']}")
+ except Exception as e:
+ print(f"β Test failed: {e}")
+ result2 = None
+
+ return result1, result2
+
+
+def test_email_config():
+ """Test email configuration"""
+
+ print("\n" + "=" * 70)
+ print("Email Configuration Test")
+ print("=" * 70)
+
+ user_email = os.getenv("USER_EMAIL", "")
+ smtp_username = os.getenv("SMTP_USERNAME", "")
+ smtp_password = os.getenv("SMTP_PASSWORD", "")
+
+ if user_email and smtp_username and smtp_password:
+ print("β
Email configuration found:")
+ print(f" User: {user_email}")
+ print(f" SMTP: {os.getenv('SMTP_SERVER', 'smtp.gmail.com')}")
+ print(f" Port: {os.getenv('SMTP_PORT', '587')}")
+ print("\nβ οΈ Note: Email will only be sent if new papers are found")
+ else:
+ print("β Email configuration incomplete:")
+ print(f" USER_EMAIL: {'β
set' if user_email else 'β not set'}")
+ print(f" SMTP_USERNAME: {'β
set' if smtp_username else 'β not set'}")
+ print(f" SMTP_PASSWORD: {'β
set' if smtp_password else 'β not set'}")
+ print("\nπ‘ To enable email notifications:")
+ print(" 1. Set environment variables:")
+ print(" export USER_EMAIL='your-email@example.com'")
+ print(" export SMTP_USERNAME='your-email@gmail.com'")
+ print(" export SMTP_PASSWORD='your-app-password'")
+ print(" 2. For Gmail, use an App Password (not your regular password)")
+
+
+def test_openai():
+ """Test OpenAI availability"""
+
+ print("\n" + "=" * 70)
+ print("OpenAI Configuration Test")
+ print("=" * 70)
+
+ openai_api_key = os.getenv("OPENAI_API_KEY", "")
+
+ if openai_api_key:
+ print("β
OPENAI_API_KEY found")
+ print(f" Key length: {len(openai_api_key)} characters")
+ print(f" Key prefix: {openai_api_key[:10]}...")
+
+ # Test API connection
+ try:
+ from openai import OpenAI
+ client = OpenAI(api_key=openai_api_key)
+
+ print("\nπ§ͺ Testing OpenAI API connection...")
+ response = client.chat.completions.create(
+ model="gpt-4o-mini",
+ messages=[
+ {"role": "user", "content": "Say 'OK' if you can read this."}
+ ],
+ max_tokens=5
+ )
+
+ output = response.choices[0].message.content.strip()
+ print(f"β
OpenAI API working! Response: {output}")
+
+ except Exception as e:
+ print(f"β OpenAI API error: {e}")
+ print(" Check your API key and internet connection")
+ else:
+ print("β OPENAI_API_KEY not set")
+ print("\nπ‘ To enable OpenAI:")
+ print(" 1. Get your API key from: https://platform.openai.com/api-keys")
+ print(" 2. Set environment variable:")
+ print(" export OPENAI_API_KEY='sk-proj-your-key-here'")
+
+
+def test_cache_functionality():
+ """Test cache read/write functionality"""
+
+ print("\n" + "=" * 70)
+ print("Cache Functionality Test")
+ print("=" * 70)
+
+ import tempfile
+ import shutil
+
+ # Create temporary cache directory
+ test_cache_dir = tempfile.mkdtemp(prefix="test_paper_cache_")
+ test_cache_file = os.path.join(test_cache_dir, "relevant_papers.txt")
+
+ try:
+ # Test 1: Load empty cache
+ agent = ArxivAgent(
+ topics=["test"],
+ max_results=1,
+ days_back=7,
+ verbose=False,
+ cache_dir=test_cache_dir
+ )
+
+ cached = agent.load_cached_papers()
+ print(f"β
Empty cache loaded: {len(cached)} papers")
+
+ # Test 2: Save paper ID
+ test_paper_id = "1234.5678"
+ agent.save_paper_id(test_paper_id)
+ print(f"β
Paper ID saved: {test_paper_id}")
+
+ # Test 3: Load cache with saved paper
+ cached = agent.load_cached_papers()
+ if test_paper_id in cached:
+ print(f"β
Paper ID found in cache: {test_paper_id}")
+ else:
+ print(f"β Paper ID not found in cache")
+
+ # Cleanup
+ shutil.rmtree(test_cache_dir)
+ print("β
Cache test completed successfully")
+
+ except Exception as e:
+ print(f"β Cache test failed: {e}")
+ if os.path.exists(test_cache_dir):
+ shutil.rmtree(test_cache_dir)
+
+
+def run_all_tests():
+ """Run all tests"""
+
+ print("\n" + "π" * 35)
+ print("PaperFlow Test Suite")
+ print("π" * 35)
+
+ # Test 1: OpenAI
+ test_openai()
+
+ # Test 2: Email config
+ test_email_config()
+
+ # Test 3: Cache functionality
+ test_cache_functionality()
+
+ # Test 4: SDK simulation
+ test_sdk_simulation()
+
+ # Test 5: Basic functionality
+ # Note: This makes real API calls and LLM requests
+ print("\n" + "=" * 70)
+ print("Running full integration test (this may take a minute)...")
+
+ print("β οΈ This will make real API calls to arXiv and OpenAI")
+ response = input("Continue? (y/n): ")
+ if response.lower() == 'y':
+ test_basic_functionality()
+ else:
+ print("Skipping full integration test")
+
+ print("All tests complete!")
+
+
+
+if __name__ == "__main__":
+ # Load .env file if it exists
+ env_file = os.path.join(os.path.dirname(__file__), ".env")
+ if os.path.exists(env_file):
+ print("Loading .env file...")
+ with open(env_file) as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith("#") and "=" in line:
+ key, value = line.split("=", 1)
+ os.environ[key.strip()] = value.strip().strip('"').strip("'")
+
+ if len(sys.argv) > 1:
+ test_name = sys.argv[1]
+
+ if test_name == "basic":
+ test_basic_functionality()
+ elif test_name == "sdk":
+ test_sdk_simulation()
+ elif test_name == "email":
+ test_email_config()
+ elif test_name == "openai":
+ test_openai()
+ elif test_name == "cache":
+ test_cache_functionality()
+ elif test_name == "all":
+ run_all_tests()
+ else:
+ print("Available tests: basic, sdk, email, openai, cache, all")
+ print("Usage: python test_agent.py [test_name]")
+ else:
+ run_all_tests()
+
diff --git a/runagent/utils/port.py b/runagent/utils/port.py
index fe39b58..b507493 100644
--- a/runagent/utils/port.py
+++ b/runagent/utils/port.py
@@ -10,8 +10,8 @@ class PortManager:
"""Utility class for managing port allocation"""
DEFAULT_START_PORT = 8450
+ DEFAULT_END_PORT = 65535
DEFAULT_HOST = "127.0.0.1"
- MAX_PORT_ATTEMPTS = 5
@staticmethod
def is_port_available(host: str, port: int) -> bool:
@@ -29,11 +29,11 @@ def is_port_available(host: str, port: int) -> bool:
@staticmethod
def find_available_port(host: str = DEFAULT_HOST, start_port: int = DEFAULT_START_PORT) -> int:
"""Find the next available port starting from start_port"""
- for port in range(start_port, start_port + PortManager.MAX_PORT_ATTEMPTS):
+ for port in range(start_port, PortManager.DEFAULT_END_PORT + 1):
if PortManager.is_port_available(host, port):
return port
- raise RuntimeError(f"No available ports found in range {start_port}-{start_port + PortManager.MAX_PORT_ATTEMPTS}")
+ raise RuntimeError(f"No available ports found in range {start_port}-{PortManager.DEFAULT_END_PORT}")
@staticmethod
def allocate_unique_address(used_ports: list = None) -> Tuple[str, int]:
@@ -44,7 +44,7 @@ def allocate_unique_address(used_ports: list = None) -> Tuple[str, int]:
# Start from default port and find the first available
start_port = PortManager.DEFAULT_START_PORT
- for port in range(start_port, start_port + PortManager.MAX_PORT_ATTEMPTS):
+ for port in range(start_port, PortManager.DEFAULT_END_PORT + 1):
if port not in used_ports and PortManager.is_port_available(host, port):
console.print(f"π Allocated address: [blue]{host}:{port}[/blue]")
return host, port
@@ -67,4 +67,4 @@ def get_used_ports_from_db(db_service) -> list:
if os.getenv('DISABLE_TRY_CATCH'):
raise
console.print(f"[yellow]Warning: Could not fetch used ports: {e}[/yellow]")
- return []
\ No newline at end of file
+ return []
diff --git a/templates/agno/default/runagent.config.json b/templates/agno/default/runagent.config.json
index 8fd811e..345d233 100644
--- a/templates/agno/default/runagent.config.json
+++ b/templates/agno/default/runagent.config.json
@@ -25,7 +25,7 @@
]
},
"env_vars": {},
- "agent_id": "ae29bd73-b3d3-99c8-a98f-5d7aec7ee919",
+ "agent_id": "ae29bd73-b3d3-99c8-a98f-5d7aec7ee911",
"auth_settings": {
"type": "api_key"
}
diff --git a/test_scripts/python/client_test_agno.py b/test_scripts/python/client_test_agno.py
index 5710021..5b40c65 100644
--- a/test_scripts/python/client_test_agno.py
+++ b/test_scripts/python/client_test_agno.py
@@ -1,28 +1,29 @@
-# from runagent import RunAgentClient
+from runagent import RunAgentClient
-# ra = RunAgentClient(
-# agent_id="ae29bd73-b3d3-22c8-a98f-5d7aec7ee919",
-# entrypoint_tag="agno_print_response",
-# local=False
-# )
+ra = RunAgentClient(
+ agent_id="ae29bd73-b3d3-99c8-a98f-5d7aec7ee911",
+ entrypoint_tag="agno_print_response",
+ local=True,
+ port=8455
+ )
-# agent_result = ra.run(
-# "what is the difference between astrology and love"
-# )
+agent_result = ra.run(
+ "what is the difference between astrology and love"
+)
-# print(agent_result)
+print(agent_result)
# ==================================
-from runagent import RunAgentClient
+# from runagent import RunAgentClient
-ra = RunAgentClient(
- agent_id="ae29bd73-b3d3-99c8-a98f-5d7aec7ee919",
- entrypoint_tag="agno_print_response_stream",
- local=False
- )
+# ra = RunAgentClient(
+# agent_id="ae29bd73-b3d3-99c8-a98f-5d7aec7ee911",
+# entrypoint_tag="agno_print_response_stream",
+# local=False
+# )
-for chunk in ra.run(
- "Benefits of a long drive"
-):
- print(chunk)
+# for chunk in ra.run(
+# "Benefits of a long drive"
+# ):
+# print(chunk)
diff --git a/test_scripts/python/client_test_paperflow.py b/test_scripts/python/client_test_paperflow.py
new file mode 100644
index 0000000..60179dd
--- /dev/null
+++ b/test_scripts/python/client_test_paperflow.py
@@ -0,0 +1,92 @@
+from runagent import RunAgentClient
+
+# ============================================================================
+# OPTION 1: Standard synchronous mode (sequential processing)
+# ============================================================================
+# This is the default mode - processes papers one by one
+# Use this if you want simple, straightforward execution
+# Note: Update agent_id if you redeployed the agent
+# client = RunAgentClient(
+# agent_id="62f7a781-51bb-4d62-a68f-24dc4f2bfd0b", # Updated to match runagent.config.json
+# entrypoint_tag="check_papers", # Standard synchronous entrypoint
+# local=False,
+# user_id="prova3", # User ID for persistent storage (VM-level)
+# persistent_memory=True # Enable persistent storage
+# )
+# result = client.run(
+# topics=["reinforcement learning"],
+# max_results=20,
+# days_back=100
+# )
+
+# print(f"Found {result['total_relevant']} relevant papers")
+# print(f"Email sent: {result['email_sent']}")
+
+# ============================================================================
+# OPTION 2: Async/Parallel mode (faster, processes papers in parallel)
+# ============================================================================
+# Uncomment below to use async mode - processes up to 10 papers simultaneously
+# This is much faster for large batches of papers
+#
+client_async = RunAgentClient(
+ agent_id="62f7a781-71bb-4d62-a68f-24dc4f2bfd0b",
+ entrypoint_tag="check_papers_async", # Async entrypoint with parallel processing
+ local=False,
+ user_id="prova4",
+ persistent_memory=True
+)
+result_async = client_async.run(
+ topics=["LLM finetuning"],
+ max_results=20,
+ days_back=100
+)
+print(f"Found {result_async['total_relevant']} relevant papers (async mode)")
+print(f"Email sent: {result_async['email_sent']}")
+
+# ============================================================================
+# OPTION 3: Streaming mode (real-time progress updates)
+# ============================================================================
+# Uncomment below to use streaming mode - see progress as it happens
+# The VM stays alive and streams results in real-time
+# Note: Requires SDK support for streaming or use WebSocket endpoint
+#
+# import asyncio
+#
+# async def stream_example():
+# client_stream = RunAgentClient(
+# agent_id="62f7a781-51bb-4d62-a68f-24dc4f2bfd0b",
+# entrypoint_tag="check_papers_stream", # Streaming entrypoint
+# local=False,
+# user_id="prova3",
+# persistent_memory=True
+# )
+#
+# # If SDK supports streaming:
+# # async for update in client_stream.run_stream(
+# # topics=["reinforcement learning"],
+# # max_results=20,
+# # days_back=100
+# # ):
+# # update_type = update.get("type", "unknown")
+# # if update_type == "status":
+# # print(f"π {update.get('message', '')} [{update.get('progress', 0)}%]")
+# # elif update_type == "paper":
+# # print(f"π Found paper: {update.get('paper', '')[:100]}...")
+# # elif update_type == "complete":
+# # print(f"β
Complete: {update.get('total_relevant', 0)} papers found")
+#
+# # Alternative: Use WebSocket endpoint if available
+# # See: /home/azureuser/runagent/test_scripts/python/client_test_paperflow_stream.py
+# # for a complete streaming example
+#
+# # asyncio.run(stream_example())
+
+# ============================================================================
+# QUICK REFERENCE:
+# ============================================================================
+# - check_papers: Standard mode (sequential, one paper at a time)
+# - check_papers_async: Fast mode (parallel, up to 10 papers simultaneously)
+# - check_papers_stream: Streaming mode (real-time progress, VM stays alive)
+#
+# For streaming examples, see:
+# /home/azureuser/runagent/test_scripts/python/client_test_paperflow_stream.py
\ No newline at end of file
diff --git a/test_scripts/python/client_test_paperflow_stream.py b/test_scripts/python/client_test_paperflow_stream.py
new file mode 100644
index 0000000..2aaf5ac
--- /dev/null
+++ b/test_scripts/python/client_test_paperflow_stream.py
@@ -0,0 +1,108 @@
+"""
+Test PaperFlow agent with streaming support
+Shows real-time progress updates as the agent processes papers
+"""
+from runagent import RunAgentClient
+import asyncio
+
+async def test_streaming():
+ """Test the streaming entrypoint"""
+ client = RunAgentClient(
+ agent_id="62f7a781-41bb-4d62-a68f-24dc4f2bfd0b",
+ entrypoint_tag="check_papers_stream", # Use streaming entrypoint
+ local=False,
+ user_id="prova3",
+ persistent_memory=True
+ )
+
+ print("π Starting streaming agent...")
+ print("=" * 70)
+
+ # Use streaming mode if available
+ try:
+ # Check if client supports streaming
+ if hasattr(client, 'run_stream'):
+ async for update in client.run_stream(
+ topics=["reinforcement learning"],
+ max_results=20,
+ days_back=100
+ ):
+ update_type = update.get("type", "unknown")
+
+ if update_type == "status":
+ print(f"π {update.get('message', '')} [{update.get('progress', 0)}%]")
+ elif update_type == "paper":
+ paper = update.get("paper", "")
+ is_new = update.get("is_new", False)
+ status = "π NEW" if is_new else "π"
+ print(f"\n{status} Paper found:")
+ print(paper)
+ print(f"Progress: {update.get('progress', 0)}%")
+ elif update_type == "progress":
+ print(f"β³ {update.get('message', '')} [{update.get('progress', 0)}%]")
+ elif update_type == "complete":
+ print("\n" + "=" * 70)
+ print("β
Processing Complete!")
+ print("=" * 70)
+ print(f"Total processed: {update.get('total_processed', 0)}")
+ print(f"Total relevant: {update.get('total_relevant', 0)}")
+ print(f"New papers: {update.get('new_papers', 0)}")
+ print(f"Cached hits: {update.get('cached_hits', 0)}")
+ print(f"LLM calls: {update.get('llm_calls', 0)}")
+ print(f"Email sent: {update.get('email_sent', False)}")
+ elif update_type == "error":
+ print(f"\nβ Error: {update.get('error', 'Unknown error')}")
+ else:
+ # Fallback to regular async call
+ print("β οΈ Streaming not available, using async mode...")
+ result = await client.run_async(
+ topics=["reinforcement learning"],
+ max_results=20,
+ days_back=100
+ )
+ print(f"\nβ
Found {result['total_relevant']} relevant papers")
+ print(f"Email sent: {result['email_sent']}")
+ except Exception as e:
+ print(f"β Error: {e}")
+ import traceback
+ traceback.print_exc()
+
+def test_async():
+ """Test the async entrypoint (faster, but no streaming)"""
+ client = RunAgentClient(
+ agent_id="62f7a781-41bb-4d62-a68f-24dc4f2bfd0b",
+ entrypoint_tag="check_papers_async", # Use async entrypoint
+ local=False,
+ user_id="prova3",
+ persistent_memory=True
+ )
+
+ print("π Starting async agent (parallel processing)...")
+ print("=" * 70)
+
+ # Run async entrypoint
+ result = asyncio.run(client.run(
+ topics=["reinforcement learning"],
+ max_results=20,
+ days_back=100
+ ))
+
+ print("\n" + "=" * 70)
+ print("β
Results:")
+ print("=" * 70)
+ print(f"Total processed: {result.get('total_processed', 0)}")
+ print(f"Total relevant: {result.get('total_relevant', 0)}")
+ print(f"New papers: {result.get('new_papers', 0)}")
+ print(f"Cached hits: {result.get('cached_hits', 0)}")
+ print(f"LLM calls: {result.get('llm_calls', 0)}")
+ print(f"Email sent: {result.get('email_sent', False)}")
+
+if __name__ == "__main__":
+ import sys
+
+ if len(sys.argv) > 1 and sys.argv[1] == "async":
+ test_async()
+ else:
+ # Default to streaming
+ asyncio.run(test_streaming())
+