Skip to content

Commit

Permalink
Merge pull request #192 from johnbywater/feature/tuple_transcoding
Browse files Browse the repository at this point in the history
Added support for encoding tuples (and named tuples) as tuples, so they are decoded back to their original type rather than to lists.
  • Loading branch information
johnbywater committed Nov 9, 2019
2 parents 9b6f301 + 6413372 commit 02029f3
Show file tree
Hide file tree
Showing 17 changed files with 658 additions and 253 deletions.
11 changes: 5 additions & 6 deletions docs/topics/notifications.rst
Original file line number Diff line number Diff line change
Expand Up @@ -962,10 +962,9 @@ The library class :class:`~eventsourcing.interface.notificationlog.NotificationL
presents sections from a local notification log, and can be used to implement a Web API.

The :class:`~eventsourcing.interface.notificationlog.NotificationLogView`
class is constructed with a local ``notification_log`` object and an optional
``json_encoder_class`` (which defaults to the library's
:class:`~eventsourcing.utils.transcoding.ObjectJSONEncoder` class, used explicitly
in the example below).
class is constructed with a local ``notification_log`` object and a
``json_encoder`` (for example an instance of the library's
:class:`~eventsourcing.utils.transcoding.ObjectJSONEncoder` class).

The example below uses the record notification log, constructed above.

Expand All @@ -978,12 +977,12 @@ The example below uses the record notification log, constructed above.
view = NotificationLogView(
notification_log=notification_log,
json_encoder_class=ObjectJSONEncoder
json_encoder=ObjectJSONEncoder()
)
section_json, is_archived = view.present_section('1,5')
section_dict = json.loads(section_json, cls=ObjectJSONDecoder)
section_dict = ObjectJSONDecoder().decode(section_json)
assert section_dict['section_id'] == '1,5'
assert section_dict['next_id'] == '6,10'
Expand Down
7 changes: 4 additions & 3 deletions eventsourcing/application/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from eventsourcing.exceptions import CausalDependencyFailed, PromptFailed
from eventsourcing.infrastructure.base import ACIDRecordManager
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.utils.transcoding import json_dumps, json_loads


class ProcessEvent(object):
Expand Down Expand Up @@ -151,7 +150,8 @@ def run(self, prompt=None, advance_by=None):
causal_dependencies = (
notification.get("causal_dependencies") or "[]"
)
causal_dependencies = json_loads(causal_dependencies) or []
causal_dependencies = self.event_store.mapper.json_loads(
causal_dependencies) or []

# Check causal dependencies are satisfied.
for causal_dependency in causal_dependencies:
Expand Down Expand Up @@ -454,7 +454,8 @@ def construct_event_records(self, pending_events, causal_dependencies=None):
assert hasattr(
self.event_store.record_manager.record_class, "causal_dependencies"
)
causal_dependencies = json_dumps(causal_dependencies)
causal_dependencies = self.event_store.mapper.json_dumps(
causal_dependencies)
# Only need first event to carry the dependencies.
event_records[0].causal_dependencies = causal_dependencies

Expand Down
3 changes: 2 additions & 1 deletion eventsourcing/application/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ def loop_on_prompts(self):
# Loop on getting prompts.
while True:
try:
# Todo: Make the poll interval gradually increase if there are only timeouts?
# Todo: Make the poll interval gradually
# increase if there are only timeouts?
prompt = self.inbox.get(timeout=self.poll_interval)

except Empty:
Expand Down
6 changes: 3 additions & 3 deletions eventsourcing/domain/model/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from eventsourcing.utils.hashing import hash_object
from eventsourcing.utils.times import decimaltimestamp
from eventsourcing.utils.topic import get_topic
from eventsourcing.utils.transcoding import ObjectJSONEncoder
from eventsourcing.utils.transcoding import JSON_SEPARATORS, ObjectJSONEncoder

GENESIS_HASH = os.getenv("GENESIS_HASH", "")

Expand All @@ -26,7 +26,7 @@ class DomainEvent(object):
of the state of the event.
"""

__json_encoder_class__ = ObjectJSONEncoder
__json_encoder__ = ObjectJSONEncoder(separators=JSON_SEPARATORS)
__notifiable__ = True

def __init__(self, **kwargs):
Expand Down Expand Up @@ -128,7 +128,7 @@ def __hash_object__(cls, obj):
:return: SHA-256 as hexadecimal string.
:rtype: str
"""
return hash_object(cls.__json_encoder_class__, obj)
return hash_object(cls.__json_encoder__, obj)


class EventWithHash(DomainEvent):
Expand Down
3 changes: 2 additions & 1 deletion eventsourcing/infrastructure/eventstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def all_domain_events(self):
"""


# Todo: Unify iterators in EventStore and in NotificationLog, by pushing behaviour down to record manager?
# Todo: Unify iterators in EventStore and in NotificationLog,
# by pushing behaviour down to record manager?


class EventStore(AbstractEventStore):
Expand Down
38 changes: 31 additions & 7 deletions eventsourcing/infrastructure/sequenceditemmapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import unicode_literals

from abc import ABC, abstractmethod
from json import JSONDecodeError

from eventsourcing.infrastructure.sequenceditem import (
SequencedItem,
Expand All @@ -10,9 +11,7 @@
from eventsourcing.utils.transcoding import (
ObjectJSONDecoder,
ObjectJSONEncoder,
json_dumps,
json_loads,
)
JSON_SEPARATORS)


class AbstractSequencedItemMapper(ABC):
Expand All @@ -28,6 +27,18 @@ def event_from_item(self, sequenced_item):
Constructs and returns a domain event for given sequenced item.
"""

@abstractmethod
def json_dumps(self, event_attrs):
    """
    Encodes the given object as a JSON string.

    :param event_attrs: object (typically a dict of event attributes) to encode.
    :return: JSON string representing ``event_attrs``.
    """

@abstractmethod
def json_loads(self, event_attrs):
    """
    Decodes the given JSON string into a Python object.

    :param event_attrs: JSON string to decode.
    :return: decoded Python object.
    :raises ValueError: if the string is not valid JSON.
    """


class SequencedItemMapper(AbstractSequencedItemMapper):
"""
Expand All @@ -46,7 +57,10 @@ def __init__(
):
self.sequenced_item_class = sequenced_item_class
self.json_encoder_class = json_encoder_class or ObjectJSONEncoder
self.json_encoder = self.json_encoder_class(separators=JSON_SEPARATORS,
sort_keys=True)
self.json_decoder_class = json_decoder_class or ObjectJSONDecoder
self.json_decoder = self.json_decoder_class()
self.cipher = cipher
self.field_names = SequencedItemFieldNames(self.sequenced_item_class)
self.sequence_id_attr_name = (
Expand Down Expand Up @@ -91,14 +105,17 @@ def get_item_topic_and_state(self, domain_event_class, event_attrs):
topic = get_topic(domain_event_class)

# Serialise the event attributes.
state = json_dumps(event_attrs, cls=self.json_encoder_class)
state = self.json_dumps(event_attrs)

# Encrypt serialised state.
# Compress and encrypt serialised state.
if self.cipher:
state = self.cipher.encrypt(state)

return topic, state

def json_dumps(self, event_attrs):
return self.json_encoder.encode(event_attrs)

def construct_sequenced_item(self, item_args):
return self.sequenced_item_class(*item_args)

Expand Down Expand Up @@ -128,14 +145,21 @@ def get_event_class_and_attrs(self, topic, state):
# Resolve topic to event class.
domain_event_class = resolve_topic(topic)

# Decrypt state.
# Decrypt and decompress state.
if self.cipher:
state = self.cipher.decrypt(state)

# Deserialize data.
event_attrs = json_loads(state, cls=self.json_decoder_class)
event_attrs = self.json_loads(state)
return domain_event_class, event_attrs

def json_loads(self, state):
try:
return self.json_decoder.decode(state)
except JSONDecodeError:
raise ValueError("Couldn't load JSON string: {}".format(s))



def reconstruct_object(obj_class, obj_state):
obj = object.__new__(obj_class)
Expand Down
36 changes: 17 additions & 19 deletions eventsourcing/interface/notificationlog.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import json
from json import JSONDecodeError

import requests

from eventsourcing.application.notificationlog import (
Section,
AbstractNotificationLog,
LocalNotificationLog,
)
from eventsourcing.utils.transcoding import (
ObjectJSONDecoder,
ObjectJSONEncoder,
json_dumps,
)
Section,
RecordManagerNotificationLog)
from eventsourcing.utils.transcoding import ObjectJSONDecoder


# These classes are imported here only to avoid breaking backwards compatibility.
# Todo: Remove this import statement >= v8.x
from eventsourcing.application.notificationlog import (
RecordManagerNotificationLog,
BigArrayNotificationLog,
)


class RemoteNotificationLog(AbstractNotificationLog):
Expand All @@ -36,10 +28,17 @@ def __init__(self, base_url, json_decoder_class=None):
Initialises remote notification log object.
:param str base_url: A URL for the HTTP API.
:param JSONDecoder json_decoder_class: JSON decoder class used to decode remote sections.
:param JSONDecoder json_decoder_class: used to deserialize remote sections.
"""
self.base_url = base_url
self.json_decoder_class = json_decoder_class
json_decoder_class = json_decoder_class or ObjectJSONDecoder
self.json_decoder = json_decoder_class()

def json_loads(self, value: str):
try:
return self.json_decoder.decode(value)
except JSONDecodeError:
raise ValueError("Couldn't load JSON string: {}".format(value))

def __getitem__(self, section_id):
"""
Expand All @@ -54,8 +53,7 @@ def __getitem__(self, section_id):

def deserialize_section(self, section_json):
try:
decoder_class = self.json_decoder_class or ObjectJSONDecoder
section = Section(**json.loads(section_json, cls=decoder_class))
section = Section(**self.json_loads(section_json))
except ValueError as e:
raise ValueError(
"Couldn't deserialize notification log section: "
Expand Down Expand Up @@ -85,7 +83,7 @@ class NotificationLogView(object):
remotely, for example by a RemoteNotificationLog.
"""

def __init__(self, notification_log: LocalNotificationLog, json_encoder_class=None):
def __init__(self, notification_log: RecordManagerNotificationLog, json_encoder):
"""
Initialises notification log view object.
Expand All @@ -96,7 +94,7 @@ def __init__(self, notification_log: LocalNotificationLog, json_encoder_class=No
notification_log
)
self.notification_log = notification_log
self.json_encoder_class = json_encoder_class or ObjectJSONEncoder
self.json_encoder = json_encoder

def present_section(self, section_id):
"""
Expand All @@ -110,5 +108,5 @@ def present_section(self, section_id):
"""
section = self.notification_log[section_id]
is_archived = bool(section.next_id)
section_json = json_dumps(section.__dict__, self.json_encoder_class)
section_json = self.json_encoder.encode(section.__dict__)
return section_json, is_archived
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
clear_event_handlers,
)
from eventsourcing.tests.base import notquick
from eventsourcing.tests.test_system_fixtures import set_db_uri
from eventsourcing.tests.system_test_fixtures import set_db_uri


class TestPaxosSystem(unittest.TestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
from uuid import uuid5, NAMESPACE_OID

from eventsourcing.application.process import ProcessApplication
from eventsourcing.domain.model.aggregate import BaseAggregateRoot
Expand Down Expand Up @@ -62,7 +63,14 @@ class Created(Event, BaseAggregateRoot.Created):

@classmethod
def create(cls, order_id):
return cls.__create__(order_id=order_id)
return cls.__create__(
originator_id=Reservation.create_reservation_id(order_id),
order_id=order_id
)

@classmethod
def create_reservation_id(cls, order_id):
    """
    Derive a deterministic reservation ID (a version-5 UUID in the
    OID namespace) from the given order ID, so the reservation for
    an order can always be recomputed and located.
    """
    name = str(order_id)
    return uuid5(NAMESPACE_OID, name)


class Payment(BaseAggregateRoot):
Expand Down Expand Up @@ -114,6 +122,7 @@ def policy(repository, event):


class Reservations(ProcessApplication):

@staticmethod
def policy(repository, event):
if isinstance(event, Order.Created):
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
assert_event_handlers_empty,
clear_event_handlers,
)
from eventsourcing.tests.test_system_fixtures import (
from eventsourcing.tests.system_test_fixtures import (
Orders,
Payments,
Reservations,
Expand Down
4 changes: 3 additions & 1 deletion eventsourcing/tests/test_notificationlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
SQLAlchemyRecordManagerTestCase,
)
from eventsourcing.utils.topic import get_topic
from eventsourcing.utils.transcoding import ObjectJSONEncoder, JSON_SEPARATORS


class NotificationLogTestCase(SQLAlchemyRecordManagerTestCase, WithEventPersistence):
Expand Down Expand Up @@ -283,7 +284,8 @@ def simple_app(environ, start_response):
notification_log = self.create_notification_log(section_size)

# Get serialized section.
view = NotificationLogView(notification_log)
json_encoder = ObjectJSONEncoder(separators=JSON_SEPARATORS)
view = NotificationLogView(notification_log, json_encoder)
section, is_archived = view.present_section(section_id)
# Todo: Maybe redirect if the section ID is a mismatch, so
# the URL is good for cacheing.
Expand Down
4 changes: 2 additions & 2 deletions eventsourcing/tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from eventsourcing.exceptions import CausalDependencyFailed, PromptFailed
from eventsourcing.infrastructure.sqlalchemy.records import Base
from eventsourcing.utils.topic import resolve_topic
from eventsourcing.utils.transcoding import json_loads
from eventsourcing.utils.transcoding import ObjectJSONDecoder


class TestProcessApplication(TestCase):
Expand Down Expand Up @@ -164,7 +164,7 @@ def test_causal_dependencies(self):

# - the second 'Created' event depends on the Created event in another pipeline.
expect = [{"notification_id": 1, "pipeline_id": pipeline_id1}]
actual = json_loads(second_entity_records[0].causal_dependencies)
actual = ObjectJSONDecoder().decode(second_entity_records[0].causal_dependencies)

self.assertEqual(expect, actual)

Expand Down
Loading

0 comments on commit 02029f3

Please sign in to comment.