Skip to content

melvinkcx/fastapi-events

dev
Switch branches/tags
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

fastapi-events

An event dispatching/handling library for FastAPI, and Starlette.

Features:

  • straightforward API to emit events anywhere in your code
  • events are handled after responses are returned (doesn't affect response time)
  • supports event piping to remote queues
  • powerful built-in handlers to handle events locally and remotely
  • coroutine functions (async def) are the first-class citizen
  • write your handlers, never be limited to just what fastapi_events provides
  • (>=0.3.0) supports event payload validation via Pydantic (See here)
  • (>=0.4.0) supports event chaining: dispatching events within handlers (thank @ndopj for contributing to the idea)

Installation

pip install fastapi-events

To use it with AWS handlers, install:

pip install fastapi-events[aws]

Usage

fastapi-events supports both FastAPI and Starlette. To use it, simply configure it as middleware.

  • Configuring fastapi-events for FastAPI:

    from fastapi import FastAPI
    from fastapi.requests import Request
    from fastapi.responses import JSONResponse
    
    from fastapi_events.dispatcher import dispatch
    from fastapi_events.middleware import EventHandlerASGIMiddleware
    from fastapi_events.handlers.local import local_handler
    
    
    app = FastAPI()
    app.add_middleware(EventHandlerASGIMiddleware, 
                       handlers=[local_handler])   # registering handler(s)
    
    
    @app.get("/")
    def index(request: Request) -> JSONResponse:
        dispatch("my-fancy-event", payload={"id": 1})  # Emit events anywhere in your code
        return JSONResponse()    
  • Configuring fastapi-events for Starlette:

    from starlette.applications import Starlette
    from starlette.middleware import Middleware
    from starlette.requests import Request
    from starlette.responses import JSONResponse
    
    from fastapi_events.dispatcher import dispatch
    from fastapi_events.handlers.local import local_handler
    from fastapi_events.middleware import EventHandlerASGIMiddleware
    
    app = Starlette(middleware=[
        Middleware(EventHandlerASGIMiddleware,
                   handlers=[local_handler])  # registering handlers
    ])
    
    @app.route("/")
    async def root(request: Request) -> JSONResponse:
        dispatch("new event", payload={"id": 1})   # Emit events anywhere in your code
        return JSONResponse()

Dispatching events

Events can be dispatched anywhere in the code, as long as they are dispatched before a response is made.

# anywhere in code

from fastapi_events.dispatcher import dispatch

dispatch(
    "cat-requested-a-fish",  # Event name, accepts any valid string
    payload={"cat_id": "fd375d23-b0c9-4271-a9e0-e028c4cd7230"}  # Event payload, accepts any arbitrary data
)

dispatch("a_cat_is_spotted")  # This works too!

Event Payload Validation With Pydantic

Event payload validation is possible since version 0.3.0. To enable, simply register a Pydantic models with the corresponding event name.

import uuid
from enum import Enum
from datetime import datetime

from pydantic import BaseModel
from fastapi_events.registry.payload_schema import registry as payload_schema


class UserEvents(Enum):
    SIGNED_UP = "USER_SIGNED_UP"
    ACTIVATED = "USER_ACTIVATED"


# Registering your event payload schema
@payload_schema.register(event_name=UserEvents.SIGNED_UP)
class SignUpPayload(BaseModel):
    user_id: uuid.UUID
    created_at: datetime

Wildcard in event name is currently not supported

Payload will be validated automatically without any changes made while invoking the dispatcher.

# Events with payload schema registered
dispatch(UserEvents.SIGNED_UP)  # raises ValidationError, missing payload
dispatch(UserEvents.SIGNED_UP,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"})  # raises ValidationError, missing `created_at`
dispatch(UserEvents.SIGNED_UP,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a", created_at: datetime.utcnow()})  # OK!

# Events without payload schema -> No validation will be performed
dispatch(UserEvents.ACTIVATED,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"})  # OK! no validation will be performed

Reminder: payload validation is optional. Payload of events without its schema registered will not be validated.

Handling Events

Handle events locally

The flexibility of fastapi-events allows us to customise how the events should be handled. For starters, you might want to handle your events locally.

# ex: in handlers.py

from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event


@local_handler.register(event_name="cat*")
def handle_all_cat_events(event: Event):
    """
    this handler will match with an events prefixed with `cat`.
    ex: "cat_eats_a_fish", "cat_is_cute", etc
    """
    # the `event` argument is nothing more than a tuple of event name and payload
    event_name, payload = event

    # TODO do anything you'd like with the event


@local_handler.register(event_name="cat*")  # Tip: You can register several handlers with the same event name
def handle_all_cat_events_another_way(event: Event):
    pass


@local_handler.register(event_name="*")
async def handle_all_events(event: Event):
    # event handlers can be coroutine function too (`async def`)
    pass

Piping Events To Remote Queues

For larger projects, you might have services dedicated to handling events separately.

For instance, fastapi-events comes with AWS SQS forwarder to forward events to a remote queue.

  1. Register SQSForwardHandler as handlers:

    app = FastAPI()
    app.add_middleware(EventHandlerASGIMiddleware, 
                       handlers=[SQSForwardHandler(queue_url="test-queue",
                                                   region_name="eu-central-1")])   # registering handler(s)
  2. Start dispatching events! Events will be serialised into JSON format by default:

    ["event name", {"payload": "here is the payload"}]

Tip: to pipe events to multiple queues, provide multiple handlers while adding EventHandlerASGIMiddleware.

Built-in handlers

Here is a list of built-in event handlers:

  • LocalHandler / local_handler:

    • import from fastapi_events.handlers.local
    • for handling events locally. See examples above
    • event name pattern matching is done using Unix shell-style matching (fnmatch)
  • SQSForwardHandler:

    • import from fastapi_events.handlers.aws
    • to forward events to an AWS SQS queue
  • EchoHandler:

    • import from fastapi_events.handlers.echo
    • to forward events to stdout with pprint. Great for debugging purpose

Creating your own handler

Creating your own handler is nothing more than inheriting from the BaseEventHandler class in fastapi_events.handlers.base.

To handle events, fastapi_events calls one of these methods, in the following priority order:

  1. handle_many(events): The coroutine function should expect the backlog of the events collected.

  2. handle(event): In cases where handle_many() weren't defined in your custom handler, handle() will be called by iterating through the events in the backlog.

from typing import Iterable

from fastapi_events.typing import Event
from fastapi_events.handlers.base import BaseEventHandler


class MyOwnEventHandler(BaseEventHandler):
    async def handle(self, event: Event) -> None:
        """
        Handle events one by one
        """
        pass

    async def handle_many(self, events: Iterable[Event]) -> None:
        """
        Handle events by batch
        """
        pass

Cookbook

1) Suppressing Events / Disabling dispatch() Globally

In case you want to suppress events globally especially during testing, you can do so without having to mock or patch the dispatch() function. Simple set the environment variable FASTAPI_EVENTS_DISABLE_DISPATCH to 1, True or any truthy values.

2) Validating Event Payload During Dispatch

Requires Pydantic, which comes with FastAPI. If you're using Starlette, you might need to install Pydantic

See Event Payload Validation With Pydantic

3) Dispatching events within handlers (Event Chaining)

It is now possible to dispatch events within another event handlers. You'll need version 0.4 or above.

Comparison between events dispatched within the request-response cycle and event handlers are:

dispatched within request-response cycle dispatched within event handlers
processing of events will be handled after the response has been made will be scheduled to the running event loop immediately
order of processing always after the response is made not guaranteed
supports payload schema validation with Pydantic Yes Yes
can be disabled globally with FASTAPI_EVENTS_DISABLE_DISPATCH Yes Yes

FAQs:

  1. I'm getting LookupError when dispatch() is used:

        def dispatch(event_name: str, payload: Optional[Any] = None) -> None:
    >       q: Deque[Event] = event_store.get()
    E       LookupError: <ContextVar name='fastapi_context' at 0x400a1f12b0>

    Answer:

    dispatch() relies on ContextVars to work properly. There are many reasons why LookupError can occur. A common reason is dispatch() is called outside the request-response lifecycle of FastAPI/Starlette, such as calling dispatch() after a response has been returned.

    If you're getting this during testing, you may consider disabling dispatch() during testing. See Suppressing Events / Disabling dispatch() Globally for details.

  2. My event handlers are not registered / Local handlers are not being executed:

    Answer:

    Make sure the module where your local event handlers are defined is loaded during runtime. A simple fix is to import the module in your __init__.py. This will ensure the modules are properly loaded during runtime.

Feedback, Questions?

Any form of feedback and questions are welcome! Please create an issue here.

About

Asynchronous event dispatching/handling library for FastAPI and Starlette

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages