Skip to content

Commit

Permalink
Changed to allow json_encoder_cls and json_decoder_cls to be passed a…
Browse files Browse the repository at this point in the history
…s application constructor parameters, to support injection of custom JSON encoder and decoder (used to serialize and deserialize stored domain events).
  • Loading branch information
johnbywater committed Dec 27, 2015
1 parent 1326083 commit 7f9dd66
Show file tree
Hide file tree
Showing 12 changed files with 45 additions and 40 deletions.
7 changes: 4 additions & 3 deletions eventsourcing/application/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

class EventSourcingApplication(with_metaclass(ABCMeta)):

def __init__(self, *args, **kwargs):
self.stored_event_repo = self.create_stored_event_repo()
def __init__(self, json_encoder_cls=None, json_decoder_cls=None):
self.stored_event_repo = self.create_stored_event_repo(json_encoder_cls=json_encoder_cls,
json_decoder_cls=json_decoder_cls)
self.event_store = self.create_event_store()
self.persistence_subscriber = self.create_persistence_subscriber()

@abstractmethod
def create_stored_event_repo(self):
def create_stored_event_repo(self, **kwargs):
raise NotImplementedError()

def create_event_store(self):
Expand Down
4 changes: 2 additions & 2 deletions eventsourcing/application/example/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class ExampleApplication(EventSourcingApplication):
It doesn't have a stored event repository.
"""
def __init__(self):
super(ExampleApplication, self).__init__()
def __init__(self, **kwargs):
super(ExampleApplication, self).__init__(**kwargs)
self.example_repo = ExampleRepository(event_store=self.event_store)

def register_new_example(self, a, b):
Expand Down
4 changes: 2 additions & 2 deletions eventsourcing/application/with_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def __init__(self, hosts=('localhost',), consistency='QUORUM', default_keyspace=
def setup_cassandra_connection(*args):
setup_cassandra_connection(*get_cassandra_setup_params(*args))

def create_stored_event_repo(self):
return CassandraStoredEventRepository()
def create_stored_event_repo(self, **kwargs):
return CassandraStoredEventRepository(**kwargs)

def close(self):
super(EventSourcingWithCassandra, self).close()
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/application/with_pythonobjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

class EventSourcingWithPythonObjects(EventSourcingApplication):

def create_stored_event_repo(self):
def create_stored_event_repo(self, **kwargs):
return PythonObjectsStoredEventRepository()
4 changes: 2 additions & 2 deletions eventsourcing/application/with_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def __init__(self, db_session=None, db_uri=None, **kwargs):
def create_db_session(uri):
return get_scoped_session_facade(uri)

def create_stored_event_repo(self):
return SQLAlchemyStoredEventRepository(db_session=self.db_session)
def create_stored_event_repo(self, **kwargs):
return SQLAlchemyStoredEventRepository(db_session=self.db_session, **kwargs)

def close(self):
super(EventSourcingWithSQLAlchemy, self).close()
Expand Down
18 changes: 15 additions & 3 deletions eventsourcing/infrastructure/stored_events/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ class StoredEventRepository(with_metaclass(ABCMeta)):
serialize_without_json = False
serialize_with_uuid1 = False

def __init__(self, json_encoder_cls=None, json_decoder_cls=None):
self.json_encoder_cls = json_encoder_cls
self.json_decoder_cls = json_decoder_cls

@abstractmethod
def append(self, stored_event):
"""Saves given stored event in this repository.
Expand Down Expand Up @@ -43,11 +47,19 @@ def serialize(self, domain_event):
:type domain_event: object
:param domain_event:
"""
return serialize_domain_event(domain_event, without_json=self.serialize_without_json,
with_uuid1=self.serialize_with_uuid1)
return serialize_domain_event(
domain_event,
json_encoder_cls=self.json_encoder_cls,
without_json=self.serialize_without_json,
with_uuid1=self.serialize_with_uuid1
)

def deserialize(self, stored_event):
"""Returns a domain event from a stored event.
:type stored_event: object
"""
return deserialize_domain_event(stored_event, without_json=self.serialize_without_json)
return deserialize_domain_event(
stored_event,
json_decoder_cls=self.json_decoder_cls,
without_json=self.serialize_without_json
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from eventsourcing.infrastructure.stored_events.base import StoredEventRepository
from eventsourcing.infrastructure.stored_events.transcoders import StoredEvent


class SharedMemoryStoredEventRepository(StoredEventRepository):

serialize_without_json = True

def __init__(self):
super(SharedMemoryStoredEventRepository, self).__init__()
def __init__(self, **kwargs):
super(SharedMemoryStoredEventRepository, self).__init__(**kwargs)
self._by_id = {}
self._by_stored_entity_id = {}
self._by_topic = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def sql_from_stored(stored_event):

class SQLAlchemyStoredEventRepository(StoredEventRepository):

def __init__(self, db_session):
def __init__(self, db_session, **kwargs):
super(SQLAlchemyStoredEventRepository, self).__init__(**kwargs)
assert isinstance(db_session, ScopedSession)
self.db_session = db_session

Expand Down
12 changes: 4 additions & 8 deletions eventsourcing/infrastructure/stored_events/transcoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from eventsourcing.exceptions import TopicResolutionError


def serialize_domain_event(domain_event, without_json=False, with_uuid1=False):
def serialize_domain_event(domain_event, json_encoder_cls=None, without_json=False, with_uuid1=False):
"""
Serializes a domain event into a stored event.
"""
Expand All @@ -25,7 +25,7 @@ def serialize_domain_event(domain_event, without_json=False, with_uuid1=False):
if without_json:
event_attrs = domain_event
else:
event_attrs = json.dumps(domain_event.__dict__, separators=(',', ':'), sort_keys=True, cls=ObjectJSONEncoder)
event_attrs = json.dumps(domain_event.__dict__, separators=(',', ':'), sort_keys=True, cls=json_encoder_cls)
return StoredEvent(
event_id=event_id,
stored_entity_id=stored_entity_id,
Expand All @@ -34,7 +34,7 @@ def serialize_domain_event(domain_event, without_json=False, with_uuid1=False):
)


def deserialize_domain_event(stored_event, without_json=False):
def deserialize_domain_event(stored_event, json_decoder_cls=None, without_json=False):
"""
Recreates original domain event from stored event topic and event attrs.
"""
Expand All @@ -43,7 +43,7 @@ def deserialize_domain_event(stored_event, without_json=False):
domain_event = stored_event.event_attrs
else:
event_class = resolve_event_topic(stored_event.event_topic)
event_data = json.loads(stored_event.event_attrs, cls=ObjectJSONDecoder)
event_data = json.loads(stored_event.event_attrs, cls=json_decoder_cls)
try:
domain_event = event_class(**event_data)
except TypeError:
Expand Down Expand Up @@ -135,12 +135,10 @@ def default(self, obj):
if isinstance(obj, datetime.date):
return { 'ISO8601_date': obj.isoformat() }
if numpy is not None and isinstance(obj, numpy.ndarray) and obj.ndim == 1:

memfile = BytesIO()
numpy.save(memfile, obj)
memfile.seek(0)
serialized = json.dumps(memfile.read().decode('latin-1'))

d = {
'__ndarray__': serialized,
}
Expand All @@ -150,8 +148,6 @@ def default(self, obj):
'__class__': obj.__class__.__qualname__,
'__module__': obj.__module__,
}
# for attr, value in obj.__dict__.items():
# d[attr] = self.default(value)
return d


Expand Down
4 changes: 3 additions & 1 deletion eventsourcingtests/test_application_with_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
from eventsourcing.application.example.with_cassandra import ExampleApplicationWithCassandra
from eventsourcing.application.with_cassandra import DEFAULT_CASSANDRA_KEYSPACE
from eventsourcing.infrastructure.stored_events.cassandra_stored_events import create_cassandra_keyspace_and_tables
from eventsourcing.infrastructure.stored_events.transcoders import ObjectJSONEncoder, ObjectJSONDecoder
from eventsourcingtests.example_application_testcase import ExampleApplicationTestCase


class TestApplicationWithCassandra(ExampleApplicationTestCase):

def test_application_with_cassandra(self):
# Setup the example application, use it as a context manager.
with ExampleApplicationWithCassandra() as app:
with ExampleApplicationWithCassandra(json_encoder_cls=ObjectJSONEncoder,
json_decoder_cls=ObjectJSONDecoder) as app:
create_cassandra_keyspace_and_tables(DEFAULT_CASSANDRA_KEYSPACE)
try:
self.assert_is_example_application(app)
Expand Down
12 changes: 2 additions & 10 deletions eventsourcingtests/test_application_with_sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
import unittest

from sqlalchemy.orm.scoping import ScopedSession

from eventsourcing.application.example.with_sqlalchemy import ExampleApplicationWithSQLAlchemy
from eventsourcing.domain.model.example import Example
from eventsourcing.infrastructure.event_sourced_repos.example_repo import ExampleRepository
from eventsourcing.infrastructure.event_store import EventStore
from eventsourcing.infrastructure.persistence_subscriber import PersistenceSubscriber
from eventsourcing.infrastructure.stored_events.base import StoredEventRepository
from eventsourcing.infrastructure.stored_events.transcoders import ObjectJSONEncoder, ObjectJSONDecoder
from eventsourcingtests.example_application_testcase import ExampleApplicationTestCase


class TestApplicationWithSQLAlchemy(ExampleApplicationTestCase):

def test_application_with_sqlalchemy(self):
# Setup the example application, use it as a context manager.
with ExampleApplicationWithSQLAlchemy(db_uri='sqlite:///:memory:') as app:
with ExampleApplicationWithSQLAlchemy(db_uri='sqlite:///:memory:', json_encoder_cls=ObjectJSONEncoder, json_decoder_cls=ObjectJSONDecoder) as app:
self.assert_is_example_application(app)
10 changes: 5 additions & 5 deletions eventsourcingtests/test_stored_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from eventsourcing.domain.model.example import Example
from eventsourcing.exceptions import TopicResolutionError
from eventsourcing.infrastructure.stored_events.transcoders import serialize_domain_event, deserialize_domain_event, \
resolve_event_topic, StoredEvent
resolve_event_topic, StoredEvent, ObjectJSONDecoder, ObjectJSONEncoder
from eventsourcing.utils.time import utc_timezone


Expand All @@ -17,7 +17,7 @@ def test_serialize_domain_event(self):
datetime_now_tzaware = datetime.datetime(2015, 9, 8, 16, 20, 50, 577429, tzinfo=utc_timezone)
date_now = datetime.date(2015, 9, 8)
event1 = DomainEvent(a=1, b=2, c=datetime_now, d=datetime_now_tzaware, e=date_now, entity_version=0, entity_id='entity1', timestamp=3)
stored_event = serialize_domain_event(event1)
stored_event = serialize_domain_event(event1, json_encoder_cls=ObjectJSONEncoder)
self.assertEqual('DomainEvent::entity1', stored_event.stored_entity_id)
self.assertEqual('eventsourcing.domain.model.events#DomainEvent', stored_event.event_topic)
self.assertEqual('{"a":1,"b":2,"c":{"ISO8601_datetime":"2015-09-08T16:20:50.577429"},"d":{"ISO8601_datetime":"2015-09-08T16:20:50.577429+0000"},"e":{"ISO8601_date":"2015-09-08"},"entity_id":"entity1","entity_version":0,"timestamp":3}',
Expand All @@ -32,7 +32,7 @@ def test_serialize_domain_event_with_numpy_array(self):
if numpy is not None:
event1 = DomainEvent(a=numpy.array([10.123456]), entity_version=0, entity_id='entity1', timestamp=3)

stored_event = serialize_domain_event(event1)
stored_event = serialize_domain_event(event1, json_encoder_cls=ObjectJSONEncoder)
self.assertEqual('eventsourcing.domain.model.events#DomainEvent', stored_event.event_topic)
self.assertEqual('{"a":{"__ndarray__":"\\"\\\\u0093NUMPY\\\\u0001\\\\u0000F\\\\u0000{\'descr\': \'<f8\', \'fortran_order\': False, \'shape\': (1,), } \\\\nm\\\\u00fd\\\\u00f4\\\\u009f5?$@\\""},"entity_id":"entity1","entity_version":0,"timestamp":3}',
stored_event.event_attrs)
Expand All @@ -45,7 +45,7 @@ def test_recreate_domain_event(self):
stored_entity_id='entity1',
event_topic='eventsourcing.domain.model.events#DomainEvent',
event_attrs='{"a":1,"b":2,"c":{"ISO8601_datetime":"2015-09-08T16:20:50.577429"},"d":{"ISO8601_datetime":"2015-09-08T16:20:50.577429+0000"},"e":{"ISO8601_date":"2015-09-08"},"entity_id":"entity1","entity_version":0,"timestamp":3}')
domain_event = deserialize_domain_event(stored_event)
domain_event = deserialize_domain_event(stored_event, json_decoder_cls=ObjectJSONDecoder)
self.assertIsInstance(domain_event, DomainEvent)
self.assertEqual('entity1', domain_event.entity_id)
self.assertEqual(1, domain_event.a)
Expand All @@ -63,7 +63,7 @@ def test_recreate_domain_event(self):
stored_entity_id='entity1',
event_topic='os#path',
event_attrs='{"a":1,"b":2,"entity_id":"entity1","timestamp":3}')
self.assertRaises(TypeError, deserialize_domain_event, stored_event)
self.assertRaises(TypeError, deserialize_domain_event, stored_event, json_decoder_cls=ObjectJSONDecoder)

def test_resolve_event_topic(self):
example_topic = 'eventsourcing.domain.model.example#Example.Created'
Expand Down

0 comments on commit 7f9dd66

Please sign in to comment.