Skip to content

Commit

Permalink
Handle multiple open shards
Browse files Browse the repository at this point in the history
A stream can have multiple open shards at the same point in time.
Handle this case and apply various code cleanups.
  • Loading branch information
johanneswuerbach committed Jun 29, 2019
1 parent 26e03f5 commit 93ffcee
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 110 deletions.
151 changes: 79 additions & 72 deletions lib/dynamodb/stream/enumerator.rb
Original file line number Diff line number Diff line change
@@ -1,129 +1,136 @@
require "dynamodb/stream/enumerator/version"
require "dynamodb/stream/enumerator/shard_reader"
require "aws-sdk-dynamodbstreams"

module Dynamodb
module Stream
class Enumerator < ::Enumerator
class Error < StandardError; end

def initialize(stream_arn, client_or_options = {})
def initialize(stream_arn, client: nil, client_options: {}, record_batch_size: 1000, throttle_on_empty_records: 1, on_ready: ->{})
@stream_arn = stream_arn
@client = self.class.create_client(client_or_options)
@client = client ? client : Aws::DynamoDBStreams::Client.new(client_options)

@throttle_on_empty_records = 1
@record_batch_size = 1000
@records = []
@throttle_on_empty_records = throttle_on_empty_records
@record_batch_size = record_batch_size
@shard_readers = fetch_current_open_shards.map do |shard|
shard_iterator = get_shard_iterator(shard, "LATEST")
[shard, ShardReader.new(@client, shard_iterator)]
end

on_ready.call

super() do |yielder|
records = []
loop do
while @records.empty? do
load_next_batch
while records.empty? do
records = load_next_batch
end

yielder << @records.shift
yielder << records.shift
end
end
end

def self.for_table(table_name, client_or_options = {})
client = self.create_client(client_or_options)
resp = client.list_streams({
table_name: table_name,
limit: 1
})
private

raise "More then one stream found" if resp.last_evaluated_stream_arn
raise "No streams found" if resp.streams.empty?
# Load next batch of records
def load_next_batch
next_shard_readers = []
records = []

self.new(resp.streams[0].stream_arn, client)
end
until @shard_readers.empty?
shard, shard_reader = @shard_readers.shift

private
records = shard_reader.get_records(@record_batch_size)

def self.create_client(client_or_options)
if client_or_options.is_a?(Aws::DynamoDBStreams::Client)
return client_or_options
end
if shard_reader.finished?
childs_shards = fetch_child_shards(shard)

Aws::DynamoDBStreams::Client.new(client_or_options)
end
# Build one reader per child shard of the shard that just closed.
# The mapping is assigned to a local first: writing
# `concat childs_shards.map do ... end` attaches the do/end block to
# `concat`, leaving `map` without a block so it returns an Enumerator,
# which Array#concat rejects with a TypeError.
child_readers = childs_shards.map do |child_shard|
  child_iterator = get_shard_iterator(child_shard, "AT_SEQUENCE_NUMBER")
  [child_shard, ShardReader.new(@client, child_iterator)]
end
next_shard_readers.concat(child_readers)
else
next_shard_readers << [shard, shard_reader]
end

def load_next_batch
unless @shard_iterator
get_next_shard
get_shard_iterator
break unless records.empty?
end

load_records
# Throttle if no new records are available in all shards
sleep @throttle_on_empty_records if @shard_readers.empty?

@shard_readers.concat(next_shard_readers)

# Throttle if no new records are available
sleep @throttle_on_empty_records if @records.empty?
records
end

# Find all currently open shards
def fetch_current_open_shards
last_evaluated_shard_id = nil

open_shards = []

def load_records
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDBStreams/Client.html#get_records-instance_method
resp = @client.get_records({
shard_iterator: @shard_iterator,
limit: @record_batch_size
})
each_shard do |shard|
# Shard without an ending sequence number contains the latest records
open_shards << shard unless shard.sequence_number_range.ending_sequence_number
end

@shard_iterator = resp.next_shard_iterator
@records = resp.records
open_shards
end

def get_next_shard
# Find the direct child shards of +shard+.
#
# Walks every shard yielded by #each_shard and keeps those whose
# +parent_shard_id+ equals the given shard's +shard_id+.
#
# @param shard [#shard_id] the parent shard
# @return [Array] matching shards, in the order #each_shard yields them;
#   empty when no shard names +shard+ as its parent
def fetch_child_shards(shard)
  child_shards = []

  each_shard do |candidate|
    child_shards << candidate if candidate.parent_shard_id == shard.shard_id
  end

  child_shards
end

# Iterate over all shards and yield each shard
def each_shard
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDBStreams/Client.html#describe_stream-instance_method
last_evaluated_shard_id = nil
limit = 100
loop do
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDBStreams/Client.html#describe_stream-instance_method
opts = {
stream_arn: @stream_arn,
limit: 100
limit: limit
}
opts[:exclusive_start_shard_id] = @last_evaluated_shard_id if @last_evaluated_shard_id
opts[:exclusive_start_shard_id] = last_evaluated_shard_id if last_evaluated_shard_id

resp = @client.describe_stream(opts)

last_evaluated_shard_id = resp.stream_description.last_evaluated_shard_id
shards = resp.stream_description.shards

shards = resp.stream_description.shards
shards.each do |shard|
# If we followed a shard previously, find the next one
if @current_shard_id
if shard.parent_shard_id == @current_shard_id
next_shard = shard
break
else
next
end
end

# Shard without an ending sequence number contains the latest records
unless shard.sequence_number_range.ending_sequence_number
next_shard = shard
break
end
yield shard
end

break if next_shard || !last_evaluated_shard_id
# An empty last_evaluated_shard_id is the only reliable end-of-results
# signal from describe_stream. Comparing the page size to the limit would
# restart the listing from the first shard when the final page is exactly
# full, and would stop early on a short page that is not the last one.
break unless last_evaluated_shard_id
end

raise "Couldn't find next shard" unless next_shard

@current_shard_id = next_shard.shard_id
end

def get_shard_iterator
def get_shard_iterator(shard, type)
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDBStreams/Client.html#get_shard_iterator-instance_method
resp = @client.get_shard_iterator({
shard_id: @current_shard_id,
shard_iterator_type: "LATEST",
opts = {
shard_id: shard.shard_id,
shard_iterator_type: type,
stream_arn: @stream_arn,
})
}

opts[:sequence_number] = shard.sequence_number_range.starting_sequence_number if type == "AT_SEQUENCE_NUMBER"

resp = @client.get_shard_iterator(opts)

@shard_iterator = resp.shard_iterator
resp.shard_iterator
end
end
end
Expand Down
31 changes: 31 additions & 0 deletions lib/dynamodb/stream/enumerator/shard_reader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require "dynamodb/stream/enumerator/version"
require "aws-sdk-dynamodbstreams"

module Dynamodb
  module Stream
    class Enumerator < ::Enumerator
      # Reads successive record batches from a single shard, tracking the
      # shard iterator returned by each get_records response.
      class ShardReader
        # @param client [#get_records] client used to fetch records
        # @param shard_iterator [String, nil] iterator to start reading from
        def initialize(client, shard_iterator)
          @client = client
          @shard_iterator = shard_iterator
        end

        # @return [Boolean] true once no iterator is left to read from
        def finished?
          @shard_iterator.nil?
        end

        # Fetch the next batch of records and advance the stored iterator.
        #
        # @param limit [Integer] maximum number of records to request
        # @return [Array] the records of the response; an empty array when
        #   the reader is already finished (no request is sent in that case)
        def get_records(limit)
          # Without an iterator there is nothing valid to send to the API.
          return [] if finished?

          # https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDBStreams/Client.html#get_records-instance_method
          resp = @client.get_records({
            shard_iterator: @shard_iterator,
            limit: limit
          })

          # The response's next iterator replaces the stored one; a nil value
          # makes #finished? return true.
          @shard_iterator = resp.next_shard_iterator

          resp.records
        end
      end
    end
  end
end
68 changes: 30 additions & 38 deletions spec/dynamodb/stream/enumerator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,11 @@
require "aws-sdk-dynamodbstreams"

RSpec.describe Dynamodb::Stream::Enumerator do
it "has a version number" do
expect(Dynamodb::Stream::Enumerator::VERSION).not_to be nil
end

it "does something useful" do
client = Aws::DynamoDB::Client.new
streams_client = Aws::DynamoDBStreams::Client.new


table_name = "dynamodb-stream-enumerator-test-#{Time.now.to_i}"

begin
client.delete_table({
table_name: table_name,
})
table_name = "dynamodb-stream-enumerator-test-#{Time.now.to_i}"

client.wait_until(:table_not_exists, {
table_name: table_name,
})
rescue Aws::DynamoDB::Errors::ResourceNotFoundException => e
end

client.create_table({
before(:all) do
@client = Aws::DynamoDB::Client.new
@client.create_table({
attribute_definitions: [{
attribute_name: "Id",
attribute_type: "S",
Expand Down Expand Up @@ -52,28 +34,48 @@
],
})

client.wait_until(:table_exists, {
@client.wait_until(:table_exists, {
table_name: table_name,
})
end

after(:all) do
@client.delete_table({
table_name: table_name,
})
end

p "Table created"
it "has a version number" do
expect(Dynamodb::Stream::Enumerator::VERSION).not_to be nil
end

it "does something useful" do
found_records = []

resp = @client.describe_table({
table_name: table_name
})

enumerator_ready = false

t = Thread.new do
records = Dynamodb::Stream::Enumerator.for_table(table_name)
records = Dynamodb::Stream::Enumerator.new(resp.table.latest_stream_arn, on_ready: ->{ enumerator_ready = true })
p "Start enumerator"
records.each do |record|
p "R: #{record}"
found_records << record
end
end

sleep 10
p "Waiting for enumerator"

until enumerator_ready do; end

p "Enumerator running"

test_date = Time.now.iso8601

resp = client.put_item({
resp = @client.put_item({
item: {
"Id" => "test",
"Date" => test_date,
Expand All @@ -83,7 +85,7 @@

p "Item inserted, waiting"

sleep 10
sleep 5

t.exit

Expand All @@ -93,15 +95,5 @@
expect(record.event_name).to eq "INSERT"
expect(record.dynamodb.new_image["Id"].s).to eq "test"
expect(record.dynamodb.new_image["Date"].s).to eq test_date

p "Table deleting"

client.delete_table({
table_name: table_name,
})

# client.wait_until(:table_not_exists, {
# table_name: table_name,
# })
end
end

0 comments on commit 93ffcee

Please sign in to comment.