
Event-Driven AI Agent

A Python + React implementation of a conversational AI agent with event-driven architecture, message queuing, and real-time WebSocket communication.

[Architecture diagram]

What is this?

This is like ChatGPT, but built with an event-driven architecture that enables:

  • Message queuing - Send multiple messages while the AI is responding; they're automatically queued and processed sequentially
  • Non-blocking operations - Async OpenAI client allows concurrent event processing
  • Real-time UI updates - React frontend shows connection status and processing indicators via WebSocket
  • Decoupled architecture - EventBus pub-sub pattern for loose coupling between services

Real Example: Building a Data Analysis Script

User: "Write a Python script to analyze CSV sales data"
  ↓
AI: "Sure! Here's a script that reads CSV files and calculates..."
  [AI is typing...]

User: "Actually, also add visualization with matplotlib"
  [Message queued - AI still responding to first question]
  ↓
AI: "...total sales by region. Here's the code: [code block]"
  [First response complete]
  ↓
  [Queued message automatically processed]
  ↓
AI: "Great! I'll add matplotlib visualization. Here's the updated script..."
  [Generates code with plotting]

--- Future Capabilities ---

AI: <function_calls>
      <invoke name="execute_python">
        <code>import matplotlib.pyplot as plt...</code>
      </invoke>
    </function_calls>
  ↓
  [Approval modal appears instantly]
  "AI wants to execute Python code. Approve?"
  [Shows code preview]
  ↓
User: [Clicks "Approve"]
  ↓
AI: "Great! Here's the output: [chart image] Total sales: $125,000"
  ↓
User: [Sees response is going in wrong direction]
User: [Clicks "Stop" button]
  ↓
AI: "Response interrupted"
  ↓
User: "Instead, show me monthly trends"
  ↓
AI: "Sure! Here's the monthly analysis..."

Problem 1: Concurrent User Actions

User: "Write a 10-page essay on quantum physics"
LLM: "Quantum physics is the study of..." [streaming...]
User: "Wait, just give me a summary!" [types while streaming]

Without queuing: The second message is lost or causes an error
With this framework: It's automatically queued and processed after the first response completes

Problem 2: Long-Running Operations

User: "Explain quantum physics"
LLM: [streaming 5 minutes of text...]
User: "Stop!" [clicks stop button]

Without interrupts: The user is stuck waiting and can't cancel
With this framework: Graceful interrupt with proper state cleanup

Problem 3: Function Calls Mid-Stream

LLM: "Sure! <function_calls><invoke><code>dangerous_code()</code></invoke>"

Need to: Parse while streaming, pause for approval, execute safely, continue conversation
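
A rough sketch of that mid-stream detection, assuming a simple string buffer. The tag names mirror the example above, but scan_stream is a hypothetical helper, not this repository's actual parser:

# Sketch: watch a streaming response for a <function_calls> block.
# Hypothetical helper -- not the repository's actual parser.
OPEN_TAG = "<function_calls>"
CLOSE_TAG = "</function_calls>"

async def scan_stream(chunks):
    """Yield ("text", ...) deltas; hold back a function-call block until complete."""
    buffer = ""
    async for chunk in chunks:                   # chunks: async iterator of str deltas
        buffer += chunk
        if OPEN_TAG not in buffer:
            # Flush all but a short tail, in case a tag is split across chunks.
            keep = len(OPEN_TAG) - 1
            if len(buffer) > keep:
                yield ("text", buffer[:-keep])
                buffer = buffer[-keep:]
        elif CLOSE_TAG in buffer:
            start = buffer.index(OPEN_TAG)
            end = buffer.index(CLOSE_TAG) + len(CLOSE_TAG)
            yield ("text", buffer[:start])
            yield ("function_call", buffer[start:end])   # pause here for approval
            buffer = buffer[end:]
    if buffer:
        yield ("text", buffer)                   # flush whatever remains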

Communication Flow:

Frontend                WebSocket                Backend
   │                       │                       │
   │──── user_message ────→│──────────────────────→│ EventBus
   │                       │                       │ StateManager
   │                       │                       │ LLMService
   │←──── state_update ────│←──────────────────────│ (broadcasts state)
   │                       │                       │
   │──── user_message ────→│ (while LLM running)   │ → Queued
   │                       │                       │
   │←──── state_update ────│←──────────────────────│ (queue processed)

WebSocket maintains a persistent bidirectional channel for all frontend ↔ backend communication.
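
On the backend, that channel is a single FastAPI WebSocket endpoint. Here is a minimal sketch; the in-file EventBus stub stands in for src/event_bus.py, and the message shapes follow the diagram above:

import asyncio
from fastapi import FastAPI, WebSocket

class EventBus:                                  # stand-in for src/event_bus.py
    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()
    async def publish(self, event: dict):
        await self.queue.put(event)

app = FastAPI()
bus = EventBus()

@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
    await ws.accept()
    while True:                                  # sketch: no disconnect handling
        msg = await ws.receive_json()            # e.g. {"type": "user_message", "content": ...}
        await bus.publish(msg)                   # hand off to subscribers (StateManager, ...)
        # state_update broadcasts flow back over the same socket via ws.send_json(...)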

Architecture Philosophy

Event-Driven Design with Pure Reducers

This implementation uses a decoupled event-driven architecture:

Core Components

  1. EventBus: Central pub-sub hub using asyncio.Queue (see the sketch after this list)

    • Loose coupling: Components only know about EventBus
    • Parallel processing: Multiple services subscribe independently
    • Extensible: Add new subscribers without modifying existing code
  2. StateManager: Applies events through pure reducer functions

    • Reducer: (state, event) → new_state (pure function)
    • Thread-safe updates with asyncio.Lock
    • Side effects (like triggering the LLM) happen AFTER state updates
    • Queues user messages that arrive during LLM processing
  3. LLMService: Handles OpenAI API calls

    • Subscribes to llm_response_started events
    • Uses AsyncOpenAI for non-blocking I/O
    • Publishes llm_response_completed when done
    • Current limitation: Directly accesses StateManager's state (tight coupling)
    • Future: Will only interact through EventBus (fully decoupled)
  4. WebSocket: Bidirectional real-time communication

    • Publishes user_message events
    • Subscribes to all events to broadcast state updates
    • React frontend consumes state updates
    • Future bidirectional features:
      • User can interrupt LLM mid-response
      • Approve/reject function calls in real-time
      • Multi-client synchronization (same conversation across tabs)
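
Roughly how the bus and reducer compose, as a simplified sketch. The State fields and event names follow the examples in this README, but these are not the repository's exact classes; see src/event_bus.py and src/reducers.py for the real ones:

import asyncio
from dataclasses import dataclass

@dataclass(frozen=True)
class State:
    messages: tuple = ()
    queued: tuple = ()                    # user messages held while streaming
    is_streaming: bool = False

def reduce(state: State, event: dict) -> State:
    """Pure reducer: no I/O, just (state, event) -> new_state."""
    if event["type"] == "user_message":
        if state.is_streaming:            # LLM busy -> queue the message
            return State(state.messages, state.queued + (event["content"],), True)
        return State(state.messages + (event["content"],), state.queued, False)
    if event["type"] == "llm_response_started":
        return State(state.messages, state.queued, True)
    if event["type"] == "llm_response_completed":
        return State(state.messages + (event["content"],), state.queued, False)
    return state

class EventBus:
    """Pub-sub hub: every subscriber drains its own asyncio.Queue."""
    def __init__(self):
        self.subscribers: list[asyncio.Queue] = []
    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(q)
        return q
    async def publish(self, event: dict):
        for q in self.subscribers:
            await q.put(event)

Side effects (calling the LLM, draining the queue) live outside reduce, which keeps state transitions easy to test.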

Setup

Backend (Python + FastAPI)

# 1. Create virtual environment
python -m venv .venv
source .venv/bin/activate  # or `.venv\Scripts\activate` on Windows

# 2. Install dependencies
pip install -e .

# 3. Configure API key
cp .env.example .env
# Edit .env and add: OPENAI_API_KEY=your_key_here

# 4. Run the server
cd src
uvicorn main:app --reload --port 8000

Backend starts at: http://localhost:8000
WebSocket endpoint: ws://localhost:8000/ws

Frontend (React + TypeScript + Vite)

# 1. Navigate to frontend
cd frontend

# 2. Install dependencies
npm install

# 3. Configure backend URL (optional)
cp .env.example .env
# Edit .env if backend is not on localhost:8000

# 4. Run development server
npm run dev

Frontend starts at: http://localhost:5173

Full Stack

Open two terminals:

# Terminal 1 - Backend
source .venv/bin/activate && cd src && uvicorn main:app --reload --port 8000

# Terminal 2 - Frontend
cd frontend && npm run dev

Then open http://localhost:5173 in your browser.

Quick Test

Using the Frontend

  1. Start both backend and frontend (see Setup above)
  2. Open http://localhost:5173
  3. Type a message and press Enter
  4. Watch the UI:
    • 🟢 Connected indicator when WebSocket is active
    • AI is responding... indicator during LLM processing
    • Input remains enabled - try sending another message while the AI responds!
    • Queued messages will process automatically after current response

Using Browser Console

Open the browser console at http://localhost:8000 and paste:

const ws = new WebSocket('ws://localhost:8000/ws');

ws.onmessage = (e) => console.log('📩', JSON.parse(e.data));

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'user_message',
    content: 'Write a Python function to add two numbers'
  }));
};

You'll see state updates:

📩 {type: "state_update", event_type: "user_message", messages: [...], is_streaming: false}
📩 {type: "state_update", event_type: "llm_response_started", is_streaming: true}
📩 {type: "state_update", event_type: "llm_response_completed", is_streaming: false}

Current Features (Phase 1)

Event-Driven Architecture

  • EventBus pub-sub pattern
  • Loose coupling between services
  • Extensible design

Message Queuing

  • Send messages while LLM is processing
  • Automatic sequential execution
  • Backend-managed queue
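
Conceptually, the drain step is a side effect that runs after each reducer pass. A tiny sketch with hypothetical names; bus and state follow the shapes sketched under Core Components:

# Sketch: after a response completes, replay the oldest queued message.
# Hypothetical side-effect hook -- not the actual StateManager code.
async def after_reduce(event: dict, state, bus):
    if event["type"] == "llm_response_completed" and state.queued:
        await bus.publish({"type": "user_message", "content": state.queued[0]})
        # The next reducer pass removes it from the queue and starts the LLM again.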

Async OpenAI Integration

  • Non-blocking I/O with AsyncOpenAI
  • Allows concurrent event processing
  • Full response returned at once (not streaming yet)
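
What the non-blocking call looks like with the official openai client (the model name is illustrative):

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()                            # reads OPENAI_API_KEY from the env

async def ask(prompt: str) -> str:
    # `await` yields to the event loop, so other events keep processing.
    resp = await client.chat.completions.create(
        model="gpt-4o-mini",                      # illustrative model name
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content

print(asyncio.run(ask("Write a haiku about queues")))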

React Frontend

  • Real-time WebSocket connection
  • Connection status indicator
  • Processing spinner during LLM responses
  • Input enabled during processing (for queuing)

Pure Reducer Pattern

  • Predictable state updates
  • Thread-safe with asyncio.Lock
  • Side effects separated from state logic

Future Enhancements

True Streaming (Phase 2)

  • Stream text character-by-character from OpenAI
  • Update UI incrementally as chunks arrive
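
The change is roughly passing stream=True and forwarding each delta as it arrives. A sketch with the official client; event publishing is left as a comment:

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_reply(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",                      # illustrative model name
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta     # e.g. publish an llm_response_chunk event per delta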

Interrupt Handling (Phase 3)

  • Cancel LLM mid-response
  • User-initiated stop button
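
Interrupts map naturally onto asyncio task cancellation. A self-contained sketch; the sleep stands in for a long LLM call:

import asyncio

async def llm_call():
    await asyncio.sleep(300)                  # stand-in for a long streaming call
    return "full response"

async def main():
    task = asyncio.create_task(llm_call())    # the in-flight LLM response
    await asyncio.sleep(1)                    # ...user clicks Stop...
    task.cancel()                             # request a graceful interrupt
    try:
        await task
    except asyncio.CancelledError:
        print("Response interrupted")         # publish cleanup events here

asyncio.run(main())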

Function Calling (Phase 4)

  • Parse <function_calls> in LLM responses
  • Execute code with user approval
  • Tool integration

Project Structure

event-driven-agent/
├── src/                          # Backend (Python + FastAPI)
│   ├── main.py                   # FastAPI app + WebSocket endpoint
│   ├── event_bus.py              # EventBus implementation (pub-sub)
│   ├── events.py                 # Event type definitions
│   ├── reducers.py               # Pure state reducer functions
│   ├── state_manager.py          # State management + side effects
│   ├── models.py                 # Pydantic models
│   └── services/
│       ├── __init__.py
│       └── llm_service.py        # LLM API integration
├── frontend/                     # Frontend (React + TypeScript + Vite)
├── tests/
├── pyproject.toml                # Python project config and dependencies
├── .env                          # Backend API keys (gitignored)
├── .env.example                  # Example environment variables
└── README.md

Current State: Phase 1

What works:

  • ✅ WebSocket connection
  • ✅ Send message → get response
  • ✅ Message queuing while the LLM responds
  • ✅ Basic chat functionality

What's coming:

  • ⏳ Streaming (Phase 2)
  • ⏳ Interrupt handling (Phase 3)
  • ⏳ Function calling (Phase 4)
  • ⏳ Message history
  • ⏳ Command execution

Why Build This?

Goal: Understand how to build production-ready conversational AI systems with:

  • Real-time streaming
  • Concurrent operation handling
  • Graceful interrupts
  • Function calling with approval flows

Production Patterns: The techniques used here (state management, queuing, interrupts) are needed in any real-world AI agent.

Contributing

Contributions are welcome! Here's how to get started:

  1. Fork and Clone: Fork the repository and clone it locally
  2. Setup: Follow the setup instructions above for both backend and frontend
  3. Make Changes: Create a new branch for your feature or bug fix
  4. Test: Ensure your changes work and don't break existing functionality
  5. Submit PR: Open a pull request with a clear description of your changes

Guidelines

  • Follow existing code style and patterns (event-driven architecture, pure reducers)
  • Add comments for complex logic
  • Test your changes before submitting
  • Keep PRs focused on a single feature or fix
  • Update documentation if needed

Areas for Contribution

  • Adding true streaming (character-by-character)
  • Implementing interrupt handling
  • Adding function calling support
  • Improving UI/UX
  • Adding tests
  • Fixing bugs
  • Improving documentation

Questions? Open an issue to discuss your ideas!
