# Streams #17
While thinking about inheritance I came up with a draft for one solution to streaming across multiple engines:

```python
# ------------------------------------------------------
# User function that is called when a change passes through the streaming api.
# Signals a stop to processing once 100 changes are observed.
# Return truthy values to stop processing, falsey values to continue.
# ------------------------------------------------------
changes = 0

def handle_update(engine, model, old, new):
    global changes
    changes += 1
    # Stop processing after 100 changes
    return changes > 99

# Blocking call that monitors all tables registered for any number of engines
bloop.stream.monitor(handle_update, legacy_engine, engine_compat)

# ------------------------------------------------------
# bloop.streaming.py
# ------------------------------------------------------
def monitor(callback, *engines):
    tables = collections.defaultdict(list)
    for engine in engines:
        for model in engine.models:
            tables[model.Meta.table_name].append((engine, model))
    _listen_for(callback, tables)

def _listen_for(callback, tables):
    # client.listen_to is some infinite generator that
    # yields tuples of changes for a given list of table names
    for change in client.listen_to(tables.keys()):
        table, old_value, new_value = change
        # Engines should be unordered - don't terminate until all
        # engines for a given change have had a chance to process it
        terminate = False
        for engine, model in tables[table]:
            old = engine._load(model, old_value)
            new = engine._load(model, new_value)
            terminate |= bool(callback(engine, model, old, new))
        if terminate:
            return
```

---
The stuff above was trying to do too many things that aren't flexible. Keeping the call pattern closer to the streaming API means fewer surprises, and the user can customize everything above. This reduces the goals of the streaming layer.
This is the design I've wanted for a while, but for some reason I got caught up with callbacks. I still need to come up with a way to switch between streaming modes (I think there's old object, new object, and old+new object), which means another engine config value, and to make sure that the yielded objects are consistent between streaming modes (so we don't have old->obj, new->obj, old+new->dict). That probably means yielding a dict for each event, like:

```python
{
    "old": instance_of_model,
    "new": None
}
```

```python
class Stream:
    def __init__(self, *, engine, model, token):
        self.engine = engine
        self.model = model
        self.token = token

    def __iter__(self):
        if self.token is None:
            self._generate_token()
        while True:
            events = self._advance_stream()
            if events:
                for event in events:
                    yield self._unpack(event)
            else:
                # Caught up to head of the stream; an explicit None lets the caller
                # decide how long to wait before the next poll.
                # We don't need this for the backlog case (every call returns items)
                # because it means we aren't at the head of the stream yet, and there
                # are unprocessed events.
                yield None

    def _generate_token(self):
        # TODO some boto3 calls to establish shard, etc
        pass

    def _advance_stream(self):
        # TODO result = boto3.some_call(unpack self.token)
        # TODO update self.token from result
        # return None on empty result
        pass

    def _unpack(self, event):
        # TODO handle old + new
        blob = some_unpacking_here(event)
        obj = self.engine._instance(self.model)
        self.engine._update(
            obj, blob, obj.Meta.columns,
            context={"engine": self.engine})
        return obj


class Engine:
    def stream(self, model, *, token=None):
        # TODO abstract check
        return iter(Stream(engine=self, model=model, token=token))
```
```python
# ===============================================================
stream = engine.stream(MyModel)
processed = 0
while processed < 100:
    obj = next(stream)
    if obj is not None:
        process(obj)
        processed += 1
    else:
        # At head of stream, no new events. Back off polling.
        time.sleep(1)
```
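One way the `_unpack` TODO above could keep yielded events consistent across streaming modes: build the same `{"type", "old", "new"}` dict for every event, with explicit `None` for missing images. This is only a sketch; `eventName`, `OldImage`, and `NewImage` are real DynamoDB Streams record fields, while the engine hooks come from the draft above.

```python
def _unpack(self, event):
    # Every yielded event has the same shape, regardless of view type
    unpacked = {"type": event["eventName"], "old": None, "new": None}
    for key, image_name in (("old", "OldImage"), ("new", "NewImage")):
        blob = event["dynamodb"].get(image_name)
        if blob is not None:
            obj = self.engine._instance(self.model)
            self.engine._update(
                obj, blob, obj.Meta.columns,
                context={"engine": self.engine})
            unpacked[key] = obj
    return unpacked
```

---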
The event item dict will also need to include the type of modification performed. From the Record this is the `eventName` field (`INSERT`, `MODIFY`, or `REMOVE`). For a new-object-only stream, an event would look like:

```python
{
    "type": "MODIFY",
    "old": None,
    "new": instance_of_model
}
```

Users may want to start at the beginning (replay the whole stream), start at the end (process only the events that occur after they create the stream object), or start somewhere in the middle (token from a previous bloop stream). In the DynamoDB api this is exposed when getting an iterator for a single shard, through `GetShardIterator`'s `ShardIteratorType`. User intention, though, is at the stream level, and needs to expose three options: existing token, start, or end.
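For reference, a minimal sketch of the per-shard plumbing with raw boto3. The calls (`describe_stream`, `get_shard_iterator`, `get_records`) are the real DynamoDB Streams API; the stream ARN is a placeholder.

```python
import boto3

streams = boto3.client("dynamodbstreams")
stream_arn = "arn:aws:dynamodb:..."  # placeholder

# Describe the stream to find its shards
description = streams.describe_stream(StreamArn=stream_arn)["StreamDescription"]
shard_id = description["Shards"][0]["ShardId"]

# "TRIM_HORIZON" replays from the oldest available record; "LATEST" starts at
# the head; AT/AFTER_SEQUENCE_NUMBER resume from a point mid-stream.
iterator = streams.get_shard_iterator(
    StreamArn=stream_arn,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON")["ShardIterator"]

response = streams.get_records(ShardIterator=iterator)
for record in response["Records"]:
    print(record["eventName"], record["dynamodb"].get("NewImage"))
```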
---
The example above has a problem: `__iter__` is a generator, so callers never hold a Stream object and can't get at the token. Instead:

```python
class Stream:
    ...
    @property
    def token(self):
        # Pack the streamArn, shardId, and shardIterator into a dict
        ...

    def __iter__(self):
        return self

    def __next__(self):
        # __iter__ code from the previous example
        ...


class Engine:
    ...
    def stream(self, model, *, token=None):
        return Stream(engine=self, model=model, token=token)
```

Making the Stream class an iterable and its own iterator means users can still call next() on the returned object, but can now get the token, too:

```python
stream = engine.stream(MyModel)
processed = 0
while processed < 100:
    obj = next(stream)
    if obj is not None:
        process(obj)
        processed += 1
    else:
        # At head of stream, no new events. Back off polling.
        time.sleep(1)
    # See the new token after every call
    print(stream.token)
```

There's no need to make a separate call to fetch the token. There's a little work to make two streams start at the same point when the first starts at the head of the stream, though. The user would need something like:

```python
first_stream = engine.stream(MyModel, whatever_indicates_start)
snapshot_start_token = first_stream.token
# iterate through first_stream here
...
# Use the token that was computed at the start of the first stream to
# construct another bloop.Stream that starts at the same point
second_stream = engine.stream(MyModel, token=snapshot_start_token)
```
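As a sketch of how that token could be persisted between runs, assuming `token` is the plain dict described above (the filename is arbitrary):

```python
import json

# Save the stream position when shutting down
stream = engine.stream(MyModel)
with open("stream_token.json", "w") as f:
    json.dump(stream.token, f)

# Later, resume processing from the saved position
with open("stream_token.json") as f:
    token = json.load(f)
stream = engine.stream(MyModel, token=token)
```

One caveat: raw shard iterators expire fifteen minutes after they're issued, so a token meant to outlive a process would likely need to store a sequence number rather than the iterator itself.

---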
This will make it into the 1.0.0 release; it's behind the docs rewrite (#13), which has been coming along in small bursts.

---
Spent some time thinking about how to specify the stream view type when the model is declared. Simple and intuitive are my highest priorities, followed much further down by extensibility. Here's the enum that bloop will map options to: `StreamViewType` (`KEYS_ONLY`, `NEW_IMAGE`, `OLD_IMAGE`, `NEW_AND_OLD_IMAGES`).

### Preference (tl;dr)

Leaning towards `stream = {"new": True}` (vs `stream = "new"`), but I do like that it mirrors what I expect the event format will be:

```python
{
    "type": string-based-enum,
    "new": obj,
    "old": obj
}
```

### Criteria

Any option needs to fulfill the following criteria:
### Options

These aren't exhaustive, but they make use of the most common python constructs (lists/tuples and dicts). I expect iterations on these options, but new orthogonal shapes are unlikely.

#### String or Tuple of Strings

```python
class User(Base):
    class Meta:
        # Default if not specified
        stream = None

        # string or tuple
        stream = "keys"
        stream = "new"
        stream = "old"
        stream = "new", "old"

        # invalid, can't specify new and keys
        stream = "new", "keys"
        # invalid, unknown view type "foo"
        stream = "old", "foo"

    id = Column(Integer, hash_key=True, name="h")
    sequence = Column(Integer, range_key=True, name="r")
    data = Column(Binary)
    some_gsi = Column(Integer)
    by_gsi = bloop.GlobalSecondaryIndex(projection="keys", hash_key="data")
```

#### Dict of Options

```python
class User(Base):
    class Meta:
        # Default if not specified
        stream = None

        stream = {
            "new": True,
            "old": True
        }
        # options default to false
        stream = {
            "old": True
        }
        # invalid, can't specify new and keys
        stream = {
            "new": True,
            "old": False,
            "keys": True
        }
        # valid; unexpected keys are ignored
        stream = {
            "new": True,
            "foo": "bar"
        }

    id = Column(Integer, hash_key=True, name="h")
    sequence = Column(Integer, range_key=True, name="r")
    data = Column(Binary)
    some_gsi = Column(Integer)
    by_gsi = bloop.GlobalSecondaryIndex(projection="keys", hash_key="data")
```

### Tradeoffs

In either case,

Pros of strings:

Cons of strings:

Pros of dicts:

Cons of dicts:

---
To try to minimize deprecation + migration if/when new stream features are available, this will be the syntax for specifying a stream:

```python
# No stream (default if missing)
stream = None

# Basic keys-only stream
stream = {
    "include": ["keys"]
}

# new and old, with an expected stream label
stream = {
    "include": ["new", "old"],
    "label": "2016-08-29T03:26:22.376"
}
```
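A minimal sketch of how that declaration could map onto DynamoDB's `StreamViewType` enum; the enum values are real, but the `stream_view_type` helper is hypothetical, not bloop's actual API:

```python
# Real StreamViewType values; the mapping function itself is an illustration.
VIEW_TYPES = {
    frozenset(["keys"]): "KEYS_ONLY",
    frozenset(["new"]): "NEW_IMAGE",
    frozenset(["old"]): "OLD_IMAGE",
    frozenset(["new", "old"]): "NEW_AND_OLD_IMAGES",
}

def stream_view_type(stream):
    """Translate a model's stream declaration into a StreamViewType string."""
    if stream is None:
        return None
    include = frozenset(stream["include"])
    try:
        return VIEW_TYPES[include]
    except KeyError:
        raise ValueError("invalid stream include: {!r}".format(sorted(include)))
```

---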
Debugging info from some exploratory calls:

When a stream exists, all three of these fields are returned. The UI shows the table as "updating", but the status is immediately `ACTIVE` after adding or removing a stream. The fields look like this:

The combination of account, table, and stream label is guaranteed unique, but a label can be the same across tables.

---
Stream creation and validation are done! Next, debugging some stream calls.

---
The declaration looks a bit weird now, since "include" is the only key, and a required one. I'd be shocked, though, if there are never any new configuration options for a Stream, or any bloop-specific configuration options.
The inner `was_active = shard in self.active` was redundant, because the only shards considered for removal were *from* `self.active`.
This isn't a bug fix, just an optimization. Previously, if an active shard without a `sequence_number` became exhausted, `heartbeat()` would still try to get records out of it. Because `Shard.__next__` returns immediately when `self.exhausted`, this wasn't an issue. The shard wouldn't be removed from active on future heartbeats; it would be removed from the active list, and its children promoted, on the next `Coordinator.next` when the buffer is empty. Now, exhausted active shards are cleaned up as part of a heartbeat. This means shards in the active list will never be exhausted[0]. In the future, that distinction could be relevant, or the assumption made that exhausted shards aren't part of the active list.

[0] Won't be exhausted locally. The actual ShardIterator may be the last for the Shard, but that isn't known yet.
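A rough sketch of the described heartbeat behavior; the Coordinator internals here (`active`, `buffer`, `remove_shard`) are paraphrased from context, not bloop's exact implementation:

```python
def heartbeat(self):
    # Only shards without a sequence_number need a keep-alive poll;
    # shards that have one can be re-seeked later instead.
    for shard in [s for s in self.active if s.sequence_number is None]:
        records = next(shard)
        if records:
            self.buffer.push_all((record, shard) for record in records)
        # New: clean up exhausted shards immediately, instead of waiting
        # for the next Coordinator.next with an empty buffer.
        if shard.exhausted:
            self.remove_shard(shard)
```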
The first thing `shard.seek_to` does is `jump_to("trim_horizon")`, so there's no need to do an extra jump within the shard walking of `_move_stream_time`.
It's easy to lose track of the frames involved in the recursive validation. Hopefully a visual clears up the overlap between token shards and actual shards.
I should get around to writing a sample of streaming records without bloop: https://gist.github.com/numberoverzero/8f95c5b21c50d2140347b43d63dd1488

That's going to suck.

---
New features are now GA:

How can we expose streams elegantly? Ideally we could leverage the type system to load values as they are retrieved from the stream.

Some very brief review of the GetRecords operation suggests we could load the new and old values into two objects. The StreamRecord documentation is encouraging in that all attributes are included, noting "The item in the DynamoDB table as it appeared after it was modified." and "The item in the DynamoDB table as it appeared before it was modified."
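For context, a trimmed example of a GetRecords record. The field names are from the real DynamoDB Streams API; the values are illustrative:

```python
record = {
    "eventID": "...",
    "eventName": "MODIFY",  # INSERT | MODIFY | REMOVE
    "dynamodb": {
        "Keys": {"h": {"N": "123"}},
        # Full items, present when the stream view type includes them
        "NewImage": {"h": {"N": "123"}, "data": {"S": "after"}},
        "OldImage": {"h": {"N": "123"}, "data": {"S": "before"}},
        "SequenceNumber": "400000000000000499660",
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}
```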