Skip to content

Commit

Permalink
Coordinator.heartbeat also cleans up exhausted shards #17
Browse files Browse the repository at this point in the history
This isn't a bug fix, just an optimization.

Previously, if an active shard without a sequence_number became
exhausted, heartbeat() would still try to get records out of it.
Because Shard.__next__ returns immediately when self.exhausted,
this wasn't an issue.  The shard wouldn't be removed from the active
list on future heartbeats.  Instead, it would be removed from the active list
and its children would be promoted on the next Coordinator.next
when the buffer is empty.

Now, exhausted active shards will be cleaned up as part of a
heartbeat.  This means shards in the active list will never be
exhausted[0].  In the future, that distinction could be relevant,
or code could rely on the assumption that exhausted shards aren't part
of the active list.

[0] Won't be exhausted locally.  The actual ShardIterator may be
    the last for the Shard, but that isn't known yet.
  • Loading branch information
numberoverzero committed Oct 16, 2016
1 parent 220d8e3 commit 812c0ed
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
36 changes: 20 additions & 16 deletions bloop/stream/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,7 @@ def advance_shards(self) -> None:
record_shard_pairs.extend((record, shard) for record in records)
self.buffer.push_all(record_shard_pairs)

# 1) Clean up exhausted Shards.
# Can't modify the active list while iterating it.
to_remove = [shard for shard in self.active if shard.exhausted]
for shard in to_remove:
# A) Fetch Shard's children if they haven't been loaded
# (perhaps the Shard just closed?)
shard.load_children()

# B) Remove the shard from the Coordinator. If the Shard has
# children, those children are now active.
# If the Shard was a root, those children become roots.
self.remove_shard(shard)

# C) Move each child Shard to its trim_horizon.
for child in shard.children:
child.jump_to(iterator_type="trim_horizon")
self._handle_exhausted()

def heartbeat(self) -> None:
# Try to keep active shards with ``latest`` and ``trim_horizon`` iterators alive.
Expand All @@ -101,6 +86,25 @@ def heartbeat(self) -> None:
# Success! This shard now has an ``at_sequence`` iterator
if records:
self.buffer.push_all((record, shard) for record in records)
self._handle_exhausted()

def _handle_exhausted(self):
# 1) Clean up exhausted Shards.
# Can't modify the active list while iterating it.
to_remove = [shard for shard in self.active if shard.exhausted]
for shard in to_remove:
# A) Fetch Shard's children if they haven't been loaded
# (perhaps the Shard just closed?)
shard.load_children()

# B) Remove the shard from the Coordinator. If the Shard has
# children, those children are now active.
# If the Shard was a root, those children become roots.
self.remove_shard(shard)

# C) Move each child Shard to its trim_horizon.
for child in shard.children:
child.jump_to(iterator_type="trim_horizon")

@property
def token(self) -> Dict[str, Any]:
Expand Down
9 changes: 7 additions & 2 deletions tests/unit/test_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ def build_shards(n: int, shape: Optional[Dict[int, Union[int, List[int]]]]=None,
return shards


def stream_description(n: int, shape: Dict[int, Union[int, List[int]]], stream_arn=None) -> Dict[str, Any]:
def stream_description(
n: int, shape: Optional[Dict[int, Union[int, List[int]]]]=None,
stream_arn=None) -> Dict[str, Any]:
"""Build a DescribeStream response with the given number of shards"""
shard_ids = [random_str("shard_id-{}-".format(i), 4) for i in range(n)]
# Default to flat shards, no hierarchy
shape = shape or {}

shard_ids = [random_str("shard-id-{}-".format(i), 4) for i in range(n)]
template = {
"SequenceNumberRange": {
"EndingSequenceNumber": "820400000000000001192334",
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/test_stream/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ def mock_get_records(iterator_id):
has_sequence_id: {}
}[iterator_id]
session.get_stream_records.side_effect = mock_get_records
# None of the shards have children
session.describe_stream.return_value = {"StreamArn": coordinator.stream_arn, "Shards": []}

make_shard = functools.partial(Shard, stream_arn=coordinator.stream_arn, shard_id="shard-id", session=session)
coordinator.active = [
Expand All @@ -211,20 +213,20 @@ def mock_get_records(iterator_id):


def test_heartbeat_until_sequence_number(coordinator, session):
"""After heartbeat() finds records for a shard, the shard doens't check during the next heartbeat."""
"""After heartbeat() finds records for a shard, the shard doesn't check during the next heartbeat."""
shard = Shard(stream_arn=coordinator.stream_arn, shard_id="shard-id", session=session,
iterator_id="iterator-id", iterator_type="latest")
coordinator.active.append(shard)

session.get_stream_records.side_effect = build_get_records_responses(1)
session.get_stream_records.side_effect = build_get_records_responses(1, 0)

# First call fetches records from DynamoDB
coordinator.heartbeat()
assert coordinator.buffer
assert shard.sequence_number is not None
session.get_stream_records.assert_called_once_with("iterator-id")

# Second call ships the shard, since it now has a sequence_number.
# Second call skips the shard, since it now has a sequence_number.
coordinator.heartbeat()
assert session.get_stream_records.call_count == 1

Expand Down

0 comments on commit 812c0ed

Please sign in to comment.