Skip to content

Commit

Permalink
Changed EventStore's async_put and async_get to use thread pool
Browse files Browse the repository at this point in the history
executor to do mapping between stored events and domain events.

Because it's mostly CPU-bound work that runs in C-extensions. See
note in Python docs about CPU-bound functions that release the GIL:
https://docs.python.org/3/library/asyncio-task.html#running-in-threads
  • Loading branch information
johnbywater committed Oct 12, 2021
1 parent 6bd9f55 commit 5d44f06
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 31 deletions.
8 changes: 4 additions & 4 deletions eventsourcing/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ def get(self, aggregate_id: UUID, version: Optional[int] = None) -> TAggregate:
lte=version,
)
try:
snapshot = next(snapshots)
except StopIteration:
snapshot = snapshots[0]
except IndexError:
pass
else:
gt = snapshot.originator_version
Expand Down Expand Up @@ -110,8 +110,8 @@ async def async_get(
lte=version,
)
try:
snapshot = next(snapshots)
except StopIteration:
snapshot = snapshots[0]
except IndexError:
pass
else:
gt = snapshot.originator_version
Expand Down
55 changes: 30 additions & 25 deletions eventsourcing/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
import os
import uuid
from abc import ABC, abstractmethod
from asyncio import get_running_loop
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import (
Any,
Callable,
Dict,
Generic,
Iterator,
List,
Mapping,
Optional,
Type,
TypeVar,
cast,
)
from uuid import UUID
Expand Down Expand Up @@ -590,12 +592,7 @@ def put(self, events: List[TDomainEvent], **kwargs: Any) -> None:
Stores domain events in aggregate sequence.
"""
self.recorder.insert_events(
list(
map(
self.mapper.from_domain_event,
events,
)
),
self._from_domain_events(events),
**kwargs,
)

Expand All @@ -604,12 +601,7 @@ async def async_put(self, events: List[TDomainEvent], **kwargs: Any) -> None:
Stores domain events in aggregate sequence using asyncio.
"""
await self.recorder.async_insert_events(
list(
map(
self.mapper.from_domain_event,
events,
)
),
await _to_thread(self._from_domain_events, events),
**kwargs,
)

Expand All @@ -620,19 +612,14 @@ def get(
lte: Optional[int] = None,
desc: bool = False,
limit: Optional[int] = None,
) -> Iterator[TDomainEvent]:
) -> List[TDomainEvent]:
"""
Retrieves domain events from aggregate sequence.
"""
return map(
self.mapper.to_domain_event,
return self._to_domain_events(
self.recorder.select_events(
originator_id=originator_id,
gt=gt,
lte=lte,
desc=desc,
limit=limit,
),
originator_id=originator_id, gt=gt, lte=lte, desc=desc, limit=limit
)
)

async def async_get(
Expand All @@ -642,12 +629,12 @@ async def async_get(
lte: Optional[int] = None,
desc: bool = False,
limit: Optional[int] = None,
) -> Iterator[TDomainEvent]:
) -> List[TDomainEvent]:
"""
Retrieves domain events from aggregate sequence.
"""
return map(
self.mapper.to_domain_event,
return await _to_thread(
self._to_domain_events,
await self.recorder.async_select_events(
originator_id=originator_id,
gt=gt,
Expand All @@ -657,6 +644,24 @@ async def async_get(
),
)

def _from_domain_events(self, events: List[TDomainEvent]) -> List[StoredEvent]:
return list(map(self.mapper.from_domain_event, events))

def _to_domain_events(self, stored_events: List[StoredEvent]) -> List[TDomainEvent]:
return list(
map(
self.mapper.to_domain_event,
stored_events,
)
)


_T = TypeVar("_T")


async def _to_thread(func: Callable[..., _T], *args: Any) -> _T:
return await get_running_loop().run_in_executor(None, func, *args)


class InfrastructureFactory(ABC):
"""
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ProcessEvent:

def __init__(self, tracking: Optional[Tracking] = None):
"""
Initalises the process event with the given tracking object.
Initialises the process event with the given tracking object.
"""
self.tracking = tracking
self.events: List[AggregateEvent] = []
Expand Down
2 changes: 1 addition & 1 deletion eventsourcing/tests/test_snapshotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test(self):

# Get snapshot.
snapshots = snapshot_store.get(account.id, desc=True, limit=1)
snapshot = next(snapshots)
snapshot = snapshots[0]
assert isinstance(snapshot, Snapshot)

# Reconstruct the bank account.
Expand Down

0 comments on commit 5d44f06

Please sign in to comment.