Skip to content

Commit

Permalink
refactored Stream.__init__; stream_for builds streams #17
Browse files Browse the repository at this point in the history
  • Loading branch information
numberoverzero committed Oct 17, 2016
1 parent 2ca0ccf commit 26c5515
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
8 changes: 4 additions & 4 deletions bloop/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
object_loaded,
object_saved,
)
from .stream import Stream
from .stream import stream_for, Stream
from .util import missing, unpack_from_dynamodb, walk_subclasses


Expand Down Expand Up @@ -239,6 +239,6 @@ def scan(self, model_or_index, filter=None, projection="all", limit=None, consis

def stream(self, model, position) -> Stream:
validate_not_abstract(model)
s = Stream(engine=self, model=model, session=self.session)
s.move_to(position=position)
return s
stream = stream_for(self, model)
stream.move_to(position=position)
return stream
4 changes: 2 additions & 2 deletions bloop/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .shard import Shard
from .stream import Stream
__all__ = ["Shard", "Stream"]
from .stream import Stream, stream_for
__all__ = ["Shard", "Stream", "stream_for"]
16 changes: 12 additions & 4 deletions bloop/stream/stream.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from typing import Dict, List, MutableMapping, Any, Mapping, Optional

from ..session import SessionWrapper
from ..exceptions import InvalidStream
from ..signals import object_loaded
from ..util import unpack_from_dynamodb

from .coordinator import Coordinator


def stream_for(engine, model):
if not model.Meta.stream or not model.Meta.stream.get("arn"):
raise InvalidStream("{!r} does not have a stream arn".format(model))
coordinator = Coordinator(engine=engine, session=engine.session, stream_arn=model.Meta.stream["arn"])
stream = Stream(model=model, engine=engine, coordinator=coordinator)
return stream


class Stream:
"""Provides an approximate iterator over all Records in all Shards in a Stream.
Expand Down Expand Up @@ -36,10 +44,10 @@ class Stream:
next_heartbeat = calculate_next_heartbeat()
stream.heartbeat()
"""
def __init__(self, *, engine, model, session: SessionWrapper):
self.engine = engine
def __init__(self, *, model, engine, coordinator: Coordinator):
self.model = model
self.coordinator = Coordinator(engine=engine, session=session, stream_arn=model.Meta.stream["arn"])
self.engine = engine
self.coordinator = coordinator

def __repr__(self):
# <Stream[User]>
Expand Down

0 comments on commit 26c5515

Please sign in to comment.