Skip to content
main
Switch branches/tags
Code

Build Status Coverage Status Documentation Status Latest Release

Event Sourcing in Python

A library for event sourcing in Python.

"totally amazing and a pleasure to use"

Read the docs. See also extension projects.

Installation

Use pip to install the stable distribution from the Python Package Index. Please note, it is recommended to install Python packages into a Python virtual environment.

$ pip install eventsourcing

Synopsis

Use the library's Aggregate base class and @event decorator to define event-sourced aggregates.

Derive your aggregate classes from the Aggregate base class. Create new aggregate instances by calling the derived aggregate class. Use the @event decorator to define aggregate event classes. Events will be triggered when decorated methods are called.

from eventsourcing.domain import Aggregate, event


class World(Aggregate):
    @event('Created')
    def __init__(self, name: str) -> None:
        self.name = name
        self.history = ()

    @event('SomethingHappened')
    def make_it_so(self, what: str) -> None:
        self.history += (what,)

Use the library's Application class to define an event-sourced application. Applications store and retrieve aggregates, using a persistence mechanism for aggregate events.

Derive your application classes from the Application base class. Add command and query methods to manipulate and access the state of the application.

from typing import Tuple
from uuid import UUID

from eventsourcing.application import Application


class Universe(Application):
    def create_world(self, name: str) -> UUID:
        world = World(name)
        self.save(world)
        return world.id

    def make_it_so(self, world_id: UUID, what: str) -> None:
        world = self._get_world(world_id)
        world.make_it_so(what)
        self.save(world)

    def get_history(self, world_id) -> Tuple:
        return self._get_world(world_id).history

    def _get_world(self, world_id) -> World:
        world = self.repository.get(world_id)
        assert isinstance(world, World)
        return world

Construct an instance of the application by calling the application class.

application = Universe()

Create a new aggregate by calling the application method create_world().

world_id = application.create_world('Earth')

Evolve the state of the application's aggregate by calling the application command method make_it_so().

application.make_it_so(world_id, 'dinosaurs')
application.make_it_so(world_id, 'trucks')
application.make_it_so(world_id, 'internet')

Access the state of the application's aggregate by calling the application query method get_history().

history = application.get_history(world_id)
assert history == ('dinosaurs', 'trucks', 'internet')

See the discussion below and the library's documentation for more information.

Features

Aggregates and applications — base classes for event-sourced aggregates and applications. Suggests how to structure an event-sourced application. All classes are fully type-hinted to guide developers in using the library.

Flexible event store — flexible persistence of aggregate events. Combines an event mapper and an event recorder in ways that can be easily extended. Mapper uses a transcoder that can be easily extended to support custom model object types. Recorders supporting different databases can be easily substituted and configured with environment variables.

Application-level encryption and compression — encrypts and decrypts events inside the application. This means data will be encrypted in transit across a network ("on the wire") and at disk level including backups ("at rest"), which is a legal requirement in some jurisdictions when dealing with personally identifiable information (PII) for example the EU's GDPR. Compression reduces the size of stored aggregate events and snapshots, usually by around 25% to 50% of the original size. Compression reduces the size of data in the database and decreases transit time across a network.

Snapshotting — reduces access-time for aggregates that have many events.

Versioning - allows changes to be introduced after an application has been deployed. Both aggregate events and aggregate snapshots can be versioned.

Optimistic concurrency control — ensures a distributed or horizontally scaled application doesn't become inconsistent due to concurrent method execution. Leverages optimistic concurrency controls in adapted database management systems.

Notifications and projections — reliable propagation of application events with pull-based notifications allows the application state to be projected accurately into replicas, indexes, view models, and other applications. Supports materialized views and CQRS.

Event-driven systems — reliable event processing. Event-driven systems can be defined independently of particular persistence infrastructure and mode of running.

Detailed documentation — documentation provides general overview, introduction of concepts, explanation of usage, and detailed descriptions of library classes.

Worked examples — includes examples showing how to develop aggregates, applications and systems.

Aggregates

The example above shows an event-sourced aggregate class named World. Its __init__() method takes a name argument which is used to initialise the name attribute, and also initialises a history attribute as an empty tuple. It has a method make_it_so() that takes an argument what and appends the given value to the history of the World.

The World class uses the aggregate base class Aggregate from the library's domain module. It uses the @event decorator to define two aggregate event classes, Created and SomethingHappened. The attributes of the event classes match the parameters of the decorated method signature.

Calling the World aggregate class will construct a new aggregate instance.

# Call the aggregate class to create a new aggregate object.
world = World(name='Earth')

# Check the name of the new aggregate object.
assert world.name == 'Earth'

# Check the state of the new aggregate object.
assert world.history == ()

The World aggregate object has an id attribute. It is a version 4 universally unique identifier (UUID). This follows from the default behaviour of the Aggregate base class.

# The aggregate has an ID.
assert isinstance(world.id, UUID)

Calling the make_it_so() command method will change the state of the aggregate.

# Execute aggregate commands.
world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')

# Check aggregate state.
assert world.history[0] == 'dinosaurs'
assert world.history[1] == 'trucks'
assert world.history[2] == 'internet'

How does it work? When the aggregate class is called, a Created event class is constructed. This event object instance is used to construct an instance of the World aggregate class. The Created event class is defined with an attribute name. The event object instance will have the value of the name argument given when calling the aggregate class. After the aggregate object is instantiated, the event object is appended to an internal list of pending events. The World object is then returned to the caller.

Similarly, when the command method make_it_so() is called on an instance of the World class, a SomethingHappened event is constructed. The SomethingHappened event object class is defined with an attribute what. The event object instance will have the value of the what argument given when calling the command method. This event object is used to evolve the state of the aggregate object, using the body of the decorated method. Hence, the value what is appended to the history of the World aggregate instance. After the SomethingHappened event is applied to the aggregate, the event will be immediately appended to the aggregate's internal list of pending events.

Pending event objects can be collected using the aggregate's collect_events() method. The aggregate event classes are defined on the aggregate class, so that they are "namespaced" within the aggregate class. Hence, the "fully qualified" name of the Created event is World.Created. And the "fully qualified" name of the SomethingHappened event is World.SomethingHappened.

# Collect events.
events = world.collect_events()
assert len(events) == 4

assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'
assert type(events[2]).__name__ == 'SomethingHappened'
assert type(events[3]).__name__ == 'SomethingHappened'

assert type(events[0]).__qualname__ == 'World.Created'
assert type(events[1]).__qualname__ == 'World.SomethingHappened'
assert type(events[2]).__qualname__ == 'World.SomethingHappened'
assert type(events[3]).__qualname__ == 'World.SomethingHappened'

Collecting and storing aggregate events and reconstructing aggregates from stored events are responsibilities of the library's Application class. The collect_events() method is used by the application's save() method to collect aggregate events, so the event objects can be stored in a database. You probably won't need to call this method explicitly in your project code.

The collected event objects can be used to reconstruct the state of the aggregate. The application repository's get() method reconstructs aggregates from stored events in this way (see below).

copy = None
for event in events:
    copy = event.mutate(copy)

assert copy.history == world.history
assert copy.id == world.id
assert copy.version == world.version
assert copy.created_on == world.created_on
assert copy.modified_on == world.modified_on

Please note, the body of a method decorated with the @event decorator will be executed each time the associated event is applied to evolve the state of the aggregate, both just after triggering the event and each time when reconstructing aggregates from stored events. For this reason, the body of the decorated method should return the Python value None. It is normal for command methods to return None. However, if you do need to return values from your aggregate command methods, then define a command method that is not decorated with the @event decorator, and return the value from the non-decorated method after calling the decorated method.

Any processing of method arguments that should be done only once, and not repeated when reconstructing aggregates from stored events, should be done in the calling method. For example, if the triggered event will have a new UUID, you will either want to use a separate command method, or create this value in the expression used when calling the method, and not generate the UUID in the decorated method body. Otherwise, rather than being fixed in the stored state of the aggregate, a new UUID will be created each time the aggregate is reconstructed. See the library's domain module documentation for more information.

Please also note, if an exception happens to be raised in the decorated method body, then the triggered event will not be appended to the internal list of pending events as described above. If you are careful, this behaviour (of not appending the event to the list of pending events) can be used to validate the state of the event against the current state of the aggregate. But if you wish to continue using the same aggregate instance after catching a validation exception in the caller of the decorated method, please just be careful to complete all validation before adjusting the state of the aggregate, otherwise you will need to retrieve a fresh instance from the repository.

Applications

An "application" object in this library roughly corresponds to a "bounded context" in Domain-Driven Design. An application can have aggregates of different types in its domain model.

The example above defines an event-sourced application named Universe. The application class Universe uses the application base class Application from the library's application module. When the Universe application class is called, an application object is constructed.

A Universe application object has a command method create_world() that creates and saves new instances of the aggregate class World. It has a command method make_it_so() that calls the aggregate command method make_it_so() of an already existing aggregate object. And it has a query method get_history() that returns the history of an aggregate object.

When the application command method create_world() is called, a new World aggregate object is created, the new aggregate object is saved by calling the application's save() method, and then the ID of the aggregate is returned to the caller.

When the application command method make_it_so() is called with the ID of an aggregate, the repository is used to get the aggregate, the aggregate's make_it_so() method is called with the given value of what, and the aggregate is saved by calling the application's save() method.

When the application query method get_history() is called with the ID of an aggregate, the repository is used to get the aggregate, and the value of the aggregate's history attribute is returned to the caller.

How does it work? The Application class provides persistence infrastructure that can collect, serialise, and store aggregate events. It can also reconstruct aggregates from stored events.

The application save() method saves aggregates by collecting and storing pending aggregate events. The save() method calls the given aggregate's collect_events() method and puts the pending aggregate events in an event store, with a guarantee that either all the events will be stored or none of them will be.

The application repository has a get() method that can be used to obtain previously saved aggregates. The get() method is called with an aggregate ID. It retrieves stored events for an aggregate from an event store, then reconstructs the aggregate object from its previously stored events (see above), and then returns the reconstructed aggregate object to the caller. The application class can be configured using environment variables to work with different databases, and optionally to encrypt and compress stored events. By default, the application serialises aggregate events using JSON, and stores them in memory as "plain old Python objects". The library includes support for storing events in SQLite and PosgreSQL (see below). Other databases are available.

The Application class also has a log object which can be used to get all the aggregate events that have been stored across all the aggregates of an application. The log presents the aggregate events in the order in which they were stored, as a sequence of event notifications. Each of the event notifications has an integer ID which increases along the sequence. The log can be used to propagate the state of the application in a manner that supports deterministic processing of the application state in event-driven systems.

log_section = application.log['1,4']
notifications = log_section.items
assert [n.id for n in notifications] == [1, 2, 3, 4]

assert 'World.Created' in notifications[0].topic
assert 'World.SomethingHappened' in notifications[1].topic
assert 'World.SomethingHappened' in notifications[2].topic
assert 'World.SomethingHappened' in notifications[3].topic

assert b'Earth' in notifications[0].state
assert b'dinosaurs' in notifications[1].state
assert b'trucks' in notifications[2].state
assert b'internet' in notifications[3].state

assert world_id == notifications[0].originator_id
assert world_id == notifications[1].originator_id
assert world_id == notifications[2].originator_id
assert world_id == notifications[3].originator_id

The test() function below demonstrates the example in more detail, by creating many aggregates in one application, reading event notifications from the application log, retrieving historical versions of an aggregate. The optimistic concurrency control feature, and the compression and encryption features are also demonstrated.

from eventsourcing.persistence import IntegrityError
from eventsourcing.system import NotificationLogReader


def test(app: Universe, expect_visible_in_db: bool):
    # Check app has zero event notifications.
    assert len(app.log['1,10'].items) == 0

    # Create a new aggregate.
    world_id = app.create_world('Earth')

    # Execute application commands.
    app.make_it_so(world_id, 'dinosaurs')
    app.make_it_so(world_id, 'trucks')

    # Check recorded state of the aggregate.
    assert app.get_history(world_id) == (
        'dinosaurs',
        'trucks'
    )

    # Execute another command.
    app.make_it_so(world_id, 'internet')

    # Check recorded state of the aggregate.
    assert app.get_history(world_id) == (
        'dinosaurs',
        'trucks',
        'internet'
    )

    # Check values are (or aren't visible) in the database.
    values = [b'dinosaurs', b'trucks', b'internet']
    if expect_visible_in_db:
        expected_num_visible = len(values)
    else:
        expected_num_visible = 0

    actual_num_visible = 0
    reader = NotificationLogReader(app.log)
    for notification in reader.read(start=1):
        for what in values:
            if what in notification.state:
                actual_num_visible += 1
                break
    assert expected_num_visible == actual_num_visible

    # Get historical state (at version 3, before 'internet' happened).
    old = app.repository.get(world_id, version=3)
    assert len(old.history) == 2
    assert old.history[-1] == 'trucks'  # last thing to have happened was 'trucks'

    # Check app has four event notifications.
    assert len(app.log['1,10'].items) == 4

    # Optimistic concurrency control (no branches).
    old.make_it_so('future')
    try:
        app.save(old)
    except IntegrityError:
        pass
    else:
        raise Exception("Shouldn't get here")

    # Check app still has only four event notifications.
    assert len(app.log['1,10'].items) == 4

    # Read event notifications.
    reader = NotificationLogReader(app.log)
    notifications = list(reader.read(start=1))
    assert len(notifications) == 4

    # Create eight more aggregate events.
    world_id = app.create_world('Mars')
    app.make_it_so(world_id, 'plants')
    app.make_it_so(world_id, 'fish')
    app.make_it_so(world_id, 'mammals')

    world_id = app.create_world('Venus')
    app.make_it_so(world_id, 'morning')
    app.make_it_so(world_id, 'afternoon')
    app.make_it_so(world_id, 'evening')

    # Get the new event notifications from the reader.
    last_id = notifications[-1].id
    notifications = list(reader.read(start=last_id + 1))
    assert len(notifications) == 8

    # Get all the event notifications from the application log.
    notifications = list(reader.read(start=1))
    assert len(notifications) == 12

This example can be adjusted and extended for any event-sourced application.

Project structure

You are free to structure your project files however you wish. You may wish to put your aggregate classes in a file named domainmodel.py and your application class in a file named application.py.

myproject/
myproject/application.py
myproject/domainmodel.py
myproject/tests.py

But you can start by first writing a failing test in tests.py, then define your application and aggregate classes in the test module, and then refactor by moving things to separate Python modules.

Development environment

We can run the code in default "development" environment using the default "Plain Old Python Object" infrastructure (which keeps stored events in memory). The example below runs with no compression or encryption of the stored events.

# Construct an application object.
app = Universe()

# Run the test.
test(app, expect_visible_in_db=True)

SQLite environment

You can configure "production" environment to use the library's SQLite infrastructure with the following environment variables. Using SQLite infrastructure will keep stored events in an SQLite database.

An in-memory SQLite database is used in this example. To store your events on disk, use a file path as the value of the SQLITE_DBNAME environment variable.

import os


# Use SQLite infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.sqlite:Factory'

# Configure SQLite database URI. Either use a file-based DB;
os.environ['SQLITE_DBNAME'] = '/path/to/your/sqlite-db'
# or use an in-memory DB with cache not shared, only works with single thread;
os.environ['SQLITE_DBNAME'] = ':memory:'
# or use an in-memory DB with shared cache, works with multiple threads;
os.environ['SQLITE_DBNAME'] = ':memory:?mode=memory&cache=shared'
# or use a named in-memory DB, allows distinct databases in same process.
os.environ['SQLITE_DBNAME'] = 'file:application1?mode=memory&cache=shared'

# Set optional lock timeout (default 5s).
os.environ['SQLITE_LOCK_TIMEOUT'] = '10'  # seconds

Please note, a file-based SQLite database will have its journal mode set to use write-ahead logging (WAL), which allows reading to proceed concurrently reading and writing. Writing is serialised with a lock. The lock timeout can be adjusted from the SQLite default of 5s by setting the environment variable SQLITE_LOCK_TIMEOUT.

Optionally, set the cipher key using environment variable CIPHER_KEY and select a compressor by setting environment variable COMPRESSOR_TOPIC.

This example uses the Python zlib module to compress stored events, and AES to encrypt the compressed stored events, before writing them to the SQLite database. To use the library's encryption functionality, please install the library with the crypto option (or just install the pycryptodome package.) To use an alternative cipher strategy, set the environment variable CIPHER_TOPIC.

$ pip install eventsourcing[crypto]
from eventsourcing.cipher import AESCipher

# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)

# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key

# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = 'zlib'

Having configured the application with these environment variables, we can construct the application and run the test using SQLite.

# Construct an application object.
app = Universe()

# Run the test.
test(app, expect_visible_in_db=False)

PostgreSQL environment

You can configure "production" environment to use the library's PostgresSQL infrastructure with the following environment variables. Using PostgresSQL infrastructure will keep stored events in a PostgresSQL database.

Please note, to use the library's PostgreSQL functionality, please install the library with the postgres option (or just install the psycopg2 package.)

$ pip install eventsourcing[postgres]

Please note, the library option postgres_dev will install the psycopg2-binary which is much faster, but this is not recommended for production use. The binary package is a practical choice for development and testing but in production it is advised to use the package built from sources.

The example below also uses zlib and AES to compress and encrypt the stored events (but this is optional). To use the library's encryption functionality with PostgreSQL, please install the library with both the crypto and the postgres option (or just install the pycryptodome and psycopg2 packages.)

$ pip install eventsourcing[crypto,postgres]

It is assumed for this example that the database and database user have already been created, and the database server is running locally.

import os

from eventsourcing.cipher import AESCipher

# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)

# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = 'eventsourcing.cipher:AESCipher'
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = 'eventsourcing.compressor:ZlibCompressor'

# Use Postgres infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.postgres:Factory'

# Configure database connections.
os.environ['POSTGRES_DBNAME'] = 'eventsourcing'
os.environ['POSTGRES_HOST'] = '127.0.0.1'
os.environ['POSTGRES_PORT'] = '5432'
os.environ['POSTGRES_USER'] = 'eventsourcing'
os.environ['POSTGRES_PASSWORD'] = 'eventsourcing'

# Optional config.
# - connection max age (connections stay open by default)
os.environ['POSTGRES_CONN_MAX_AGE'] = '60'  # seconds
# - check connection before use (pessimistic disconnect handling, default 'n')
os.environ['POSTGRES_PRE_PING'] = 'y'
# - timeout to wait for table lock when inserting (default no timeout)
os.environ['POSTGRES_LOCK_TIMEOUT'] = '10'  # seconds
# - timeout for sessions with idle transactions (default no timeout)
os.environ['POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT'] = '10'  # seconds

Please note, to avoid interleaving of inserts when writing events, an 'EXCLUSIVE' mode table lock is acquired when using PostgreSQL. This effectively serialises writing events. It prevents concurrent transactions interleaving inserts, which would potentially cause notification log readers that are tailing the application notification log to miss event notifications. Reading from the table can proceed concurrently with other readers and writers, since selecting acquires an 'ACCESS SHARE' lock which does not block and is not blocked by the 'EXCLUSIVE' lock. This issue of interleaving inserts by concurrent writers is not exhibited by SQLite, which supports concurrent readers when its journal mode is set to use write ahead logging.

Having configured the application with these environment variables, we can construct the application and run the test using PostgreSQL.

# Construct an application object.
app = Universe()

# Run the test.
test(app, expect_visible_in_db=False)

Extensions

The GitHub organisation Event Sourcing in Python hosts extension projects for the Python eventsourcing library. There are projects that support ORMs such as Django and SQLAlchemy. There are projects supporting databases such as AxonDB, DynamoDB, EventStoreDB, and Apache Kafka. Another project supports transcoding domain events with Protocol Buffers rather than JSON. There are also projects that provide examples of using the library with such things as FastAPI, Flask, and serverless.

Project

This project is hosted on GitHub.

Please register questions, requests and issues on GitHub, or post in the project's Slack channel.

There is a Slack channel for this project, which you are welcome to join.

Please refer to the documentation for installation and usage guides.