Skip to content

Commit

Permalink
Merge pull request #126 from johnbywater/feature/timestamps_as_decimals
Browse files Browse the repository at this point in the history
Feature/timestamps as decimals
  • Loading branch information
johnbywater committed Dec 8, 2017
2 parents 674ed6e + da32a6b commit 3cdd4e3
Show file tree
Hide file tree
Showing 40 changed files with 435 additions and 324 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ os.environ['AES_CIPHER_KEY'] = aes_cipher_key

# SQLAlchemy-style database connection string.
os.environ['DB_URI'] = 'sqlite:///:memory:'
# os.environ['DB_URI'] = 'mysql://username:password@localhost/eventsourcing'
# os.environ['DB_URI'] = 'postgresql://username:password@localhost:5432/eventsourcing'


```

Run the code.
Expand Down
2 changes: 1 addition & 1 deletion docs/ref/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ The utils package contains common functions that are used in more than one layer
time
----

.. automodule:: eventsourcing.utils.time
.. automodule:: eventsourcing.utils.times
:members:
:show-inheritance:
:undoc-members:
Expand Down
3 changes: 2 additions & 1 deletion docs/topics/domainmodel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ Timestamps can be used to sequence events.
from eventsourcing.domain.model.events import EventWithTimestamp
from eventsourcing.domain.model.events import EventWithTimeuuid
from decimal import Decimal
from uuid import UUID
# Automatic timestamp.
assert isinstance(EventWithTimestamp().timestamp, float)
assert isinstance(EventWithTimestamp().timestamp, Decimal)
# Automatic UUIDv1.
assert isinstance(EventWithTimeuuid().event_id, UUID)
Expand Down
8 changes: 5 additions & 3 deletions docs/topics/examples/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,13 @@ object that is scoped to the request.
class IntegerSequencedItem(db.Model):
__tablename__ = 'integer_sequenced_items'
id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
# Sequence ID (e.g. an entity or aggregate ID).
sequence_id = db.Column(UUIDType(), primary_key=True)
sequence_id = db.Column(UUIDType(), nullable=False)
# Position (index) of item in sequence.
position = db.Column(db.BigInteger(), primary_key=True)
position = db.Column(db.BigInteger(), nullable=False)
# Topic of the item (e.g. path to domain event class).
topic = db.Column(db.String(255))
Expand All @@ -308,7 +310,7 @@ object that is scoped to the request.
data = db.Column(db.Text())
# Index.
__table_args__ = db.Index('index', 'sequence_id', 'position'),
__table_args__ = db.Index('index', 'sequence_id', 'position', unique=True),
# Construct eventsourcing application with db table and session.
Expand Down
8 changes: 5 additions & 3 deletions docs/topics/examples/example_application.rst
Original file line number Diff line number Diff line change
Expand Up @@ -367,19 +367,21 @@ with each item positioned in its sequence by an integer index number.
class SequencedItemRecord(ActiveRecord):
__tablename__ = 'sequenced_items'
id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
# Sequence ID (e.g. an entity or aggregate ID).
sequence_id = Column(UUIDType(), primary_key=True)
sequence_id = Column(UUIDType(), nullable=False)
# Position (index) of item in sequence.
position = Column(BigInteger(), primary_key=True)
position = Column(BigInteger(), nullable=False)
# Topic of the item (e.g. path to domain event class).
topic = Column(String(255))
# State of the item (serialized dict, possibly encrypted).
data = Column(Text())
__table_args__ = Index('index', 'sequence_id', 'position'),
__table_args__ = Index('index', 'sequence_id', 'position', unique=True),
Expand Down
8 changes: 5 additions & 3 deletions docs/topics/examples/schema.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ Then define a suitable active record class.
class StoredEventRecord(Base):
__tablename__ = 'stored_events'
id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
# Sequence ID (e.g. an entity or aggregate ID).
aggregate_id = Column(UUIDType(), primary_key=True)
aggregate_id = Column(UUIDType(), nullable=False)
# Position (timestamp) of item in sequence.
aggregate_version = Column(BigInteger(), primary_key=True)
aggregate_version = Column(BigInteger(), nullable=False)
# Type of the event (class name).
event_type = Column(String(100))
# State of the item (serialized dict, possibly encrypted).
state = Column(Text())
__table_args__ = Index('index', 'aggregate_id', 'aggregate_version'),
__table_args__ = Index('index', 'aggregate_id', 'aggregate_version', unique=True),
Expand Down
77 changes: 49 additions & 28 deletions docs/topics/infrastructure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,39 @@ Since by now only one item was stored, so there is only one item in the results.
assert len(results) == 1
assert results[0] == stored_event1
MySQL
~~~~~

For MySQL, the Python package `mysqlclient <https://pypi.python.org/pypi/mysqlclient>`__
can be used.

.. code::
$ pip install mysqlclient
The ``uri`` for MySQL would look something like this.

.. code::
mysql://username:password@localhost/eventsourcing
PostgreSQL
~~~~~~~~~~

For PostgreSQL, the Python package `psycopg2 <https://pypi.python.org/pypi/psycopg2>`__
can be used.

.. code::
$ pip install psycopg2
The ``uri`` for PostgreSQL would look something like this.

.. code::
postgresql://username:password@localhost:5432/eventsourcing
Apache Cassandra
----------------
Expand Down Expand Up @@ -335,8 +368,6 @@ The method ``from_sequenced_item()`` can be used to convert sequenced item objec
domain_event = sequenced_item_mapper.from_sequenced_item(sequenced_item1)
assert domain_event.sequence_id == sequence1
assert domain_event.position == 0
assert domain_event.foo == 'bar'
Expand All @@ -345,7 +376,7 @@ The method ``to_sequenced_item()`` can be used to convert application-level obje

.. code:: python
assert sequenced_item_mapper.to_sequenced_item(domain_event) == sequenced_item1
assert sequenced_item_mapper.to_sequenced_item(domain_event).data == sequenced_item1.data
If the names of the first two fields of the sequenced item namedtuple (e.g. ``sequence_id`` and ``position``) do not
Expand All @@ -356,17 +387,23 @@ using constructor args ``sequence_id_attr_name`` and ``position_attr_name``.

.. code:: python
from eventsourcing.domain.model.events import DomainEvent
domain_event1 = DomainEvent(
originator_id=aggregate1,
originator_version=1,
foo='baz',
)
sequenced_item_mapper = SequencedItemMapper(
sequence_id_attr_name='originator_id',
position_attr_name='originator_version'
)
domain_event1 = sequenced_item_mapper.from_sequenced_item(sequenced_item1)
assert domain_event1.foo == 'bar', domain_event1
assert domain_event1.originator_id == sequence1
assert domain_event1.originator_version == 0
assert sequenced_item_mapper.to_sequenced_item(domain_event1) == sequenced_item1
assert domain_event1.foo == 'baz'
assert sequenced_item_mapper.to_sequenced_item(domain_event1).sequence_id == aggregate1
Alternatively, the constructor arg ``sequenced_item_class`` can be set with a sequenced item namedtuple type that is
Expand All @@ -382,8 +419,6 @@ different from the default ``SequencedItem`` namedtuple, such as the library's `
domain_event1 = sequenced_item_mapper.from_sequenced_item(stored_event1)
assert domain_event1.foo == 'bar', domain_event1
assert domain_event1.originator_id == aggregate1
assert sequenced_item_mapper.to_sequenced_item(domain_event1) == stored_event1
Since the alternative ``StoredEvent`` namedtuple can be used instead of the default
Expand Down Expand Up @@ -501,7 +536,7 @@ function ``decode_random_bytes()`` decodes the unicode key string into a sequenc
cipher = AESCipher(aes_key=decode_random_bytes(cipher_key))
# Encrypt some plaintext (using nonce arguments).
ciphertext = cipher.encrypt('plaintext', nonce_args=('sequence3', 'item12'))
ciphertext = cipher.encrypt('plaintext')
assert ciphertext != 'plaintext'
# Decrypt some ciphertext.
Expand Down Expand Up @@ -568,11 +603,8 @@ The event store's ``append()`` method can append a domain event to its sequence.

In the code below, a ``DomainEvent`` is appended to sequence ``aggregate1`` at position ``1``.


.. code:: python
from eventsourcing.domain.model.events import DomainEvent
event_store.append(
DomainEvent(
originator_id=aggregate1,
Expand All @@ -599,10 +631,7 @@ Since by now two domain events have been stored, so there are two domain events
assert len(results) == 2
assert results[0].originator_id == aggregate1
assert results[0].foo == 'bar'
assert results[1].originator_id == aggregate1
assert results[1].foo == 'baz'
Expand All @@ -627,29 +656,21 @@ order of the results. Hence, it can affect both the content of the results and t
# Get events below and at position 0.
result = event_store.get_domain_events(aggregate1, lte=0)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 0
assert result[0].foo == 'bar'
# Get events at and above position 1.
result = event_store.get_domain_events(aggregate1, gte=1)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 1
assert result[0].foo == 'baz'
# Get the first event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 0
assert result[0].foo == 'bar'
# Get the last event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1, is_ascending=False)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 1
assert result[0].foo == 'baz'
Expand Down Expand Up @@ -737,10 +758,10 @@ record class ``TimestampedSequencedItemRecord``.

.. code:: python
import time
from uuid import uuid4
from eventsourcing.infrastructure.sqlalchemy.activerecords import TimestampSequencedItemRecord
from eventsourcing.utils.times import decimaltimestamp
# Setup database table for timestamped sequenced items.
datastore.setup_table(TimestampSequencedItemRecord)
Expand All @@ -758,7 +779,7 @@ record class ``TimestampedSequencedItemRecord``.
aggregate_id = uuid4()
event = DomainEvent(
originator_id=aggregate_id,
timestamp=time.time(),
timestamp=decimaltimestamp(),
)
# Store the event.
Expand All @@ -768,7 +789,7 @@ record class ``TimestampedSequencedItemRecord``.
events = timestamped_event_store.get_domain_events(aggregate_id)
assert len(events) == 1
assert events[0].originator_id == aggregate_id
assert events[0].timestamp < time.time()
assert events[0].timestamp < decimaltimestamp()
Please note, optimistic concurrent control doesn't work to maintain entity consistency, because each
Expand Down
12 changes: 9 additions & 3 deletions docs/topics/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ have been introduced since the previous major version.
Version 4.x series was released after quite a lot of refactoring made
things backward-incompatible. Object namespaces for entity and event
classes was cleaned up, by moving library names to double-underscore
prefixed and postfixed names. Data integrity feature was introduced.
prefixed and postfixed names. Domain events can be hashed, and also
hash-chained together, allowing entity state to be verified.
Created events were changed to have originator_topic, which allowed
other things to be greatly simplified. Mutators for entity are now by
default expected to be implemented on entity event classes.
other things such as mutators and repositories to be greatly
simplified. Mutators are now by default expected to be implemented
on entity event classes. Event timestamps were changed from floats
to decimal objects, an exact number type. Cipher was changed to use
AES-GCM to allow authentication of encrypted data returned by database.
Documentation was improved, in particular with pages for each of the
layers in the library (infrastructure, domain model, application).

Version 3.x series was a released after quite of a lot of refactoring
made things backwards-incompatible.
Expand Down
7 changes: 4 additions & 3 deletions eventsourcing/domain/model/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
EventWithOriginatorVersion, EventWithTimestamp, GENESIS_HASH, QualnameABC, publish
from eventsourcing.exceptions import EntityIsDiscarded, HeadHashError, OriginatorIDError, \
OriginatorVersionError
from eventsourcing.utils.time import timestamp_from_uuid
from eventsourcing.utils.times import decimaltimestamp_from_uuid
from eventsourcing.utils.topic import get_topic, resolve_topic


Expand Down Expand Up @@ -40,6 +40,7 @@ class Event(EventWithOriginatorID, DomainEvent):
"""
Supertype for events of domain entities.
"""
__with_data_integrity__ = True

def __init__(self, **kwargs):
super(DomainEntity.Event, self).__init__(**kwargs)
Expand Down Expand Up @@ -316,11 +317,11 @@ def __init__(self, event_id, **kwargs):

@property
def __created_on__(self):
return timestamp_from_uuid(self.___initial_event_id__)
return decimaltimestamp_from_uuid(self.___initial_event_id__)

@property
def __last_modified__(self):
return timestamp_from_uuid(self.___last_event_id__)
return decimaltimestamp_from_uuid(self.___last_event_id__)


class TimestampedVersionedEntity(TimestampedEntity, VersionedEntity):
Expand Down

0 comments on commit 3cdd4e3

Please sign in to comment.