Skip to content

Commit

Permalink
Merge d0d03a1 into d62d5b9
Browse files — browse the repository at this point in the history
  • Loading branch information
johnbywater committed Nov 9, 2019
2 parents d62d5b9 + d0d03a1 commit 07a9a62
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 48 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ before_install:

# Install Cassandra.
- sudo rm -rf /var/lib/cassandra/*
- wget http://www.us.apache.org/dist/cassandra/3.11.4/apache-cassandra-3.11.4-bin.tar.gz
&& tar -xzf apache-cassandra-3.11.4-bin.tar.gz
- wget http://www.us.apache.org/dist/cassandra/3.11.5/apache-cassandra-3.11.5-bin.tar.gz
&& tar -xzf apache-cassandra-3.11.5-bin.tar.gz

# Start Cassandra.
- sudo sh ./apache-cassandra-3.11.4/bin/cassandra -R
- sudo sh ./apache-cassandra-3.11.5/bin/cassandra -R
- sleep 20

before_script:
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "7.2.5dev0"
__version__ = "7.3.0dev0"
71 changes: 48 additions & 23 deletions eventsourcing/application/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from collections import OrderedDict, defaultdict
from collections import OrderedDict, defaultdict, deque
from threading import Lock

from eventsourcing.application.notificationlog import NotificationLogReader
Expand Down Expand Up @@ -40,6 +40,7 @@ def __init__(
setup_table=False,
use_direct_query_if_available=False,
notification_log_reader_class=None,
apply_policy_to_generated_events=False,
**kwargs
):
self.policy_func = policy
Expand All @@ -53,6 +54,7 @@ def __init__(
self.notification_log_reader_class = (
notification_log_reader_class or type(self).notification_log_reader_class
)
self.apply_policy_to_generated_events = apply_policy_to_generated_events

super(ProcessApplication, self).__init__(
name=name, setup_table=setup_table, **kwargs
Expand Down Expand Up @@ -198,14 +200,12 @@ def process_upstream_event(self, event, notification_id, upstream_name):
cycle_started = None
# Call policy with the upstream event.
(
all_aggregates,
new_events,
causal_dependencies,
orm_objs_pending_save,
orm_objs_pending_delete,
) = self.call_policy(event)

# Collect pending events.
new_events = self.collect_pending_events(all_aggregates)
# Record process event.
try:
tracking_kwargs = self.construct_tracking_kwargs(
Expand Down Expand Up @@ -298,23 +298,51 @@ def call_policy(self, event):
# Wrap the actual repository, so we can collect aggregates.
repository = WrappedRepository(self.repository)

# Actually call the policy.
new_aggregates = policy(repository, event)

# Collect all aggregates.
repo_aggregates = list(repository.retrieved_aggregates.values())
all_aggregates = repo_aggregates[:]
if new_aggregates is not None:
if not isinstance(new_aggregates, (list, tuple)):
new_aggregates = [new_aggregates]
if self.repository._use_cache:
for new_aggregate in new_aggregates:
self.repository._cache[new_aggregate.id] = new_aggregate
all_aggregates += new_aggregates

# Identify causal dependencies.
fifo = deque()
fifo.append(event)

all_new_events = []

while len(fifo):

# Get the next unprocessed event.
event = fifo.popleft()

# Actually call the policy.
new_aggregates = policy(repository, event)

# Collect all aggregates.
repo_aggregates = list(repository.retrieved_aggregates.values())
all_aggregates = repo_aggregates[:]
if new_aggregates is not None:
if not isinstance(new_aggregates, (list, tuple)):
new_aggregates = [new_aggregates]
for aggregate in new_aggregates:
if self.repository._use_cache:
# Cache new aggregates in repository (avoids replay).
self.repository._cache[aggregate.id] = aggregate

if self.apply_policy_to_generated_events:
# Make new aggregates available in subsequent policy calls.
repository.retrieved_aggregates[aggregate.id] = aggregate
all_aggregates += new_aggregates

# Collect pending events.
new_events = self.collect_pending_events(all_aggregates)

# Enqueue these new events, if policy is being applied to generated events.
if self.apply_policy_to_generated_events:
fifo.extend(new_events)

# Extend 'all events' with these new events.
all_new_events.extend(new_events)

# Translate causal dependencies from version of entity to position in pipeline.
causal_dependencies = []
if self.use_causal_dependencies:
# Todo: Optionally reference causal dependencies in current pipeline
# and then support processing notification from a single pipeline in
# parallel, according to dependencies.
highest = defaultdict(int)
rm = self.event_store.record_manager
for entity_id, entity_version in repository.causal_dependencies:
Expand All @@ -329,11 +357,8 @@ def call_policy(self, event):
causal_dependencies.append(
{"pipeline_id": pipeline_id, "notification_id": notification_id}
)
# Todo: Optionally reference causal dependencies in current pipeline
# and then support processing notification from a single pipeline in
# parallel, according to dependencies.
return (
all_aggregates,
all_new_events,
causal_dependencies,
repository.orm_objs_pending_save,
repository.orm_objs_pending_delete,
Expand Down
83 changes: 64 additions & 19 deletions eventsourcing/tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@


class TestProcessApplication(TestCase):
process_class = SQLAlchemyApplication
infrastructure_class = SQLAlchemyApplication

def test_process_with_example_policy(self):
# Construct example process.
process_class = ProcessApplication.mixin(self.process_class)
process_class = ProcessApplication.mixin(self.infrastructure_class)
with process_class(
name="test",
policy=example_policy,
Expand Down Expand Up @@ -69,7 +69,7 @@ def test_process_with_example_policy(self):

def test_process_application_with_snapshotting(self):
# Construct example process.
with ProcessApplicationWithSnapshotting.mixin(self.process_class)(
with ProcessApplicationWithSnapshotting.mixin(self.infrastructure_class)(
name="test",
policy=example_policy,
persist_event_type=ExampleAggregate.Event,
Expand Down Expand Up @@ -108,7 +108,7 @@ def test_causal_dependencies(self):
pipeline_id2 = 1

# Create two events, one has causal dependency on the other.
process_class = ProcessApplication.mixin(self.process_class)
process_class = ProcessApplication.mixin(self.infrastructure_class)
core1 = process_class(
name="core",
# persist_event_type=ExampleAggregate.Created,
Expand Down Expand Up @@ -246,7 +246,7 @@ def projection_policy(repository: WrappedRepository, event):
repository.delete_orm_obj(projection)

# Construct example process.
process_class = ProcessApplication.mixin(self.process_class)
process_class = ProcessApplication.mixin(self.infrastructure_class)
with process_class(
name="test",
policy=projection_policy,
Expand Down Expand Up @@ -312,7 +312,7 @@ def get_projection_record(
return record_manager.session.query(projection_record_class).get(projection_id)

def test_handle_prompt_failed(self):
process = ProcessApplication.mixin(self.process_class)(
process = ProcessApplication.mixin(self.infrastructure_class)(
name="test",
policy=example_policy,
persist_event_type=ExampleAggregate.Event,
Expand Down Expand Up @@ -378,7 +378,8 @@ def test_command_process(self):

self.assertFalse(list(commands.event_store.all_domain_events()))

cmd = CreateExample.__create__()
example_id = uuid4()
cmd = CreateExample.__create__(example_id=example_id)
# cmd = Command.__create__(cmd_method='create_example', cmd_args={})
cmd.__save__()

Expand All @@ -392,10 +393,52 @@ def test_command_process(self):
core.run()

self.assertTrue(list(core.event_store.all_domain_events()))
self.assertIn(example_id, core.repository)

# Example shouldn't be "moved on" because core isn't following itself,
# or applying its policy to generated events.
self.assertFalse(core.repository[example_id].is_moved_on)

commands.close()
core.close()

def test_apply_policy_to_generated_domain_events(self):
    """Check that a process application with apply_policy_to_generated_events=True
    feeds events generated by its own policy back into that policy, so a chain of
    reactions runs to completion within a single run."""
    # Command-side application: persists Command aggregates.
    commands = CommandProcess.mixin(self.infrastructure_class)(
        setup_table=True
    )
    # Core application: applies example_policy, and — unlike the plain
    # test_command_process case — also applies the policy to the events
    # its own policy generates.
    core = ProcessApplication.mixin(self.infrastructure_class)(
        name="core",
        policy=example_policy,
        session=commands.session,
        apply_policy_to_generated_events=True,
    )

    with core, commands:
        # No events recorded yet on the command side.
        self.assertFalse(list(commands.event_store.all_domain_events()))

        # Issue a command to create an example aggregate with a known ID.
        example_id = uuid4()
        cmd = CreateExample.__create__(example_id=example_id)
        cmd.__save__()

        # Exactly one event (the Command.Created) is now recorded.
        domain_events = list(commands.event_store.all_domain_events())
        self.assertEqual(len(domain_events), 1)

        # Core has processed nothing yet.
        self.assertFalse(list(core.event_store.all_domain_events()))

        # Subscribe core to the commands' notification log and process it.
        core.follow("commands", commands.notification_log)
        core.run()

        # The command was processed: the example aggregate exists in core.
        self.assertTrue(list(core.event_store.all_domain_events()))
        self.assertIn(example_id, core.repository)

        # Example should be "moved on" because core is
        # applying its policy to generated events.
        example = core.repository[example_id]
        self.assertTrue(example.is_moved_on)

        # Check the "second" aggregate exists.
        self.assertIn(example.second_id, core.repository)

def tearDown(self):
    # Fail the test if any pub/sub event handlers are still subscribed,
    # i.e. an application constructed by the test was not closed.
    assert_event_handlers_empty()

Expand Down Expand Up @@ -430,24 +473,24 @@ def mutate(self, aggregate):


def example_policy(repository, event):
# Whenever an aggregate is created, then "move it on".
if isinstance(event, ExampleAggregate.Created):
# Get aggregate and move it on.
aggregate = repository[event.originator_id]

assert isinstance(aggregate, ExampleAggregate)

# Also create a second entity, allows test to check that
# Create a second aggregate, allowing test to check that
# events from more than one entity are stored.
second_id = uuid4()
other_entity = AggregateRoot.__create__(originator_id=second_id)
aggregate.move_on(second_id=second_id)
return other_entity
second = AggregateRoot.__create__(originator_id=second_id)

# Get first aggregate and move it on, allowing test to
# check that the first aggregate was mutated by the policy.
first = repository[event.originator_id]
assert isinstance(first, ExampleAggregate)
first.move_on(second_id=second_id)

return second

elif isinstance(event, Command.Created):
command_class = resolve_topic(event.originator_topic)
if command_class is CreateExample:
return ExampleAggregate.__create__()
return ExampleAggregate.__create__(event.example_id)


class LogMessage(BaseAggregateRoot):
Expand All @@ -467,7 +510,9 @@ def event_logging_policy(_, event):


class CreateExample(Command):
pass
def __init__(self, example_id=None, **kwargs):
super().__init__(**kwargs)
self.example_id = example_id


class SaveOrmObject(Command):
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/tests/test_process_with_django.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


class TestProcessWithDjango(DjangoTestCase, TestProcessApplication):
process_class = DjangoApplication
infrastructure_class = DjangoApplication

def test_projection_into_custom_orm_obj(self):
super(TestProcessWithDjango, self).test_projection_into_custom_orm_obj()
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/tests/test_process_with_popo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


class TestProcessWithPopos(PopoTestCase, TestProcessApplication):
process_class = PopoApplication
infrastructure_class = PopoApplication

@skip("Popo record manager doesn't support pipelines")
def test_causal_dependencies(self):
Expand Down

0 comments on commit 07a9a62

Please sign in to comment.