Skip to content

Commit

Permalink
Final flush (#223)
Browse files Browse the repository at this point in the history
- Decorate flushed events from multiline codecs
- Fix missing metadata from the last event

Fixed: #153

Co-authored-by: Jonathan Gough <jonathanpgough@gmail.com>
Co-authored-by: Kaise Cheng <kaise.cheng@elastic.co>
  • Loading branch information
3 people committed Jan 20, 2022
1 parent 5768c61 commit 909787d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 3.8.3
- Fix missing `metadata` and `type` of the last event [#223](https://github.com/logstash-plugins/logstash-input-s3/pull/223)

## 3.8.2
- Refactor: read sincedb time once per bucket listing [#233](https://github.com/logstash-plugins/logstash-input-s3/pull/233)

Expand Down
32 changes: 18 additions & 14 deletions lib/logstash/inputs/s3.rb
Expand Up @@ -235,30 +235,34 @@ def process_local_log(queue, filename, object)
@logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event)
update_metadata(metadata, event)
else
decorate(event)

if @include_object_properties
event.set("[@metadata][s3]", object.data.to_h)
else
event.set("[@metadata][s3]", {})
end

event.set("[@metadata][s3][key]", object.key)
event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?

queue << event
push_decoded_event(queue, metadata, object, event)
end
end
end
# #ensure any stateful codecs (such as multi-line ) are flushed to the queue
@codec.flush do |event|
queue << event
push_decoded_event(queue, metadata, object, event)
end

return true
end # def process_local_log

def push_decoded_event(queue, metadata, object, event)
decorate(event)

if @include_object_properties
event.set("[@metadata][s3]", object.data.to_h)
else
event.set("[@metadata][s3]", {})
end

event.set("[@metadata][s3][key]", object.key)
event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?

queue << event
end

def event_is_metadata?(event)
return false unless event.get("message").class == String
line = event.get("message")
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-s3.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-s3'
s.version = '3.8.2'
s.version = '3.8.3'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files in a S3 bucket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
1 change: 1 addition & 0 deletions spec/inputs/s3_spec.rb
Expand Up @@ -329,6 +329,7 @@
events = fetch_events(config)
expect(events.size).to eq(events_to_process)
expect(events[0].get("[@metadata][s3][key]")).to eql log.key
expect(events[1].get("[@metadata][s3][key]")).to eql log.key
end

it "deletes the temporary file" do
Expand Down

0 comments on commit 909787d

Please sign in to comment.