The application layer combines objects from the domain and infrastructure layers.
An application object normally has repositories and policies. A repository allows aggregates to be retrieved by ID, using a dictionary-like interface. Aggregates implement commands that publish events; conversely, policies subscribe to events and execute commands as events are received. An application can be well understood by understanding its policies, aggregates, commands, and events.
An application object can have methods ("application services") which provide a relatively simple interface for client operations, hiding the complexity and usage of the application's domain and infrastructure layers.
Application services can be developed outside-in, with a test- or behaviour-driven development approach. A test suite can be imagined as an interface that uses the application. Interfaces are outside the scope of the application layer.
To run the examples below, please install the library with the 'sqlalchemy' option.
$ pip install eventsourcing[sqlalchemy]
The library provides a simple application class SimpleApplication, which can be constructed directly.
Its uri attribute is an SQLAlchemy-style database connection string. An SQLAlchemy thread-scoped session facade will be set up using the uri value.
uri = 'sqlite:///:memory:'
As you can see, this example uses SQLite to manage an in-memory relational database. You can change uri to any valid connection string.
Here are some example connection strings: for an SQLite file, for a PostgreSQL database, and for a MySQL database. See SQLAlchemy's create_engine() documentation for details. You may need to install drivers for your database management system (such as psycopg2 or mysqlclient).
sqlite:////tmp/mydatabase
postgresql://scott:tiger@localhost:5432/mydatabase
mysql://scott:tiger@hostname/dbname
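The parts of such a connection string can be inspected with Python's standard library. This is only an illustrative sketch: SQLAlchemy does its own URL parsing, and urlsplit() is used here merely to show how the PostgreSQL example above breaks down.

```python
from urllib.parse import urlsplit

# Illustration only: SQLAlchemy parses connection strings itself.
parts = urlsplit('postgresql://scott:tiger@localhost:5432/mydatabase')

dialect = parts.scheme              # The database dialect.
username = parts.username
password = parts.password
host = parts.hostname
port = parts.port                   # Parsed as an integer.
database = parts.path.lstrip('/')   # The path holds the database name.
```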
Encryption is optionally enabled in SimpleApplication with a suitable AES key (16, 24, or 32 random bytes encoded as Base64).
from eventsourcing.utils.random import encode_random_bytes
# Keep this safe (random bytes encoded with Base64).
cipher_key = encode_random_bytes(num_bytes=32)
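For readers curious about what such a key looks like, the sketch below builds one with the standard library alone. It is an assumption that this resembles what encode_random_bytes() does internally; the helper name make_cipher_key is hypothetical and not part of the library.

```python
import base64
import os

def make_cipher_key(num_bytes=32):
    """Return num_bytes of OS randomness encoded as a Base64 string.

    Hypothetical helper, not part of the eventsourcing library.
    """
    if num_bytes not in (16, 24, 32):
        raise ValueError("AES keys must be 16, 24, or 32 bytes long")
    return base64.b64encode(os.urandom(num_bytes)).decode('utf8')

# Decoding the key recovers the original number of random bytes.
example_key = make_cipher_key(32)
decoded_length = len(base64.b64decode(example_key))
```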
An application object can be constructed with these values as constructor arguments. The uri value can alternatively be set as environment variable DB_URI. The cipher_key value can be set as environment variable CIPHER_KEY.
from eventsourcing.application.simple import SimpleApplication
app = SimpleApplication(
    uri='sqlite:///:memory:',
    cipher_key=cipher_key
)
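The fallback from constructor arguments to environment variables might be pictured as follows. This is a sketch of the idea, not the library's code: the function resolve_setting is hypothetical, and only the variable names DB_URI and CIPHER_KEY come from the text above.

```python
import os

def resolve_setting(value, env_name, environ=None):
    """Prefer an explicit value; otherwise fall back to the environment.

    Hypothetical helper for illustration only.
    """
    environ = os.environ if environ is None else environ
    if value is not None:
        return value
    return environ.get(env_name)

# A plain dict stands in for os.environ so the example has no side effects.
fake_environ = {'DB_URI': 'sqlite:///:memory:'}

uri_from_env = resolve_setting(None, 'DB_URI', fake_environ)
uri_explicit = resolve_setting('sqlite:////tmp/mydatabase', 'DB_URI', fake_environ)
key_from_env = resolve_setting(None, 'CIPHER_KEY', fake_environ)
```

In this sketch an explicit argument always wins, and a setting that is neither passed nor present in the environment resolves to None.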
As an alternative to the uri argument, an existing SQLAlchemy session can be passed in with the session argument, for example a session object provided by a framework such as Flask-SQLAlchemy.
Once constructed, the SimpleApplication will have an event store, provided by the library's EventStore class, for which it uses the library's infrastructure classes for SQLAlchemy.
app.event_store
The SimpleApplication uses the library function construct_sqlalchemy_eventstore() to construct its event store, for integer-sequenced items with SQLAlchemy.
To use different infrastructure for storing events, subclass the SimpleApplication class and override the method setup_event_store(). You can read about the available alternatives in the infrastructure layer documentation.
The SimpleApplication also has a persistence policy, provided by the library's PersistencePolicy class.
app.persistence_policy
The persistence policy appends domain events to its event store whenever they are published.
The SimpleApplication also has a repository, an instance of the library's EventSourcedRepository class.
app.repository
Both the repository and persistence policy use the event store.
The aggregate repository is generic, and can retrieve all aggregates in an application, regardless of their class.
The SimpleApplication can be used as a context manager. The example below uses the AggregateRoot class directly to create a new aggregate object that is available in the application's repository.
from eventsourcing.domain.model.aggregate import AggregateRoot
with app:
    obj = AggregateRoot.__create__()
    obj.__change_attribute__(name='a', value=1)
    assert obj.a == 1
    obj.__save__()
    # Check the repository has the latest values.
    copy = app.repository[obj.id]
    assert copy.a == 1
    # Check the aggregate can be discarded.
    copy.__discard__()
    assert copy.id not in app.repository
    # Check optimistic concurrency control is working ok.
    from eventsourcing.exceptions import ConcurrencyError
    try:
        obj.__change_attribute__(name='a', value=2)
        obj.__save__()
    except ConcurrencyError:
        pass
    else:
        raise Exception("Shouldn't get here")
Because of the unique constraint on the sequenced item table, it isn't possible to branch the evolution of an entity and store two events at the same version. Hence, if the entity you are working on has been updated elsewhere, an attempt to update your object will cause a ConcurrencyError exception to be raised.
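The mechanism can be demonstrated with the standard library's sqlite3 module. The table and column names below are assumptions chosen for this sketch and are not the library's schema; the point is only that a uniqueness constraint on the pair (ID, version) rejects a second event at the same version.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# Hypothetical table: the primary key covers (originator_id, originator_version).
conn.execute(
    "CREATE TABLE sequenced_items ("
    " originator_id TEXT NOT NULL,"
    " originator_version INTEGER NOT NULL,"
    " state TEXT,"
    " PRIMARY KEY (originator_id, originator_version))"
)

def append_item(originator_id, version, state):
    """Insert one item; return False if that version is already taken."""
    try:
        with conn:  # Commits on success, rolls back on error.
            conn.execute(
                "INSERT INTO sequenced_items VALUES (?, ?, ?)",
                (originator_id, version, state),
            )
    except sqlite3.IntegrityError:
        return False
    return True

first = append_item('entity-1', 0, '{"a": 1}')
second = append_item('entity-1', 1, '{"a": 2}')
conflict = append_item('entity-1', 1, '{"a": 3}')  # Same version again.
row_count = conn.execute("SELECT COUNT(*) FROM sequenced_items").fetchone()[0]
```

In this sketch the conflicting write is reported with a return value; an application would more likely translate the database error into its own exception type.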
The SimpleApplication class can be extended.
The example below shows a custom application class MyApplication that extends SimpleApplication with application service create_aggregate() that can create new CustomAggregate entities.
class MyApplication(SimpleApplication):
    def create_aggregate(self, a):
        # Pass the given value through to the new aggregate.
        return CustomAggregate.__create__(a=a)
The application code above depends on an entity class called CustomAggregate, which is defined below. It extends the library's AggregateRoot entity with an event sourced, mutable attribute a.
from eventsourcing.domain.model.decorators import attribute
class CustomAggregate(AggregateRoot):
    def __init__(self, a, **kwargs):
        super(CustomAggregate, self).__init__(**kwargs)
        self._a = a

    @attribute
    def a(self):
        """Mutable attribute a."""
For more sophisticated domain models, please read about the custom entities, commands, and domain events that can be developed using classes from the library's domain model layer.
The custom application object can be constructed.
# Construct application object.
app = MyApplication(uri='sqlite:///:memory:')
The application service create_aggregate(), an aggregate factory method, can be called.
# Create aggregate using application service, and save it.
aggregate = app.create_aggregate(a=1)
aggregate.__save__()
Existing aggregates can be retrieved by ID using the repository's dictionary-like interface.
# Aggregate is in the repository.
assert aggregate.id in app.repository
# Get aggregate using dictionary-like interface.
aggregate = app.repository[aggregate.id]
assert aggregate.a == 1
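A dictionary-like repository of this kind can be sketched with __getitem__ and __contains__ methods that replay stored events. The class and the event format below are hypothetical simplifications for illustration, not the library's EventSourcedRepository.

```python
class SketchRepository(object):
    """Hypothetical repository that rebuilds state by replaying events."""

    def __init__(self, events_by_id):
        # Maps an aggregate ID to its ordered list of (event_name, data) pairs.
        self.events_by_id = events_by_id

    def _replay(self, aggregate_id):
        state = None
        for name, data in self.events_by_id.get(aggregate_id, []):
            if name == 'Created':
                state = dict(data)
            elif name == 'AttributeChanged':
                state[data['name']] = data['value']
            elif name == 'Discarded':
                state = None
        return state

    def __getitem__(self, aggregate_id):
        state = self._replay(aggregate_id)
        if state is None:
            # Unknown and discarded aggregates are both reported as missing.
            raise KeyError(aggregate_id)
        return state

    def __contains__(self, aggregate_id):
        return self._replay(aggregate_id) is not None

repo = SketchRepository({
    'x': [('Created', {'a': 1}),
          ('AttributeChanged', {'name': 'a', 'value': 2})],
    'y': [('Created', {'a': 1}), ('Discarded', {})],
})
```

Here 'x' is retrievable with its latest attribute value, while 'y' (discarded) and any unknown ID are treated as absent.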
Changes to the aggregate's attribute a are visible in the repository once pending events have been published.
# Change attribute value.
aggregate.a = 2
aggregate.a = 3
# Don't forget to save!
aggregate.__save__()
# Retrieve again from repository.
aggregate = app.repository[aggregate.id]
# Check attribute has new value.
assert aggregate.a == 3
The aggregate can be discarded. After being saved, a discarded aggregate will no longer be available in the repository.
# Discard the aggregate.
aggregate.__discard__()
# Check discarded aggregate no longer exists in repository.
assert aggregate.id not in app.repository
Attempts to retrieve an aggregate that does not exist will cause a KeyError to be raised.
# Fail to get aggregate from dictionary-like interface.
try:
    app.repository[aggregate.id]
except KeyError:
    pass
else:
    raise Exception("Shouldn't get here")
It is always possible to get the domain events for an aggregate, by using the application's event store method get_domain_events().
events = app.event_store.get_domain_events(originator_id=aggregate.id)
assert len(events) == 4
assert events[0].originator_id == aggregate.id
assert isinstance(events[0], CustomAggregate.Created)
assert events[0].a == 1
assert events[1].originator_id == aggregate.id
assert isinstance(events[1], CustomAggregate.AttributeChanged)
assert events[1].name == '_a'
assert events[1].value == 2
assert events[2].originator_id == aggregate.id
assert isinstance(events[2], CustomAggregate.AttributeChanged)
assert events[2].name == '_a'
assert events[2].value == 3
assert events[3].originator_id == aggregate.id
assert isinstance(events[3], CustomAggregate.Discarded)
It is also possible to get the sequenced item namedtuples for an aggregate, by using the event store's active record strategy method list_items().
items = app.event_store.active_record_strategy.list_items(aggregate.id)
assert len(items) == 4
assert items[0].originator_id == aggregate.id
assert items[0].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.Created'
assert '"a":1' in items[0].state, items[0].state
assert '"timestamp":' in items[0].state
assert items[1].originator_id == aggregate.id
assert items[1].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.AttributeChanged'
assert '"name":"_a"' in items[1].state
assert '"timestamp":' in items[1].state
assert items[2].originator_id == aggregate.id
assert items[2].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.AttributeChanged'
assert '"name":"_a"' in items[2].state
assert '"timestamp":' in items[2].state
assert items[3].originator_id == aggregate.id
assert items[3].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.Discarded'
assert '"timestamp":' in items[3].state
In this example, the cipher_key was not set, so the stored data is visible.
Of course, it is also possible to just use the active record class directly to obtain records. After all, it's just an SQLAlchemy ORM object.
app.event_store.active_record_strategy.active_record_class
The query property of the SQLAlchemy active record strategy is a convenient way to get a query object for the active record class from the session.
active_records = app.event_store.active_record_strategy.query.all()
assert len(active_records) == 4
If the application isn't being used as a context manager, it is useful to call close() when finished, to unsubscribe any handlers subscribed by the policies. This avoids dangling handlers being called inappropriately if the process isn't going to terminate immediately, for example when this documentation is tested as part of the library's test suite.
# Clean up.
app.close()
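The relationship between the context manager and close() can be sketched as below. It is an assumption of this sketch that leaving the with block simply calls close(); SketchApplication is a hypothetical class, not the library's SimpleApplication.

```python
class SketchApplication(object):
    """Hypothetical application that tracks whether it has been closed."""

    def __init__(self):
        self.is_closed = False

    def close(self):
        # A real application would unsubscribe its policies' handlers here.
        self.is_closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # Do not suppress exceptions raised in the block.

with SketchApplication() as sketch_app:
    closed_inside = sketch_app.is_closed
closed_after = sketch_app.is_closed
```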