From 909787d570c453710b756baea0c2cfe8ab59bcc2 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 20 Jan 2022 08:14:07 -0800 Subject: [PATCH] Final flush (#223) - Decorate flushed events from multiline codecs - Fix missing metadata from the last event Fixed: #153 Co-authored-by: Jonathan Gough Co-authored-by: Kaise Cheng --- CHANGELOG.md | 3 +++ lib/logstash/inputs/s3.rb | 32 ++++++++++++++++++-------------- logstash-input-s3.gemspec | 2 +- spec/inputs/s3_spec.rb | 1 + 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b906dce..01e26ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.8.3 + - Fix missing `metadata` and `type` of the last event [#223](https://github.com/logstash-plugins/logstash-input-s3/pull/223) + ## 3.8.2 - Refactor: read sincedb time once per bucket listing [#233](https://github.com/logstash-plugins/logstash-input-s3/pull/233) diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb index 56e976a..013c7e2 100644 --- a/lib/logstash/inputs/s3.rb +++ b/lib/logstash/inputs/s3.rb @@ -235,30 +235,34 @@ def process_local_log(queue, filename, object) @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event) update_metadata(metadata, event) else - decorate(event) - - if @include_object_properties - event.set("[@metadata][s3]", object.data.to_h) - else - event.set("[@metadata][s3]", {}) - end - - event.set("[@metadata][s3][key]", object.key) - event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil? - event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil? - - queue << event + push_decoded_event(queue, metadata, object, event) end end end # #ensure any stateful codecs (such as multi-line ) are flushed to the queue @codec.flush do |event| - queue << event + push_decoded_event(queue, metadata, object, event) end return true end # def process_local_log + def push_decoded_event(queue, metadata, object, event) + decorate(event) + + if @include_object_properties + event.set("[@metadata][s3]", object.data.to_h) + else + event.set("[@metadata][s3]", {}) + end + + event.set("[@metadata][s3][key]", object.key) + event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil? + event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil? + + queue << event + end + def event_is_metadata?(event) return false unless event.get("message").class == String line = event.get("message") diff --git a/logstash-input-s3.gemspec b/logstash-input-s3.gemspec index 08f55fe..21ed2bd 100644 --- a/logstash-input-s3.gemspec +++ b/logstash-input-s3.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-s3' - s.version = '3.8.2' + s.version = '3.8.3' s.licenses = ['Apache-2.0'] s.summary = "Streams events from files in a S3 bucket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/s3_spec.rb b/spec/inputs/s3_spec.rb index 17b988e..b34b682 100644 --- a/spec/inputs/s3_spec.rb +++ b/spec/inputs/s3_spec.rb @@ -329,6 +329,7 @@ events = fetch_events(config) expect(events.size).to eq(events_to_process) expect(events[0].get("[@metadata][s3][key]")).to eql log.key + expect(events[1].get("[@metadata][s3][key]")).to eql log.key end it "deletes the temporary file" do