In [None]:
import gradio as gr

def letter_counter(word: str, letter: str) -> int:
    """
    Count the number of occurrences of a letter in a word or text.

    Args:
        word (str): The input text to search through
        letter (str): The letter to search for

    Returns:
        int: The number of times the letter appears in the text
    """
    word = word.lower()
    letter = letter.lower()
    count = word.count(letter)
    return count

demo = gr.Interface(
    fn=letter_counter,
    inputs=["textbox", "textbox"],
    outputs="number",
    title="Letter Counter",
    description="Enter text and a letter to count how many times the letter appears in the text."
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)

In [None]:
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather Service")

@mcp.tool()
def get_weather(location: str) -> str:
    """Get the current weather for a specified location."""
    return f"Weather in {location}: Sunny, 72°F"

@mcp.resource("weather://{location}")
def weather_resource(location: str) -> str:
    """Provide weather data as a resource."""
    return f"Weather data for {location}: Sunny, 72°F"

@mcp.prompt()
def weather_report(location: str) -> str:
    """Create a weather report prompt."""
    return f"""You are a weather reporter. Weather report for {location}?"""

if __name__ == "__main__":
    mcp.run()

In [None]:
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from uuid import UUID, uuid4
import httpx
from pydantic import BaseModel, Field

from mcp import MCPServer, Tool, types
from mcp.server.models import InitializationOptions

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EventData(BaseModel):
    """Event data model for risk assessment"""
    event_id: str
    title: str
    description: str
    location: str
    date_time: str
    organizer_email: str
    organizer_id: str
    category: str
    max_attendees: Optional[int] = None
    price: Optional[float] = 0.0
    external_urls: List[str] = []

class RiskAssessmentResult(BaseModel):
    """Risk assessment response model"""
    event_id: str
    risk_score: int 
    risk_level: str 
    risk_factors: List[str]
    confidence: float
    recommendation: str
    analysis_timestamp: str

class RSVPData(BaseModel):
    """RSVP data for email notifications"""
    rsvp_id: str
    event_id: str
    user_email: str
    user_name: str
    event_title: str
    event_date: str
    event_location: str
    organizer_name: str

class N8NClient:
    """Client for interacting with n8n webhooks"""
    
    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.client = httpx.AsyncClient(timeout=30.0)
    
    async def trigger_webhook(self, webhook_path: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Trigger an n8n webhook with data"""
        url = f"{self.base_url}/webhook/{webhook_path}"
        
        headers = {'Content-Type': 'application/json'}
        if self.auth_token:
            headers['Authorization'] = f'Bearer {self.auth_token}'
        
        try:
            logger.info(f"Triggering n8n webhook: {url}")
            response = await self.client.post(url, json=data, headers=headers)
            response.raise_for_status()
            
            result = response.json()
            logger.info(f"n8n webhook response: {result}")
            return result
            
        except httpx.HTTPError as e:
            logger.error(f"n8n webhook error: {e}")
            raise Exception(f"Failed to trigger n8n webhook: {str(e)}")
    
    async def close(self):
        """Close the HTTP client"""
        await self.client.aclose()

class EventHubMCPServer:
    """Main MCP Server for EventHub AI operations"""
    
    def __init__(self, n8n_base_url: str, n8n_auth_token: Optional[str] = None):
        self.n8n_client = N8NClient(n8n_base_url, n8n_auth_token)
        self.server = MCPServer("eventhub-ai")
        self._register_tools()
    
    def _register_tools(self):
        """Register all MCP tools"""
        
        @self.server.tool()
        async def assess_event_risk(
            event_data: Dict[str, Any]
        ) -> Dict[str, Any]:
            """
            Assess risk for a submitted event using AI analysis.
            
            Args:
                event_data: Dictionary containing event information
                
            Returns:
                Risk assessment with score, factors, and recommendation
            """
            try:
                event = EventData(**event_data)
                
                workflow_data = {
                    "event_id": event.event_id,
                    "title": event.title,
                    "description": event.description,
                    "location": event.location,
                    "date_time": event.date_time,
                    "organizer_email": event.organizer_email,
                    "organizer_id": event.organizer_id,
                    "category": event.category,
                    "max_attendees": event.max_attendees,
                    "price": event.price,
                    "external_urls": event.external_urls,
                    "timestamp": datetime.utcnow().isoformat()
                }
                
                result = await self.n8n_client.trigger_webhook(
                    "risk-assessment", 
                    workflow_data
                )
                
                if "risk_score" not in result:
                    raise Exception("Invalid response from risk assessment workflow")
                
                assessment = RiskAssessmentResult(
                    event_id=event.event_id,
                    risk_score=int(result["risk_score"]),
                    risk_level=result.get("risk_level", "UNKNOWN"),
                    risk_factors=result.get("risk_factors", []),
                    confidence=float(result.get("confidence", 0.0)),
                    recommendation=result.get("recommendation", ""),
                    analysis_timestamp=datetime.utcnow().isoformat()
                )
                
                logger.info(f"Risk assessment completed for event {event.event_id}: {assessment.risk_score}/100")
                
                return assessment.dict()
                
            except Exception as e:
                logger.error(f"Risk assessment failed: {str(e)}")
                raise Exception(f"Risk assessment failed: {str(e)}")
        
        @self.server.tool()
        async def send_rsvp_confirmation(
            rsvp_data: Dict[str, Any]
        ) -> Dict[str, Any]:
            """
            Send RSVP confirmation email to user.
            
            Args:
                rsvp_data: Dictionary containing RSVP and event information
                
            Returns:
                Confirmation of email sent
            """
            try:
                rsvp = RSVPData(**rsvp_data)
                
                email_data = {
                    "type": "rsvp_confirmation",
                    "rsvp_id": rsvp.rsvp_id,
                    "event_id": rsvp.event_id,
                    "recipient_email": rsvp.user_email,
                    "recipient_name": rsvp.user_name,
                    "event_title": rsvp.event_title,
                    "event_date": rsvp.event_date,
                    "event_location": rsvp.event_location,
                    "organizer_name": rsvp.organizer_name,
                    "timestamp": datetime.utcnow().isoformat()
                }
              
                result = await self.n8n_client.trigger_webhook(
                    "email-automation", 
                    email_data
                )
                
                logger.info(f"RSVP confirmation sent for event {rsvp.event_id} to {rsvp.user_email}")
                
                return {
                    "success": True,
                    "message": "RSVP confirmation email sent successfully",
                    "rsvp_id": rsvp.rsvp_id,
                    "email_id": result.get("email_id"),
                    "sent_at": datetime.utcnow().isoformat()
                }
                
            except Exception as e:
                logger.error(f"RSVP confirmation failed: {str(e)}")
                raise Exception(f"Failed to send RSVP confirmation: {str(e)}")
        
        @self.server.tool()
        async def schedule_event_reminder(
            rsvp_data: Dict[str, Any]
        ) -> Dict[str, Any]:
            """
            Schedule 24-hour reminder email for event attendee.
            
            Args:
                rsvp_data: Dictionary containing RSVP and event information
                
            Returns:
                Confirmation of reminder scheduled
            """
            try:
                rsvp = RSVPData(**rsvp_data)
                event_datetime = datetime.fromisoformat(rsvp.event_date.replace('Z', '+00:00'))
                reminder_time = event_datetime - timedelta(hours=24)
                
                if reminder_time <= datetime.utcnow().replace(tzinfo=reminder_time.tzinfo):
                    return {
                        "success": False,
                        "message": "Event is less than 24 hours away, reminder not scheduled",
                        "rsvp_id": rsvp.rsvp_id
                    }
                
                reminder_data = {
                    "type": "event_reminder",
                    "rsvp_id": rsvp.rsvp_id,
                    "event_id": rsvp.event_id,
                    "recipient_email": rsvp.user_email,
                    "recipient_name": rsvp.user_name,
                    "event_title": rsvp.event_title,
                    "event_date": rsvp.event_date,
                    "event_location": rsvp.event_location,
                    "organizer_name": rsvp.organizer_name,
                    "reminder_time": reminder_time.isoformat(),
                    "schedule_at": reminder_time.isoformat(),
                    "timestamp": datetime.utcnow().isoformat()
                }
                
                # Trigger n8n reminder scheduling workflow
                result = await self.n8n_client.trigger_webhook(
                    "schedule-reminder", 
                    reminder_data
                )
                
                logger.info(f"Reminder scheduled for event {rsvp.event_id} to {rsvp.user_email} at {reminder_time}")
                
                return {
                    "success": True,
                    "message": "Event reminder scheduled successfully",
                    "rsvp_id": rsvp.rsvp_id,
                    "reminder_id": result.get("reminder_id"),
                    "scheduled_for": reminder_time.isoformat()
                }
                
            except Exception as e:
                logger.error(f"Reminder scheduling failed: {str(e)}")
                raise Exception(f"Failed to schedule reminder: {str(e)}")
        
        @self.server.tool()
        async def get_risk_assessment_history(
            event_id: str,
            limit: int = 10
        ) -> Dict[str, Any]:
            """
            Get risk assessment history for an event or organizer.
            
            Args:
                event_id: Event ID to get history for
                limit: Maximum number of assessments to return
                
            Returns:
                List of historical risk assessments
            """
            try:
                query_data = {
                    "type": "risk_history",
                    "event_id": event_id,
                    "limit": limit,
                    "timestamp": datetime.utcnow().isoformat()
                }
                
                result = await self.n8n_client.trigger_webhook(
                    "query-history", 
                    query_data
                )
                
                return {
                    "success": True,
                    "event_id": event_id,
                    "assessments": result.get("assessments", []),
                    "total_count": result.get("total_count", 0)
                }
                
            except Exception as e:
                logger.error(f"History query failed: {str(e)}")
                raise Exception(f"Failed to get risk assessment history: {str(e)}")
    
    async def run(self, transport_type: str = "stdio"):
        """Run the MCP server"""
        if transport_type == "stdio":
            from mcp.server.stdio import stdio_server
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream, 
                    write_stream, 
                    InitializationOptions(
                        server_name="EventHub AI Server",
                        server_version="1.0.0",
                        capabilities={
                            "tools": {}
                        }
                    )
                )
        else:
            raise ValueError(f"Unsupported transport type: {transport_type}")
    
    async def cleanup(self):
        """Cleanup resources"""
        await self.n8n_client.close()


async def main():
    """Example usage of the MCP server"""
    # Initialize MCP server
    server = EventHubMCPServer(
        n8n_base_url="https://your-n8n-instance.render.com",
        n8n_auth_token="your-n8n-auth-token" 
    )
    
    try:
        test_event = {
            "event_id": str(uuid4()),
            "title": "Free Yoga Class in the Park",
            "description": "Join us for a relaxing yoga session in Central Park. Bring your own mat!",
            "location": "Central Park, NYC",
            "date_time": "2024-03-15T10:00:00Z",
            "organizer_email": "yoga@example.com",
            "organizer_id": str(uuid4()),
            "category": "Health & Wellness",
            "max_attendees": 20,
            "price": 0.0,
            "external_urls": []
        }
        await server.run()
        
    except KeyboardInterrupt:
        logger.info("Shutting down MCP server...")
    finally:
        await server.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

In [None]:
{ "event_id": 12345, "title": "FREE iPhone 14 Giveaway!!! Click Now!!!", "description": "WIN FREE IPHONE!!! Just click link and enter credit card for verification! Limited time only!!! Sign  up here!! ["bit.ly/suspicious-link"]" }

In [3]:
import requests
import json

url = "https://karanja-kariuki.app.n8n.cloud/webhook-test/92c9b343-6599-4253-afb4-711946738a55"
headers = {"KK_ACCESS_PASS": "CFtMJFnPwxAT8EMI5aPOfS7fA4E9qA3dX"}

payload = {
  "event_id": 12347, 
  "title": "Local Community Garden Workshop", 
  "description": "Join us this Saturday for a hands-on workshop about organic gardening. Learn composting, planting techniques, and sustainable practices. Free event, registration required.", "urls": ["https://community-garden.org/workshop"]
}
response = requests.get(url, headers=headers,json=payload)
response.text

'{"output":"```json\\n{\\n  \\"event_id\\": 12347,\\n  \\"risk_score\\": 5,\\n  \\"reasoning\\": \\"Legitimate community event with verified official website\\",\\n  \\"flags\\": []\\n}\\n```\\n"}'

In [None]:
{
  "name": "Karanja Kariuki",
  "email": "kariuki.karanja@students.jkuat.ac.ke", 
  "event_title": "Blankets and Wine Night",
  "event_date": "2024-09-20",
  "event_time": "9:00 P.M.",
  "event_place": "Kasarani",
  "reminder_type": "rsvp"
}

In [7]:
import requests
import json

url = "https://karanja-kariuki.app.n8n.cloud/webhook/92c9b343-6599-4253-afb4-711946738a55"
headers = {"KK_ACCESS_PASS": "CFtMJFnPwxAT8EMI5aPOfS7fA4E9qA3dX"}

payload = {
  "event_id": 123, 
  "title": "FREE iPhone 14 Giveaway!!! Click Now!!!", 
  "description": "WIN FREE IPHONE!!! Just click link and enter credit card for verification! Limited time only!!! Sign  up here!! ['bit.ly/suspicious-link']" 
}
response = requests.get(url, headers=headers,json=payload)
response.text

'{"output":"```json\\n{\\n  \\"event_id\\": 123,\\n  \\"risk_score\\": 95,\\n  \\"reasoning\\": \\"Giveaway scam, clickbait, suspicious URL.\\",\\n  \\"flags\\": [\\"Giveaway Scam\\", \\"Suspicious URL\\", \\"Clickbait\\"]\\n}\\n```\\n"}'

In [None]:
{
  "event_id": 12345, 
  "title": "FREE iPhone 14 Giveaway!!! Click Now!!!", 
  "description": "WIN FREE IPHONE!!! Just click link and enter credit card for verification! Limited time only!!! Sign  up here!! ['bit.ly/suspicious-link']" 
}

In [None]:
https://api-bcbe5a.stack.tryrelevance.com/latest/agents/hooks/custom-trigger/173cb28b-2bdc-4a0b-bad8-d7137c5c1945/c480d6fd-6a4d-427b-9714-5c54a3060ba7

In [None]:
import chromadb
from sentence_transformers import SentenceTransformer
import uuid

def load_txt_to_chroma(txt_file_path, collection_name="documents", chunk_size=512):
    client = chromadb.PersistentClient(path="./chroma_db")
    model = SentenceTransformer('all-MiniLM-L6-v2')
    collection = client.get_or_create_collection(name=collection_name)

    with open(txt_file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
    embeddings = model.encode(chunks).tolist()
    ids = [str(uuid.uuid4()) for _ in chunks]
    collection.add(
        embeddings=embeddings,
        documents=chunks,
        ids=ids
    )
    print(f"Added {len(chunks)} chunks to collection '{collection_name}'")
    return collection

if __name__ == "__main__":
    collection = load_txt_to_chroma("Context.txt")
    results = collection.query(
        query_texts=["your search query"],
        n_results=3
    )
    print("Search results:", results['documents'][0])

In [12]:
import re
def filter_pii(text: str) -> str:
    """Remove PII from text using regex patterns"""
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]', text)    
    text = re.sub(r'(\+?1?[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})', '[PHONE]', text)    
    text = re.sub(r'\b(?:\d{4}[-\s]?){3}\d{4}\b', '[CARD]', text)    
    text = re.sub(r'\b\d{3}-?\d{2}-?\d{4}\b', '[SSN]', text)    
    return text

In [15]:
filter_pii("Come down to our harambe event this Saturday! Contact me at 0794123456 or email me at gasfhs@gmail.com")

'Come down to our harambe event this Saturday! Contact me at[PHONE] or email me at [EMAIL]'