Skip to content

Commit

Permalink
Retrieve correct latest run id when using s3a (closes snowplow/snowpl…
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet authored and peel committed May 25, 2020
1 parent 0336355 commit 0581469
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 18 deletions.
19 changes: 12 additions & 7 deletions lib/snowplow-emr-etl-runner/emr_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
archive_shredded_step = get_archive_step(csbs[:good], csbs[:archive], run_id, s3_endpoint, ": Shredded S3 -> Shredded Archive S3", encrypted)
@jobflow.add_step(archive_shredded_step)
elsif archive_shredded == 'recover'
latest_run_id = get_latest_run_id(s3, csbs[:good])
latest_run_id = get_latest_run_id(s3, csbs[:good], 'atomic-events')
archive_shredded_step = get_archive_step(csbs[:good], csbs[:archive], latest_run_id, s3_endpoint, ": Shredded S3 -> S3 Shredded Archive", encrypted)
@jobflow.add_step(archive_shredded_step)
else # skip
Expand Down Expand Up @@ -798,21 +798,26 @@ def get_rdb_loader_steps(config, targets, resolver, jar, rdbloader_steps, skip_m
end

# List bucket (enriched:good or shredded:good) and return the latest run folder
#
# Parameters:
# +s3+:: AWS S3 client
# +s3_path+:: Full S3 path to folder
# +suffix+:: Suffix to check for emptiness, atomic-events in case of shredded:good
def get_latest_run_id(s3, s3_path, suffix = '')
  # Captures the timestamp part of a run folder, e.g. .../run=2020-05-25-10-00-00/...
  run_id_regex = /.*\/run=((\d|-)+)\/.*/
  # Alphabetically last matching key corresponds to the most recent run= folder
  # ($folder$ marker objects created by s3a are excluded by the filter)
  folder = last_object_name(s3, s3_path,
    lambda { |k| !(k =~ /\$folder\$$/) && !k[run_id_regex, 1].nil? })
  run_id = folder[run_id_regex, 1]
  if run_id.nil?
    logger.error "No run folders in [#{s3_path}] found"
    raise UnexpectedStateError, "No run folders in [#{s3_path}] found"
  else
    # Guard against archiving an empty (partial) run folder
    path = File.join(s3_path, "run=#{run_id}", suffix)
    if empty?(s3, path)
      raise NoDataToProcessError, "Cannot archive #{path}, no data found"
    else
      run_id
    end
  end
end

Expand Down
19 changes: 12 additions & 7 deletions lib/snowplow-emr-etl-runner/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ def empty?(client, location,
empty_impl(client, bucket, prefix, key_filter)
end

# Retrieve the alphabetically last object name satisfying a key filter.
#
# Parameters:
# +client+:: S3 client
# +location+:: S3 url of the folder to list the object names for
# +key_filter+:: filter to apply on the keys
def last_object_name(client, location, key_filter)
  # Split the s3:// url into its bucket and key-prefix components
  bucket, prefix = parse_bucket_prefix(location)
  last_object_name_impl(client, bucket, prefix, key_filter)
end

# Extract the bucket and prefix from an S3 url.
Expand All @@ -59,18 +59,23 @@ def parse_bucket_prefix(location)

private

# Scan the bucket page by page, keeping the alphabetically greatest key
# that passes the filter. Returns "" when no key matches.
#
# Parameters:
# +client+:: S3 client
# +bucket+:: bucket to list
# +prefix+:: key prefix to list under
# +key_filter+:: filter to apply on the keys
# +max_keys+:: page size for each list_objects call
def last_object_name_impl(client, bucket, prefix, key_filter, max_keys = 50)
  continuation_token = nil
  last = ""
  loop do
    response = list_objects(client, bucket, prefix, max_keys, continuation_token)
    # Greatest matching key within this page (nil when the page has no match)
    new_last = response.contents
      .select { |c| key_filter[c.key] }
      .map { |c| c.key }
      .max
    last = new_last if !new_last.nil? && new_last > last
    continuation_token = response.next_continuation_token
    break unless response.is_truncated
  end
  last
end

def empty_impl(client, bucket, prefix, key_filter, max_keys = 50)
Expand Down
20 changes: 16 additions & 4 deletions spec/snowplow-emr-etl-runner/s3_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,27 @@
end
end

# Specs for S3#last_object_name: arity, filtering, ordering and the
# empty-bucket default.
describe '#last_object_name' do
  it 'should take a client, a location and a filter argument' do
    expect(subject).to respond_to(:last_object_name).with(3).argument
  end

  it 'should filter file names based on the filter' do
    s3.stub_responses(:list_objects_v2, { contents: [{ key: 'abc' }, { key: 'defg' }]})
    expect(subject.last_object_name(s3, 's3://bucket/prefix', lambda { |k| k.length == 3}))
      .to eq('abc')
  end

  it 'should retrieve the alphabetically last file name based on the filter' do
    s3.stub_responses(:list_objects_v2, { contents: [{ key: 'abc' }, { key: 'defg' }]})
    expect(subject.last_object_name(s3, 's3://bucket/prefix', lambda { |k| k.length >= 0}))
      .to eq('defg')
  end

  it 'should be the empty string if there is nothing' do
    s3.stub_responses(:list_objects_v2, { contents: []})
    expect(subject.last_object_name(s3, 's3://bucket/prefix', lambda { |k| k.length >= 0}))
      .to eq('')
  end
end
Expand Down

0 comments on commit 0581469

Please sign in to comment.