diff --git a/lib/snowplow-emr-etl-runner/emr_job.rb b/lib/snowplow-emr-etl-runner/emr_job.rb
index 6dfe823..468cf56 100755
--- a/lib/snowplow-emr-etl-runner/emr_job.rb
+++ b/lib/snowplow-emr-etl-runner/emr_job.rb
@@ -593,7 +593,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
         archive_shredded_step = get_archive_step(csbs[:good], csbs[:archive], run_id, s3_endpoint, ": Shredded S3 -> Shredded Archive S3", encrypted)
         @jobflow.add_step(archive_shredded_step)
       elsif archive_shredded == 'recover'
-        latest_run_id = get_latest_run_id(s3, csbs[:good])
+        latest_run_id = get_latest_run_id(s3, csbs[:good], 'atomic-events')
         archive_shredded_step = get_archive_step(csbs[:good], csbs[:archive], latest_run_id, s3_endpoint, ": Shredded S3 -> S3 Shredded Archive", encrypted)
         @jobflow.add_step(archive_shredded_step)
       else # skip
@@ -798,21 +798,26 @@ def get_rdb_loader_steps(config, targets, resolver, jar, rdbloader_steps, skip_m
     end
 
     # List bucket (enriched:good or shredded:good) and return latest run folder
-    # Assuming, there's usually just one folder
     #
     # Parameters:
     # +s3+:: AWS S3 client
     # +s3_path+:: Full S3 path to folder
-    def get_latest_run_id(s3, s3_path)
+    # +suffix+:: Suffix to check for emptiness, atomic-events in case of shredded:good
+    def get_latest_run_id(s3, s3_path, suffix = '')
       run_id_regex = /.*\/run=((\d|-)+)\/.*/
-      folders = list_object_names(s3, s3_path,
+      folder = last_object_name(s3, s3_path,
         lambda { |k| !(k =~ /\$folder\$$/) and !k[run_id_regex, 1].nil? })
-        .map { |k| k[run_id_regex, 1] }
-      if folders.empty?
+      run_id = folder[run_id_regex, 1]
+      if run_id.nil?
        logger.error "No run folders in [#{s3_path}] found"
        raise UnexpectedStateError, "No run folders in [#{s3_path}] found"
       else
-        folders.first
+        path = File.join(s3_path, "run=#{run_id}", suffix)
+        if empty?(s3, path)
+          raise NoDataToProcessError, "Cannot archive #{path}, no data found"
+        else
+          run_id
+        end
       end
     end
 
diff --git a/lib/snowplow-emr-etl-runner/s3.rb b/lib/snowplow-emr-etl-runner/s3.rb
index 32ed2dc..8042b18 100644
--- a/lib/snowplow-emr-etl-runner/s3.rb
+++ b/lib/snowplow-emr-etl-runner/s3.rb
@@ -36,15 +36,15 @@ def empty?(client, location,
       empty_impl(client, bucket, prefix, key_filter)
     end
 
-    # List all object names satisfying a key filter.
+    # Retrieve the alphabetically last object name satisfying a key filter.
     #
     # Parameters:
     # +client+:: S3 client
     # +location+:: S3 url of the folder to list the object names for
     # +key_filter+:: filter to apply on the keys
-    def list_object_names(client, location, key_filter)
+    def last_object_name(client, location, key_filter)
       bucket, prefix = parse_bucket_prefix(location)
-      list_object_names_impl(client, bucket, prefix, key_filter)
+      last_object_name_impl(client, bucket, prefix, key_filter)
     end
 
     # Extract the bucket and prefix from an S3 url.
@@ -59,18 +59,23 @@ def parse_bucket_prefix(location)
 
     private
 
-    def list_object_names_impl(client, bucket, prefix, key_filter, max_keys = 50)
+    def last_object_name_impl(client, bucket, prefix, key_filter, max_keys = 50)
       continuation_token = nil
-      filtered = []
+      last = ""
       loop do
         response = list_objects(client, bucket, prefix, max_keys, continuation_token)
-        filtered = filtered + response.contents
+        new_last = response.contents
           .select { |c| key_filter[c.key] }
           .map { |c| c.key }
+          .sort
+          .last
+        if not new_last.nil? and new_last > last
+          last = new_last
+        end
         continuation_token = response.next_continuation_token
         break unless response.is_truncated
       end
-      filtered
+      last
     end
 
     def empty_impl(client, bucket, prefix, key_filter, max_keys = 50)
diff --git a/spec/snowplow-emr-etl-runner/s3_spec.rb b/spec/snowplow-emr-etl-runner/s3_spec.rb
index 80c4bb4..d8e5de4 100644
--- a/spec/snowplow-emr-etl-runner/s3_spec.rb
+++ b/spec/snowplow-emr-etl-runner/s3_spec.rb
@@ -54,15 +54,27 @@
     end
   end
 
-  describe '#list_object_names' do
+  describe '#last_object_name' do
     it 'should take a client, a location and a filter argument' do
-      expect(subject).to respond_to(:list_object_names).with(3).argument
+      expect(subject).to respond_to(:last_object_name).with(3).argument
     end
 
     it 'should filter file names based on the filter' do
       s3.stub_responses(:list_objects_v2, { contents: [{ key: 'abc' }, { key: 'defg' }]})
-      expect(subject.list_object_names(s3, 's3://bucket/prefix', lambda { |k| k.length == 3}))
-        .to eq(['abc'])
+      expect(subject.last_object_name(s3, 's3://bucket/prefix', lambda { |k| k.length == 3}))
+        .to eq('abc')
+    end
+
+    it 'should retrieve the alphabetically last file name based on the filter' do
+      s3.stub_responses(:list_objects_v2, { contents: [{ key: 'abc' }, { key: 'defg' }]})
+      expect(subject.last_object_name(s3, 's3://bucket/prefix', lambda { |k| k.length >= 0}))
+        .to eq('defg')
+    end
+
+    it 'should be the empty string if there is nothing' do
+      s3.stub_responses(:list_objects_v2, { contents: []})
+      expect(subject.last_object_name(s3, 's3://bucket/prefix', lambda { |k| k.length >= 0}))
+        .to eq('')
     end
   end
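
For readers unfamiliar with the pagination pattern introduced in `last_object_name_impl` above, here is a minimal standalone sketch of the same idea written against the raw AWS SDK. It is an illustration only, not code from this PR: the helper name `last_matching_key` and the example bucket/prefix are hypothetical, and it calls `Aws::S3::Client#list_objects_v2` directly instead of the module's `list_objects` wrapper.

```ruby
# Sketch of the scan used above: walk each list_objects_v2 page and keep only
# the greatest matching key, rather than accumulating every key and sorting.
# Assumes aws-sdk-s3; bucket, prefix and filter values are illustrative only.
require 'aws-sdk-s3'

def last_matching_key(client, bucket, prefix, key_filter, max_keys = 50)
  last = ''
  continuation_token = nil
  loop do
    params = { bucket: bucket, prefix: prefix, max_keys: max_keys }
    params[:continuation_token] = continuation_token if continuation_token
    response = client.list_objects_v2(params)
    # Greatest matching key on this page, if any
    page_max = response.contents.map(&:key).select { |k| key_filter[k] }.max
    last = page_max if !page_max.nil? && page_max > last
    continuation_token = response.next_continuation_token
    break unless response.is_truncated
  end
  last
end

# e.g. last_matching_key(Aws::S3::Client.new, 'my-bucket', 'shredded/good/',
#        lambda { |k| !(k =~ /\$folder\$$/) })
```

Keeping only the running maximum bounds memory at one key regardless of how many objects the prefix holds, which is why the PR returns a single name instead of the full listing.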