Skip to content

Commit

Permalink
Use concurrency :shared and sync codecs to optimize performance
Browse files Browse the repository at this point in the history
As part of this refactor the `message_format` option had to be (finally)
obsoleted. It was previously deprecated for quite some time.

This provides a nice boost:

Before:

```
time bin/logstash -e "input { generator { count => 3000000} } filter {  } output { file { path => '/tmp/newfileout'} }"
Settings: Default pipeline workers: 8
Pipeline main started
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}
      139.95 real       223.61 user        28.93 sys
```

After

```
rm /tmp/newfileout; time bin/logstash -e "input { generator { count => 3000000} } filter {  } output { file { codec => json_lines path => '/tmp/newfileout'} }" ; ls -lh /tmp/newfileout
Settings: Default pipeline workers: 8
Pipeline main started
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}
       56.12 real       192.99 user        17.38 sys
```

Fixes #46
  • Loading branch information
andrewvc committed Aug 26, 2016
1 parent 2cf8878 commit 4e8ca37
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 70 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 4.0.0
- Make 'message_format' option obsolete
- Use new Logstash 2.4/5.0 APIs for working batchwise and with shared concurrency

## 3.0.2
- Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99

Expand Down
77 changes: 47 additions & 30 deletions lib/logstash/outputs/file.rb
Expand Up @@ -17,6 +17,8 @@
# }
# }
class LogStash::Outputs::File < LogStash::Outputs::Base
concurrency :shared

FIELD_REF = /%\{[^}]+\}/

config_name "file"
Expand All @@ -35,13 +37,7 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
# E.g: `/%{myfield}/`, `/test-%{myfield}/` are not valid paths
config :path, :validate => :string, :required => true

# The format to use when writing events to the file. This value
# supports any string and can include `%{name}` and other dynamic
# strings.
#
# If this setting is omitted, the full json representation of the
# event will be written as a single line.
config :message_format, :validate => :string, :deprecated => "You can achieve the same behavior with the 'line' codec"
config :message_format, :validate => :string, :obsolete => "You can achieve the same behavior with the 'line' codec"

# Flush interval (in seconds) for flushing writes to log files.
# 0 will flush on every message.
Expand Down Expand Up @@ -76,10 +72,9 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
def register
require "fileutils" # For mkdir_p

workers_not_supported

@files = {}

@io_mutex = Mutex.new

@path = File.expand_path(path)

validate_path
Expand All @@ -91,6 +86,7 @@ def register
end
@failure_path = File.join(@file_root, @filename_failure)


now = Time.now
@last_flush_cycle = now
@last_stale_cleanup_cycle = now
Expand All @@ -101,8 +97,6 @@ def register
@codec = LogStash::Plugin.lookup("codec", "line").new
@codec.format = @message_format
end

@codec.on_event(&method(:write_event))
end # def register

private
Expand All @@ -125,20 +119,37 @@ def root_directory
end

public
# Receives one pipeline batch as [event, encoded_payload] pairs (the
# encoded payload is produced upstream by the codec).
#
# Payloads are grouped by destination path first so each file's chunks are
# written contiguously and flushed once per file rather than once per event.
#
# NOTE(review): the scraped diff had the removed `receive(event)` body fused
# into this method; this is the reconstructed post-change implementation.
def multi_receive_encoded(events_and_encoded)
  encoded_by_path = Hash.new {|h,k| h[k] = []}

  events_and_encoded.each do |event,encoded|
    file_output_path = event_path(event)
    encoded_by_path[file_output_path] << encoded
  end

  # The plugin runs with shared concurrency, so several workers may enter
  # here at once; the mutex serializes all access to @files and file IO.
  @io_mutex.synchronize do
    encoded_by_path.each do |path,chunks|
      fd = open(path)
      chunks.each {|chunk| fd.write(chunk) }
      fd.flush
    end

    close_stale_files
  end
end # def multi_receive_encoded

public
# Flushes and closes every tracked file descriptor at plugin shutdown.
#
# Runs under the IO mutex because workers may still be inside
# multi_receive_encoded when shutdown begins.
#
# NOTE(review): the scraped diff had the pre-change close body fused into
# this method; this is the reconstructed post-change implementation.
def close
  @io_mutex.synchronize do
    @logger.debug("Close: closing files")

    @files.each do |path, fd|
      begin
        fd.close
        @logger.debug("Closed file #{path}", :fd => fd)
      rescue Exception => e
        # Best-effort shutdown: log and keep closing the remaining files.
        @logger.error("Exception while flushing and closing files.", :exception => e)
      end
    end
  end
end
Expand All @@ -150,7 +161,7 @@ def inside_file_root?(log_path)
end

private
def write_event(event, data)
def event_path(event)
file_output_path = generate_filepath(event)
if path_with_field_ref? && !inside_file_root?(file_output_path)
@logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path)
Expand All @@ -159,10 +170,8 @@ def write_event(event, data)
file_output_path = @failure_path
end
@logger.debug("File, writing event to file.", :filename => file_output_path)
fd = open(file_output_path)
# TODO(sissel): Check if we should rotate the file.
fd.write(data)
flush(fd)

file_output_path
end

private
Expand Down Expand Up @@ -195,10 +204,12 @@ def flush(fd)
# Flushes buffered writes for every open file, rate-limited to at most one
# pass per flush_interval seconds; calls inside the interval are no-ops.
def flush_pending_files
  return unless Time.now - @last_flush_cycle >= flush_interval
  @logger.debug("Starting flush cycle")

  @files.each do |path, fd|
    @logger.debug("Flushing file", :path => path, :fd => fd)
    fd.flush
  end

  @last_flush_cycle = Time.now
end

Expand All @@ -207,6 +218,7 @@ def flush_pending_files
def close_stale_files
now = Time.now
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval

@logger.info("Starting stale files cleanup cycle", :files => @files)
inactive_files = @files.select { |path, fd| not fd.active }
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
Expand All @@ -222,7 +234,7 @@ def close_stale_files

private
# True only when we currently track an open (non-nil) IO object for this
# path. (The whitespace-only diff left this expression duplicated; one
# copy is sufficient.)
def cached?(path)
  @files.include?(path) && !@files[path].nil?
end

private
Expand All @@ -234,16 +246,19 @@ def deleted?(path)
def open(path)
if !deleted?(path) && cached?(path)
return @files[path]
elsif deleted?(path)
end

if deleted?(path)
if @create_if_deleted
@logger.debug("Required path was deleted, creating the file again", :path => path)
@files.delete(path)
else
return @files[path] if cached?(path)
end
end
@logger.info("Opening file", :path => path)

@logger.info("Opening file", :path => path)

dir = File.dirname(path)
if !Dir.exist?(dir)
@logger.info("Creating directory", :directory => dir)
Expand All @@ -253,6 +268,7 @@ def open(path)
FileUtils.mkdir_p(dir)
end
end

# work around a bug opening fifos (bug JRUBY-6280)
stat = File.stat(path) rescue nil
if stat && stat.ftype == "fifo" && LogStash::Environment.jruby?
Expand Down Expand Up @@ -288,6 +304,7 @@ def flush
end
def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)

@io.send(method_name, *args, &block)
else
super
Expand Down
4 changes: 2 additions & 2 deletions logstash-output-file.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-file'
s.version = '3.0.2'
s.version = '4.0.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This output will write events to files on disk"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -20,7 +20,7 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.0.0", "< 2.99"
s.add_runtime_dependency 'logstash-codec-json_lines'
s.add_runtime_dependency 'logstash-codec-line'

Expand Down
56 changes: 18 additions & 38 deletions spec/outputs/file_spec.rb
Expand Up @@ -47,7 +47,7 @@

describe "ship lots of events to a file gzipped" do
Stud::Temporary.file('logstash-spec-output-file') do |tmp_file|
event_count = 10000 + rand(500)
event_count = 100000 + rand(500)

config <<-CONFIG
input {
Expand Down Expand Up @@ -125,13 +125,14 @@

10.times do |i|
event = LogStash::Event.new("event_id" => i)
output.receive(event)
output.multi_receive([event])
end
FileUtils.rm(temp_file)
10.times do |i|
event = LogStash::Event.new("event_id" => i+10)
output.receive(event)
output.multi_receive([event])
end

expect(FileTest.size(temp_file.path)).to be > 0
end

Expand All @@ -147,12 +148,12 @@

10.times do |i|
event = LogStash::Event.new("event_id" => i)
output.receive(event)
output.multi_receive([event])
end
FileUtils.rm(temp_file)
10.times do |i|
event = LogStash::Event.new("event_id" => i+10)
output.receive(event)
output.multi_receive([event])
end
expect(FileTest.exist?(temp_file.path)).to be_falsey
expect(FileTest.size(output.failure_path)).to be > 0
Expand Down Expand Up @@ -184,7 +185,7 @@

output = LogStash::Outputs::File.new(config)
output.register
output.receive(bad_event)
output.multi_receive([bad_event])

error_file = File.join(path, config["filename_failure"])

Expand All @@ -202,10 +203,10 @@
output.register

bad_event.set('error', encoded_once)
output.receive(bad_event)
output.multi_receive([bad_event])

bad_event.set('error', encoded_twice)
output.receive(bad_event)
output.multi_receive([bad_event])

expect(Dir.glob(File.join(path, "*")).size).to eq(2)
output.close
Expand All @@ -218,7 +219,7 @@
output.register

bad_event.set('error', '../..//test')
output.receive(bad_event)
output.multi_receive([bad_event])

expect(Dir.glob(File.join(path, "*")).size).to eq(1)
output.close
Expand All @@ -235,7 +236,7 @@
config = { "path" => "#{path}/%{error}" }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

good_file = File.join(path, good_event.get('error'))
expect(File.exist?(good_file)).to eq(true)
Expand All @@ -254,7 +255,7 @@
config = { "path" => dynamic_path }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

expect(File.exist?(expected_path)).to eq(true)
output.close
Expand All @@ -276,7 +277,7 @@

output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

expect(File.exist?(expected_path)).to eq(true)
output.close
Expand All @@ -291,7 +292,7 @@
config = { "path" => "#{path}/%{error}" }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

good_file = File.join(path, good_event.get('error'))
expect(File.exist?(good_file)).to eq(true)
Expand All @@ -310,7 +311,7 @@
config = { "path" => "#{path}/output.txt" }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])
good_file = File.join(path, 'output.txt')
expect(File.exist?(good_file)).to eq(true)
output.close #teardown first to allow reading the file
Expand All @@ -328,30 +329,9 @@

Stud::Temporary.directory do |path|
config = { "path" => "#{path}/output.txt" }
output = LogStash::Outputs::File.new(config)
output.codec = LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"})
output.register
output.receive(good_event)
good_file = File.join(path, 'output.txt')
expect(File.exist?(good_file)).to eq(true)
output.close #teardown first to allow reading the file
File.open(good_file) {|f|
line = f.readline
expect(line).to eq("Custom format: hello world\n")
}
end
end
end
context "when using deprecated message_format config" do
it 'falls back to line codec' do
good_event = LogStash::Event.new
good_event.set('message', 'hello world')

Stud::Temporary.directory do |path|
config = { "path" => "#{path}/output.txt", "message_format" => "Custom format: %{message}" }
output = LogStash::Outputs::File.new(config)
output = LogStash::Outputs::File.new(config.merge("codec" => LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"})))
output.register
output.receive(good_event)
output.multi_receive([good_event])
good_file = File.join(path, 'output.txt')
expect(File.exist?(good_file)).to eq(true)
output.close #teardown first to allow reading the file
Expand All @@ -375,7 +355,7 @@
}
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])
good_file = File.join(path, 'is/nested/output.txt')
expect(File.exist?(good_file)).to eq(true)
expect(File.stat(good_file).mode.to_s(8)[-3..-1]).to eq('610')
Expand Down

0 comments on commit 4e8ca37

Please sign in to comment.