Skip to content

Commit

Permalink
Observable repositories [2] (#103)
Browse files Browse the repository at this point in the history
A follow-up to #92:
- revised the current implementation of ObservableRepository and inheritance hierarchy;
- moved Observable-related code from Base SQLAlchemy repository to the separate base class;
- made all SQLAlchemy-related repos not Observable by default.

Full logs:
* WIP: Made ObservableRepository to inherit from AbsRepository
* WIP: Updated SQLAlchemy repo implementation to use ObservableRepository as a base class
* WIP: Updated ThingRepository interface to inherit from ObservableRepository
* WIP: Added a new BaseObservableRepository for SQLAlchemy repos
* WIP: Removed Observable-related code from SQLAlchemy BaseRepository
  • Loading branch information
s-kostyuk committed May 12, 2018
1 parent 65292f3 commit 67e7dc4
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 116 deletions.
148 changes: 148 additions & 0 deletions dpl/repo_impls/sql_alchemy/base_observable_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import weakref
from typing import TypeVar, Optional, MutableSet, Type
from functools import partial

import sqlalchemy.event

from dpl.model.domain_id import TDomainId
from dpl.model.base_entity import BaseEntity
from dpl.repos.observable_repository import (
ObservableRepository, RepositoryEventType
)
from dpl.utils.observer import Observer
from .db_session_manager import DbSessionManager
from .base_repository import BaseRepository


TEntity = TypeVar("TEntity", bound=BaseEntity)


class BaseObservableRepository(BaseRepository[TEntity], ObservableRepository):
"""
A base implementation of SQLAlchemy repository which also implements
an ObservableRepository interface
"""
def __init__(
self, session_manager: DbSessionManager,
stored_cls: Type[TEntity]
):
"""
Constructor. Receives an instance of SessionManager
to be used and saves a link to it to the internal
variable. Also it receives a type of stored objects
used for fetching from data from appropriate DB table
:param session_manager: an instance of SessionManager
to be used for requesting SQLAlchemy Sessions
:param stored_cls: a type of objects that to stored
in this Repository; this object must to be
associated with a DB table by means of
SQLAlchemy ORM mappers
"""
super().__init__(session_manager, stored_cls)

self._observers = set() # type: MutableSet[Observer]
self._weak_self = weakref.proxy(self)

self._setup_object_event_handlers()

def _setup_object_event_handlers(self) -> None:
"""
Performs setup of handlers for object addition, modification and
removal for SQLAlchemy-mapped class
:return: None
"""
added_listener = partial(
self._db_event_handler,
event_type=RepositoryEventType.added
)

modified_listener = partial(
self._db_event_handler,
event_type=RepositoryEventType.modified
)

deleted_listener = partial(
self._db_event_handler,
event_type=RepositoryEventType.deleted
)

sqlalchemy.event.listen(
target=self._stored_cls,
identifier='after_insert',
fn=added_listener
)

sqlalchemy.event.listen(
target=self._stored_cls,
identifier='after_update',
fn=modified_listener
)

sqlalchemy.event.listen(
target=self._stored_cls,
identifier='after_delete',
fn=deleted_listener
)

def _db_event_handler(
self, mapper, connection,
target: TEntity, event_type: RepositoryEventType
) -> None:
"""
A handler method to be called of any of the objects controlled by
this Repository will be added to, modified in or deleted from the DB
:param mapper: an instance of SQLAlchemy DB Mapper
:param connection: an instance of SQLAlchemy DB Connection
:param target: an object that was altered
:param event_type: Enum value; determines if the object was added,
modified or removed
:return: None
"""
self._notify(
object_id=target.domain_id,
event_type=event_type,
object_ref=weakref.proxy(target)
)

def subscribe(self, observer: Observer) -> None:
"""
Adds the specified Observer to the list of subscribers
:param observer: an instance of Observer to be added
:return: None
"""
self._observers.add(observer)

def unsubscribe(self, observer: Observer) -> None:
"""
Removes the specified Observer from the list of subscribers
:param observer: an instance of Observer to be deleted
:return: None
"""
self._observers.discard(observer)

def _notify(
self, object_id: TDomainId, event_type: RepositoryEventType,
object_ref: Optional[TEntity]
):
"""
Notifies all of the subscribers that an object was modified in,
added to or deleted from this Repository
:param object_id: an identifier of an altered object
:param event_type: enum value, specifies what happened to the object
:param object_ref: a reference to the altered object or None if it was
deleted
:return: None
"""
for o in self._observers:
o.update(
source=self._weak_self,
event_type=event_type,
object_id=object_id,
object_ref=object_ref
)
112 changes: 2 additions & 110 deletions dpl/repo_impls/sql_alchemy/base_repository.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
import weakref
from typing import (
TypeVar, Optional, MutableMapping, Sequence, Iterable, Type,
MutableSet
TypeVar, Optional, MutableMapping, Sequence, Iterable, Type
)
from functools import partial

from sqlalchemy import func
from sqlalchemy.orm import Session
import sqlalchemy.event

from dpl.utils.flatten import flatten
from dpl.model.domain_id import TDomainId
from dpl.model.base_entity import BaseEntity
from dpl.repos.abs_repository import AbsRepository
from dpl.repos.observable_repository import ObservableRepository, RepositoryEventType
from dpl.utils.observer import Observer
from .db_session_manager import DbSessionManager


TEntity = TypeVar("TEntity", bound=BaseEntity)
TEntityCollection = MutableMapping[TDomainId, TEntity]


class BaseRepository(AbsRepository[TEntity], ObservableRepository[TEntity]):
class BaseRepository(AbsRepository[TEntity]):
"""
A base implementation of SQLAlchemy-based repository
"""
Expand All @@ -44,111 +39,8 @@ def __init__(
"""
self._session_manager = session_manager
self._stored_cls = stored_cls
self._observers = set() # type: MutableSet[Observer]
self._weak_self = weakref.proxy(self)

self._setup_object_event_handlers()

def _setup_object_event_handlers(self) -> None:
"""
Performs setup of handlers for object addition, modification and
removal for SQLAlchemy-mapped class
:return: None
"""
added_listener = partial(
self._db_event_handler,
event_type=RepositoryEventType.added
)

modified_listener = partial(
self._db_event_handler,
event_type=RepositoryEventType.modified
)

deleted_listener = partial(
self._db_event_handler,
event_type=RepositoryEventType.deleted
)

sqlalchemy.event.listen(
target=self._stored_cls,
identifier='after_insert',
fn=added_listener
)

sqlalchemy.event.listen(
target=self._stored_cls,
identifier='after_update',
fn=modified_listener
)

sqlalchemy.event.listen(
target=self._stored_cls,
identifier='after_delete',
fn=deleted_listener
)

def _db_event_handler(
self, mapper, connection, target: TEntity, event_type: RepositoryEventType
) -> None:
"""
A handler method to be called of any of the objects controlled by
this Repository will be added to, modified in or deleted from the DB
:param mapper: an instance of SQLAlchemy DB Mapper
:param connection: an instance of SQLAlchemy DB Connection
:param target: an object that was altered
:param event_type: Enum value; determines if the object was added,
modified or removed
:return: None
"""
self._notify(
object_id=target.domain_id,
event_type=event_type,
object_ref=weakref.proxy(target)
)

def subscribe(self, observer: Observer) -> None:
"""
Adds the specified Observer to the list of subscribers
:param observer: an instance of Observer to be added
:return: None
"""
self._observers.add(observer)

def unsubscribe(self, observer: Observer) -> None:
"""
Removes the specified Observer from the list of subscribers
:param observer: an instance of Observer to be deleted
:return: None
"""
self._observers.discard(observer)

def _notify(
self, object_id: TDomainId, event_type: RepositoryEventType,
object_ref: Optional[TEntity]
):
"""
Notifies all of the subscribers that an object was modified in,
added to or deleted from this Repository
:param object_id: an identifier of an altered object
:param event_type: enum value, specifies what happened to the object
:param object_ref: a reference to the altered object or None if it was
deleted
:return: None
"""
for o in self._observers:
o.update(
source=self._weak_self,
event_type=event_type,
object_id=object_id,
object_ref=object_ref
)

@property
def _session(self) -> Session:
"""
Expand Down
5 changes: 2 additions & 3 deletions dpl/repos/abs_thing_repository.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from typing import Optional

from .abs_repository import AbsRepository, TDomainId
from .observable_repository import ObservableRepository
from .observable_repository import ObservableRepository, TDomainId
from dpl.things import Thing


class AbsThingRepository(AbsRepository[Thing], ObservableRepository[Thing]):
class AbsThingRepository(ObservableRepository[Thing]):
"""
Pure abstract base implementation of Repository
containing Things.
Expand Down
6 changes: 3 additions & 3 deletions dpl/repos/observable_repository.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from enum import Enum
from typing import TypeVar, Generic, Optional
from typing import TypeVar, Optional

from dpl.utils.observable import Observable
from dpl.model.domain_id import TDomainId

from .abs_repository import AbsRepository

T = TypeVar('T')

Expand All @@ -14,7 +14,7 @@ class RepositoryEventType(Enum):
deleted = 2


class ObservableRepository(Observable, Generic[T]):
class ObservableRepository(AbsRepository[T], Observable):
"""
ObservableRepository is a declaration of an interface to be implemented
by Observable Repositories. Is a sample of Observable pattern; notifies all
Expand Down

0 comments on commit 67e7dc4

Please sign in to comment.