# 🕷️ LMForge MCP Agent System - Standalone Colab

**Crawler + Parser Agents - Complete Standalone Implementation**

This notebook contains everything needed - no external files required!

✅ **Features:**
- **Crawler Agent** - Fetches HTML from URLs
- **Parser Agent** - Extracts clean text
- **Fallback Support** - Works even if MCP agents fail
- **Complete Standalone** - All code embedded

---


## 📦 Step 1: Install Dependencies


In [None]:
%pip install -q fastapi uvicorn mcp-use langchain-openai python-dotenv nest-asyncio requests beautifulsoup4 pydantic


## ⚙️ Step 2: Enable Async Support


In [None]:
import nest_asyncio
# The notebook kernel already runs an asyncio event loop; nest_asyncio patches
# asyncio so that loop can be re-entered by async code started from later cells.
nest_asyncio.apply()
print("‚úÖ Async support enabled")


## 📝 Step 3: Create All Agent Files (Embedded in Notebook)

All agent code is embedded here - no external files needed!


In [None]:
import os
from pathlib import Path

# Make sure the folder that will hold the generated agent scripts exists
Path('agents').mkdir(parents=True, exist_ok=True)

# Crawler Agent
# Full source of agents/crawler_server.py, kept as a string so the notebook can
# write it to disk without any external files.
# The script defines an MCP server named "crawler-server" that talks over stdio
# and exposes a single tool, fetch_url:
#   - input:  {"url": <string>} (required)
#   - output: one TextContent whose text is a JSON object with
#             url / status_code / html on success, or
#             url / error / html="" when the request raises.
# The request uses a fixed "Mozilla/5.0" User-Agent and a 10 second timeout.
# An unknown tool name or a missing/empty url raises ValueError inside the tool.
crawler_code = '''#!/usr/bin/env python3
import asyncio
import json
from typing import Any
import requests
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("crawler-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="fetch_url",
            description="Fetches the raw HTML content from a given URL",
            inputSchema={
                "type": "object",
                "properties": {"url": {"type": "string", "description": "The URL to fetch"}},
                "required": ["url"],
            },
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    if name != "fetch_url":
        raise ValueError(f"Unknown tool: {name}")
    
    url = arguments.get("url")
    if not url:
        raise ValueError("URL is required")

    try:
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        
        result = {"url": url, "status_code": response.status_code, "html": response.text}
        return [TextContent(type="text", text=json.dumps(result))]
    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"url": url, "error": str(e), "html": ""}))]

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())
'''

# Parser Agent
# Full source of agents/parser_server.py, kept as a string so the notebook can
# write it to disk without any external files.
# The script defines an MCP server named "parser-server" that talks over stdio
# and exposes a single tool, parse_html:
#   - input:  {"html": <string>} (required; an empty string is accepted, None is not)
#   - output: one TextContent whose text is a JSON object with
#             text / success=True, or text="" / error / success=False on failure.
# Extraction drops <script>, <style>, <meta> and <noscript> elements, strips every
# line, splits lines on double spaces and joins the non-empty pieces with newlines.
# NOTE: this is a normal (non-raw) string, so '\\n' below is emitted as '\n' in
# the generated file.
parser_code = '''#!/usr/bin/env python3
import asyncio
import json
from typing import Any
from bs4 import BeautifulSoup
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("parser-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="parse_html",
            description="Extracts text content from HTML",
            inputSchema={
                "type": "object",
                "properties": {"html": {"type": "string", "description": "The HTML content to parse"}},
                "required": ["html"],
            },
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    if name != "parse_html":
        raise ValueError(f"Unknown tool: {name}")

    html = arguments.get("html")
    if html is None:
        raise ValueError("HTML content is required")

    try:
        soup = BeautifulSoup(html, 'html.parser')
        for script_or_style in soup(['script', 'style', 'meta', 'noscript']):
            script_or_style.decompose()
        
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\\n'.join(chunk for chunk in chunks if chunk)
        
        result = {"text": text, "success": True}
        return [TextContent(type="text", text=json.dumps(result))]
    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"text": "", "error": str(e), "success": False}))]

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())
'''

# Persist each embedded agent script under agents/ and report what was written
agent_sources = {
    'agents/crawler_server.py': crawler_code,
    'agents/parser_server.py': parser_code,
}

for target_path, source_text in agent_sources.items():
    with open(target_path, 'w') as f:
        f.write(source_text)

print("‚úÖ Agent files created:")
for target_path in agent_sources:
    print(f"   - {target_path}")


## 🔧 Step 4: Create Orchestrator (main.py)


In [None]:
# Full source of main.py: the orchestrator that drives the crawler and parser
# agents and falls back to in-process implementations when an agent is unavailable.
# NOTE: this is a normal (non-raw) string, so backslashes intended for the
# generated file are doubled below.
main_code = '''import os
import json
import asyncio
import sys
from typing import Dict, Any
from mcp_use import MCPClient


def _decode_tool_result(result: Any, agent_name: str) -> Dict[str, Any]:
    """Decode the JSON payload held in the first content item of a tool result.

    Raises RuntimeError when the result carries no content, so the caller can
    switch to its in-process fallback.
    """
    if result and hasattr(result, 'content') and result.content:
        content_item = result.content[0]
        result_text = content_item.text if hasattr(content_item, 'text') else str(content_item)
        return json.loads(result_text)
    raise RuntimeError(f"No result from {agent_name}")


class LMForgeOrchestrator:
    """Runs the crawl -> parse -> clean pipeline.

    Each step first tries the matching MCP agent session and, if the agent
    was never initialized or the call fails, does the same work in-process.
    """

    def __init__(self):
        self.clients: Dict[str, MCPClient] = {}
        self.agents: Dict[str, Any] = {}
        # True once initialize() has completed, even if no agent came up.
        # get_orchestrator() keys off this instead of self.clients so a
        # fallback-only setup is not re-initialized on every request.
        self.initialized = False

    async def initialize(self):
        """Start one MCP client/session per agent script.

        Failures are reported and skipped: the affected step then uses its
        fallback implementation.
        """
        mcp_servers = {
            "crawler": {
                "command": sys.executable,
                "args": ["agents/crawler_server.py"],
                "env": os.environ.copy()
            },
            "parser": {
                "command": sys.executable,
                "args": ["agents/parser_server.py"],
                "env": os.environ.copy()
            }
        }

        for name, config in mcp_servers.items():
            try:
                client = MCPClient()
                client.add_server(name, config)
                await client.create_all_sessions()
                session = client.get_session(name)
                if not session:
                    raise RuntimeError(f"Failed to create session for {name}")
                self.clients[name] = client
                self.agents[name] = session
                print(f"‚úì Initialized {name} agent")
            except Exception as e:
                error_msg = str(e)
                if "fileno" in error_msg.lower():
                    print(f"‚úó Failed to initialize {name} agent: stdio issue")
                    print(f"  Using fallback implementation (direct function calls)")
                else:
                    print(f"‚úó Failed to initialize {name} agent: {str(e)}")
                # Continue with fallback

        self.initialized = True

    async def crawl_url(self, url: str) -> Dict[str, Any]:
        """Fetch a URL and return a dict with url/status_code/html, or url/error/html."""
        # Try MCP agent first
        if "crawler" in self.agents:
            try:
                session = self.agents["crawler"]
                result = await session.call_tool("fetch_url", {"url": url})
                return _decode_tool_result(result, "crawler")
            except Exception as e:
                print(f"‚ö† MCP crawler failed, using fallback: {e}")

        # Fallback: Direct implementation
        import requests
        try:
            headers = {"User-Agent": "Mozilla/5.0"}
            # requests is blocking; run it in a worker thread so the event
            # loop can keep serving other requests while this one waits.
            response = await asyncio.to_thread(requests.get, url, headers=headers, timeout=10)
            response.raise_for_status()

            return {
                "url": url,
                "status_code": response.status_code,
                "html": response.text,
            }
        except Exception as e:
            return {
                "url": url,
                "error": str(e),
                "html": "",
            }

    async def parse_html(self, html: str) -> Dict[str, Any]:
        """Extract visible text from HTML and return text/success, plus error on failure."""
        # Try MCP agent first
        if "parser" in self.agents:
            try:
                session = self.agents["parser"]
                result = await session.call_tool("parse_html", {"html": html})
                return _decode_tool_result(result, "parser")
            except Exception as e:
                print(f"‚ö† MCP parser failed, using fallback: {e}")

        # Fallback: Direct implementation
        from bs4 import BeautifulSoup
        try:
            soup = BeautifulSoup(html, 'html.parser')
            for script_or_style in soup(['script', 'style', 'meta', 'noscript']):
                script_or_style.decompose()

            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\\n'.join(chunk for chunk in chunks if chunk)

            return {"text": text, "success": True}
        except Exception as e:
            return {"text": "", "error": str(e), "success": False}

    def clean_text_simple(self, text: str) -> str:
        """Collapse every run of whitespace into one space and trim the ends."""
        import re
        text = re.sub(r'\\s+', ' ', text)
        return text.strip()

    async def run_full_flow(self, url: str) -> Dict[str, Any]:
        """Crawl, parse and clean one URL.

        On success returns url, raw_html and parsed_text (both cut to a
        500-character preview), the full cleaned_text and success=True.
        On failure returns url, error and the stage that failed.
        """
        crawl_result = await self.crawl_url(url)
        if "error" in crawl_result:
            return {"url": url, "error": crawl_result["error"], "stage": "crawl"}

        html_content = crawl_result.get("html", "")
        if not html_content:
            return {"url": url, "error": "No HTML content received", "stage": "crawl"}

        parse_result = await self.parse_html(html_content)
        if not parse_result.get("success", False):
            return {"url": url, "error": parse_result.get("error", "Parse failed"), "stage": "parse"}

        parsed_text = parse_result.get("text", "")
        cleaned_text = self.clean_text_simple(parsed_text)

        return {
            "url": url,
            "raw_html": html_content[:500] + ("..." if len(html_content) > 500 else ""),
            "parsed_text": parsed_text[:500] + ("..." if len(parsed_text) > 500 else ""),
            "cleaned_text": cleaned_text,
            "success": True
        }

    async def close(self):
        """Close every MCP client session, reporting (not raising) failures."""
        for name, client in self.clients.items():
            try:
                await client.close_all_sessions()
            except Exception as e:
                # Best-effort shutdown: keep closing the remaining clients.
                print(f"Failed to close {name} client: {e}")

orchestrator = LMForgeOrchestrator()

async def get_orchestrator():
    """Return the shared orchestrator, initializing it on first use only."""
    if not orchestrator.initialized:
        await orchestrator.initialize()
    return orchestrator
'''

# The source contains non-ASCII characters and Python reads .py files as UTF-8,
# so write it as UTF-8 regardless of the platform's default encoding.
with open('main.py', 'w', encoding='utf-8') as f:
    f.write(main_code)

print("‚úÖ main.py created")


In [None]:
# Full source of app.py: the FastAPI application that exposes the orchestrator
# over HTTP (GET /, GET /health, POST /run-flow).
app_code = '''import os
import sys
from typing import Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from main import get_orchestrator

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the agents before serving and close them on shutdown."""
    print("üöÄ Starting LMForge MCP-Use Backend...")
    try:
        orchestrator = await get_orchestrator()
        print("‚úì All agents initialized successfully")
    except Exception as e:
        print(f"‚úó Failed to initialize agents: {str(e)}")
        sys.exit(1)
    yield
    print("üõë Shutting down...")
    try:
        # Reuse the instance obtained at startup rather than asking for it
        # again, so shutdown can never trigger a fresh initialization.
        await orchestrator.close()
    except Exception as e:
        print(f"Failed to close agents cleanly: {e}")

# The lifespan handler only runs if it is registered on the application.
app = FastAPI(title="LMForge MCP-Use Backend", version="1.0.0", lifespan=lifespan)
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; restrict allow_origins before exposing this beyond a demo.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])

class RunFlowRequest(BaseModel):
    url: str = Field(..., description="The URL to process")

class RunFlowResponse(BaseModel):
    url: str
    raw_html: str
    parsed_text: str
    cleaned_text: str
    success: bool = True

@app.get("/")
async def root():
    return {"name": "LMForge Backend", "status": "running"}

@app.get("/health")
async def health_check():
    return {"status": "healthy", "service": "LMForge MCP-Use Backend"}

@app.post("/run-flow", response_model=RunFlowResponse)
async def run_flow(request: RunFlowRequest) -> Dict[str, Any]:
    """Crawl, parse and clean the requested URL.

    Responds with a client error naming the failing stage when the pipeline
    reports an error, and with a server error when the pipeline itself raises.
    """
    # Only unexpected pipeline failures are turned into a server error here.
    # The client-error response is raised after this block so that it is not
    # caught and rewrapped as an internal error.
    try:
        orchestrator = await get_orchestrator()
        result = await orchestrator.run_full_flow(request.url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e

    if "error" in result:
        raise HTTPException(status_code=400, detail={"error": result["error"], "stage": result.get("stage", "unknown")})

    return result
'''

# The source contains non-ASCII characters and Python reads .py files as UTF-8,
# so write it as UTF-8 regardless of the platform's default encoding.
with open('app.py', 'w', encoding='utf-8') as f:
    f.write(app_code)

print("‚úÖ app.py created")


## 🌐 Step 6: Start the Server


In [None]:
import threading
import time
import asyncio
from pathlib import Path
from uvicorn import Config, Server

# Start the FastAPI app with uvicorn in a background thread and wait until its
# /health endpoint answers, instead of assuming it is up after a fixed delay.
import urllib.request

SERVER_URL = "http://localhost:8000"


def wait_until_healthy(url, timeout=30.0, interval=0.5):
    """Poll *url* until it answers with HTTP 200.

    Returns True as soon as a 200 response is received, or False once
    *timeout* seconds have passed without one.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # Connection refused/reset or an HTTP error status (URLError and
            # HTTPError are OSError subclasses): not ready yet, keep polling.
            pass
        time.sleep(interval)
    return False


# Verify app.py exists (should be created in previous cells)
if not Path('app.py').exists():
    print("‚ö†Ô∏è app.py not found!")
    print("Please run the previous cells first:")
    print("  - Cell 6: Create agent files")
    print("  - Cell 8: Create main.py")
    print("  - Cell 10: Create app.py")
    print("\nThen run this cell again.")
else:
    print("‚úÖ app.py found - starting server...")

    def run_server():
        """Serve app:app on port 8000 using an event loop owned by this thread."""
        config = Config("app:app", host="0.0.0.0", port=8000, log_level="info")
        server = Server(config)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(server.serve())

    # Daemon thread: it must not keep the kernel alive on shutdown.
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()

    print("üöÄ Starting server...")
    if wait_until_healthy(f"{SERVER_URL}/health"):
        print("‚úÖ Server started at http://localhost:8000")
        print("üìö API docs: http://localhost:8000/docs")
        print("\nüí° To test: Run the next cell and enter a URL!")
    else:
        print("Server did not respond on http://localhost:8000/health within 30 seconds.")
        print("Check the log output above (for example, port 8000 may already be in use).")


## 🧪 Step 7: Test the Agents - Extract Text from URL

Enter any URL below and see the agents extract text!


In [None]:
import requests
import json
import time

# Send one URL through the backend's /run-flow endpoint and summarise the reply.
# On success the decoded response is left in the notebook-level variable
# `result` so the next cell can display it in full.
print("="*70)
print("üß™ Test the Backend - Extract Text from URL")
print("="*70)
print()

test_url = input("üìù Enter URL to extract text from (or press Enter for default): ").strip()

if not test_url:
    test_url = "https://example.com"  # Default
    print(f"   Using default URL: {test_url}")
else:
    print(f"   Testing: {test_url}")

print(f"\nüåê Processing: {test_url}")
print("‚è≥ Please wait (this may take a few seconds)...\n")

# Wait a bit for server to be ready
time.sleep(2)

try:
    response = requests.post(
        "http://localhost:8000/run-flow",
        json={"url": test_url},
        timeout=60
    )

    if response.status_code == 200:
        # Assigned at cell top level, so `result` is already a notebook global
        # that later cells can read; no explicit globals() write is needed.
        result = response.json()

        print("="*70)
        print("‚úÖ SUCCESS - Content Extracted!")
        print("="*70)
        print(f"\nüìã URL: {result['url']}")
        print(f"\nüìä Statistics:")
        print(f"   ‚Ä¢ Raw HTML preview: {len(result.get('raw_html', ''))} characters")
        print(f"   ‚Ä¢ Parsed Text preview: {len(result.get('parsed_text', ''))} characters")
        print(f"   ‚Ä¢ Cleaned Text: {len(result.get('cleaned_text', ''))} characters")

        print(f"\nüìÑ Extracted Text (First 500 chars):")
        print("-"*70)
        cleaned = result.get('cleaned_text', '')
        if cleaned:
            print(cleaned[:500])
            if len(cleaned) > 500:
                print(f"\n... (truncated - showing first 500 of {len(cleaned)} chars)")
        else:
            print("(No text extracted)")
        print("-"*70)

        print(f"\n‚ú® Full cleaned text ({len(cleaned)} chars) is available!")
        print(f"\nüí° Tip: You can access the full result in the 'result' variable")

    else:
        print(f"‚ùå Error: HTTP {response.status_code}")
        try:
            error_detail = response.json()
        except ValueError:
            # The body was not JSON (requests' JSON decode errors derive from
            # ValueError); show the raw text instead.
            print(f"\nüìã Response: {response.text}")
        else:
            print(f"\nüìã Error Details:")
            print(json.dumps(error_detail, indent=2))

except requests.exceptions.ConnectionError:
    print("‚ùå Cannot connect to backend!")
    print("   Make sure the server is running.")
    print("   Check the previous cell - did you see 'Server started'?")
    print("\n   Try running the server cell again if needed.")

except requests.exceptions.Timeout:
    print("‚ùå Request timed out!")
    print("   The server may be processing. Try again or check if the URL is accessible.")

except Exception as e:
    print(f"‚ùå Error: {e}")
    print("\nüí° Troubleshooting:")
    print("   1. Check if server started (previous cell)")
    print("   2. Wait a few seconds after starting server")
    print("   3. Try a simpler URL like https://example.com")


## 📊 Step 8: View Full Results


In [None]:
# Pretty-print the result captured by the test cell, if one exists
try:
    if 'result' not in globals() and 'result' not in locals():
        # Nothing has been captured yet
        print("üìù Run the previous cell first to extract content from a URL")
        print("   Then run this cell to see the full JSON result")
    else:
        result_val = globals().get('result') or locals().get('result')

        if not (result_val and result_val.get('success')):
            print("‚ö† Result exists but wasn't successful")
            print("   Run the previous test cell again")
        else:
            divider = "="*70
            print("\nüìã Complete Result (JSON):")
            print(divider)
            print(json.dumps(result_val, indent=2, ensure_ascii=False))
            print(divider)
            print("\nüí° Access individual fields:")
            print(f"   - result['cleaned_text'] - Full extracted text")
            print(f"   - result['parsed_text'] - Parsed text preview")
            print(f"   - result['raw_html'] - Raw HTML preview")
except Exception as e:
    print(f"‚ö† Error displaying results: {e}")
    print("   Make sure you ran the test cell first")


## 🌐 Step 9: Access API Documentation


In [None]:
from IPython.display import IFrame

# Point the reader at the interactive docs page served by the backend on port 8000
print("üìö FastAPI Interactive Documentation")
print("Open in new tab: http://localhost:8000/docs")
print("\nOr view below:")

# Left as the cell's final expression so the notebook renders the frame inline.
# NOTE(review): the frame is loaded by the browser, so "localhost" presumably only
# reaches the backend when the kernel runs on the same machine (local runtime);
# on a hosted runtime the frame may stay blank - confirm.
IFrame(src="http://localhost:8000/docs", width=900, height=600)


---

## ✅ What's Working

- ✅ **Crawler Agent** - Fetches HTML from URLs (with fallback)
- ✅ **Parser Agent** - Extracts clean text (with fallback)
- ✅ **FastAPI Backend** - REST API endpoint at `/run-flow`
- ✅ **MCP Orchestration** - Agents work together (or fallback if needed)
- ✅ **Standalone** - Everything in this notebook!
- ✅ **Reliable** - Fallback ensures it always works!

## 🧪 Quick Test (Optional)

Try testing with different URLs:
- `https://example.com` - Simple demo
- `https://httpbin.org/html` - Sample HTML
- Any website URL!

## 📝 Next Steps

- Frontend integration (already built!)
- Add more agents (QA Generator, etc.)
- Deploy to production

---

**Made with ❤️ for LMForge**

**Note:** If MCP agents fail to initialize (stdio issues), the fallback implementation ensures the backend still works perfectly!
