Eventing infrastructure for event-sourced architectures.
pip install logicblocks-event-store
import asyncio
from logicblocks.event.store import EventStore, adapters
from logicblocks.event.types import NewEvent, StreamIdentifier
from logicblocks.event.projection import Projector
class ProfileProjector(
Projector[StreamIdentifier, dict[str, str], dict[str, str]]
):
def initial_state_factory(self) -> dict[str, str]:
return {}
def initial_metadata_factory(self) -> dict[str, str]:
return {}
def id_factory(self, state, source: StreamIdentifier) -> str:
return source.stream
def profile_created(self, state, event):
state['name'] = event.payload['name']
state['email'] = event.payload['email']
return state
def date_of_birth_set(self, state, event):
state['dob'] = event.payload['dob']
return state
async def main():
adapter = adapters.InMemoryEventStorageAdapter()
store = EventStore(adapter)
stream = store.stream(category="profiles", stream="joe.bloggs")
profile_created_event = NewEvent(name="profile-created",
payload={"name": "Joe Bloggs", "email": "joe.bloggs@example.com"})
date_of_birth_set_event = NewEvent(name="date-of-birth-set", payload={"dob": "1992-07-10"})
await stream.publish(
events=[
profile_created_event
])
await stream.publish(
events=[
date_of_birth_set_event
]
)
projector = ProfileProjector()
projection = await projector.project(source=stream)
profile = projection.state
asyncio.run(main())
# profile == {
# "name": "Joe Bloggs",
# "email": "joe.bloggs@example.com",
# "dob": "1992-07-10"
# }
- Event modelling:
- Log / category / stream based: events are grouped into logs of categories of streams.
- Arbitrary payloads and metadata: events can have arbitrary payloads and metadata limited only by what the underlying storage backend can support.
- Bi-temporality support: events included timestamps for both the time the event occurred and the time the event was recorded in the log.
- Event storage:
- Immutable and append only: the event store is modelled as an append-only log of immutable events.
- Consistency guarantees: concurrent stream updates can optionally be handled with optimistic concurrency control.
- Write conditions: an extensible write condition system allows pre-conditions to be evaluated before publish.
- Ordering guarantees: event writes are serialised (at log level by default, but customisable) to guarantee consistent ordering at scan time.
asyncio
support: the event store is implemented usingasyncio
and can be used in cooperative multitasking applications.
- Storage adapters:
- Storage adapter abstraction: adapters are provided for different storage
backends, currently including:
- an in-memory implementation for testing and experimentation; and
- a PostgreSQL backed implementation for production use.
- Extensible to other backends: the storage adapter abstract base class is designed to be relatively easily implemented to support other storage backends.
- Storage adapter abstraction: adapters are provided for different storage
backends, currently including:
- Projections:
- Reduction: event sequences can be reduced to a single value, a projection, using a projector.
- Metadata: projections have metadata for keeping track of things like update timestamps, versions, etc.
- Storage: a general purpose projection store allows easy management of projections for the majority of use cases, utilising the same adapter architecture as the event store, with a rich and customisable query language providing store search.
- Snapshotting: coming soon.
- Types:
- Type hints: includes type hints for all public classes and functions.
- Value types: includes serialisable value types for identifiers, events and projections.
- Pydantic support: coming soon.
- Testing utilities:
- Builders: includes builders for events to simplify testing.
- Data generators: includes random data generators for events and event attributes.
- Storage adapter tests: includes tests for storage adapters to ensure consistency across implementations.
To run the full pre-commit build:
./go
To run tests:
./go library:test:all # all tests
./go library:test:unit # unit tests
./go library:test:integration # integration tests
./go library:test:component # integration tests
The unit, integration and component tests can be run with a filter option allowing running a subset of tests in the suite, for example:
./go library:test:unit[TestAllTestsInFile]
./go library:test:component[test_a_specific_test]
To perform linting:
./go library:lint:check # check linting rules are met
./go library:lint:fix # attempt to fix linting issues
To format code:
./go library:format:check # check code formatting
./go library:format:fix # attempt to fix code formatting
To run type checking:
./go library:type:check # check type hints
To build packages:
./go library:build
To see all available tasks:
./go -T
Bug reports and pull requests are welcome on GitHub at https://github.com/logicblocks/event.store. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
Copyright © 2025 LogicBlocks Maintainers
Distributed under the terms of the MIT License.