Skip to content

Commit

Permalink
Test Coordinator.move_to time #17
Browse files Browse the repository at this point in the history
  • Loading branch information
numberoverzero committed Oct 17, 2016
1 parent 0a68abb commit cfb6a26
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 2 deletions.
10 changes: 8 additions & 2 deletions tests/unit/test_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,17 @@ def stream_description(
}


def dynamodb_record_with(key=False, new=False, old=False, sequence_number=None):
def dynamodb_record_with(
key=False, new=False, old=False,
sequence_number=None, creation_time: Optional[arrow.Arrow]=None):
if creation_time is None:
creation_time = 1.46480527E9
else:
creation_time = creation_time.timestamp
template = {
"awsRegion": "us-west-2",
"dynamodb": {
"ApproximateCreationDateTime": 1.46480527E9,
"ApproximateCreationDateTime": creation_time,
"SequenceNumber": sequence_number if sequence_number is not None else "400000000000000499660",
"SizeBytes": 41,
"StreamViewType": "KEYS_ONLY",
Expand Down
117 changes: 117 additions & 0 deletions tests/unit/test_stream/test_coordinator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import arrow
import collections
import pytest
import functools
from bloop.exceptions import InvalidPosition
Expand Down Expand Up @@ -287,6 +289,121 @@ def test_remove_shard(is_active, is_root, has_buffered, coordinator):
assert record_shard is not shard


def test_move_to_future_time(coordinator, session):
"""Moving to a time in the future simply moves to Stream latest."""
# -----------
# 0 -> 1 -> 2
# -> 3
# -----------
# 4 -> 5
# -> 6 -> 7
# -----------
description = session.describe_stream.return_value = stream_description(
8, {0: 1, 1: [2, 3], 4: [5, 6], 6: 7},
stream_arn=coordinator.stream_arn)
expected_active_ids = [description["Shards"][i]["ShardId"] for i in [2, 3, 5, 7]]
session.get_shard_iterator.return_value = "child-iterator-id"

coordinator.move_to(arrow.now().replace(hours=1))

# Remote calls - no new records fetched, loaded child shards and jumped to their latest
session.get_stream_records.assert_not_called()
session.describe_stream.assert_called_once_with(stream_arn=coordinator.stream_arn)
assert session.get_shard_iterator.call_count == len(expected_active_ids)
for expected_shard_id in expected_active_ids:
session.get_shard_iterator.assert_any_call(
stream_arn=coordinator.stream_arn,
shard_id=expected_shard_id,
iterator_type="latest",
sequence_number=None
)

assert set(s.shard_id for s in coordinator.active) == set(expected_active_ids)
assert not coordinator.buffer


def test_move_to_datetime(coordinator, session):
"""Move to a time somewhere in the middle of a stream"""
position = arrow.now()

# 0 -> 1
# -> 2 -> 3
# ^ Never searched for a record, since record is found in 2
description = stream_description(4, {0: [1, 2], 2: 3}, stream_arn=coordinator.stream_arn)
# No shards in the root, and it's exhausted
root_id = description["Shards"][0]["ShardId"]
# No shards in one child, but it's still open
fail_to_seek = description["Shards"][1]["ShardId"]
# Shards in the other child, and it's still open
find_records = description["Shards"][2]["ShardId"]

# Hand back the same iterator id for fail_to_seek to simplify responses table
continue_response = {"Records": [], "NextShardIterator": fail_to_seek}
record = dynamodb_record_with(
key=True,
sequence_number="found-record-sequence-number",
creation_time=position.replace(hours=1))

responses = {
# Shard 0 will immediately exhaust without finding records
root_id: [{"Records": [], "NextShardIterator": last_iterator}],
# Shard 1 will not find any records, but stay open (always has a NextShardIterator)
fail_to_seek: [continue_response] * CALLS_TO_REACH_HEAD,
# Shard 2 will find a record, and stay open.
find_records: [{"Records": [record], "NextShardIterator": "not-followed-iterator-id"}],
}

# Fixed description, should be called once
session.describe_stream.return_value = description
# For simplicity, iterator ids are shard ids
session.get_shard_iterator.side_effect = lambda shard_id, **kwargs: shard_id
# Responses are iterator in the same order as their list in `responses`
session.get_stream_records.side_effect = lambda iterator_id: responses[iterator_id].pop(0)

coordinator.move_to(position)

# Records from seeking are pushed into the buffer. Only one record from second child.
actual_record, source_shard = coordinator.buffer.pop()
assert not coordinator.buffer
assert actual_record["meta"]["sequence_number"] == "found-record-sequence-number"
assert source_shard.shard_id == find_records

# Both child shards are active, even though only one found a child.
# Since root shard was exhausted, its children are the current roots.
assert {shard.shard_id for shard in coordinator.active} == {fail_to_seek, find_records}
assert {shard.shard_id for shard in coordinator.roots} == {fail_to_seek, find_records}

# Remote calls: 1 DescribeStream at the beginning: stream moves to trim_horizon
# 4 GetShardIterator: 2 for root shard (stream jump to trim_horizon, shard jump to trim_horizon)
# 1 for find_records (seek_to jumps to trim_horizon)
# 1 for fail_to_seek (seek_to jumps to trim_horizon)
# 7 GetRecords: 1 for exhausted root (when the stream moves to trim_horizon)
# 1 for find_records shard
# 5 for fail_to_seek, which is still open
session.describe_stream.assert_called_once_with(stream_arn=coordinator.stream_arn)
assert session.get_shard_iterator.call_count == 4
for shard_id in {root_id, fail_to_seek, find_records}:
session.get_shard_iterator.assert_any_call(
stream_arn=coordinator.stream_arn,
shard_id=shard_id,
iterator_type="trim_horizon",
sequence_number=None
)
# This odd construct is because we can't use assert_has_calls (doesn't check number of each call)
# and we can't use call_args_list directly, because we don't know the order the root's children will
# have when they're unpacked (dict -> list in stream.shard.unpack_shards).
# Instead, we pull the args (c[0]) out of each call in call_args_list, and grab the first element (c[0][0]).
# We don't need to check the length of call args or call kwargs, because the side_effect only takes one arg.
args = [c[0][0] for c in session.get_stream_records.call_args_list]
calls_with_counts = collections.Counter(args)
# Doesn't contain the child shard of find_records, because it was never searched.
assert calls_with_counts == {
root_id: 1,
find_records: 1,
fail_to_seek: CALLS_TO_REACH_HEAD
}


def test_move_to_trim_horizon(coordinator, session):
"""Moving to the trim_horizon clears existing state and adds new shards"""
# All of these should be cleaned up entirely
Expand Down

0 comments on commit cfb6a26

Please sign in to comment.