diff --git a/CHANGELOG.md b/CHANGELOG.md index ef1e261..6672346 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 4.0.0 + - Make 'message_format' option obsolete + - Use new Logsash 2.4/5.0 APIs for working batchwise and with shared concurrency + ## 3.0.2 - Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99 diff --git a/lib/logstash/outputs/file.rb b/lib/logstash/outputs/file.rb index b849b6a..686db02 100644 --- a/lib/logstash/outputs/file.rb +++ b/lib/logstash/outputs/file.rb @@ -17,6 +17,8 @@ # } # } class LogStash::Outputs::File < LogStash::Outputs::Base + concurrency :shared + FIELD_REF = /%\{[^}]+\}/ config_name "file" @@ -35,13 +37,7 @@ class LogStash::Outputs::File < LogStash::Outputs::Base # E.g: `/%{myfield}/`, `/test-%{myfield}/` are not valid paths config :path, :validate => :string, :required => true - # The format to use when writing events to the file. This value - # supports any string and can include `%{name}` and other dynamic - # strings. - # - # If this setting is omitted, the full json representation of the - # event will be written as a single line. - config :message_format, :validate => :string, :deprecated => "You can achieve the same behavior with the 'line' codec" + config :message_format, :validate => :string, :obsolete => "You can achieve the same behavior with the 'line' codec" # Flush interval (in seconds) for flushing writes to log files. # 0 will flush on every message. @@ -76,10 +72,9 @@ class LogStash::Outputs::File < LogStash::Outputs::Base def register require "fileutils" # For mkdir_p - workers_not_supported - @files = {} - + @io_mutex = Mutex.new + @path = File.expand_path(path) validate_path @@ -91,6 +86,7 @@ def register end @failure_path = File.join(@file_root, @filename_failure) + now = Time.now @last_flush_cycle = now @last_stale_cleanup_cycle = now @@ -101,8 +97,6 @@ def register @codec = LogStash::Plugin.lookup("codec", "line").new @codec.format = @message_format end - - @codec.on_event(&method(:write_event)) end # def register private @@ -125,20 +119,37 @@ def root_directory end public - def receive(event) - @codec.encode(event) - close_stale_files + def multi_receive_encoded(events_and_encoded) + encoded_by_path = Hash.new {|h,k| h[k] = []} + + events_and_encoded.each do |event,encoded| + file_output_path = event_path(event) + encoded_by_path[file_output_path] << encoded + end + + @io_mutex.synchronize do + encoded_by_path.each do |path,chunks| + fd = open(path) + chunks.each {|chunk| fd.write(chunk) } + fd.flush + end + + close_stale_files + end end # def receive public def close - @logger.debug("Close: closing files") - @files.each do |path, fd| - begin - fd.close - @logger.debug("Closed file #{path}", :fd => fd) - rescue Exception => e - @logger.error("Exception while flushing and closing files.", :exception => e) + @io_mutex.synchronize do + @logger.debug("Close: closing files") + + @files.each do |path, fd| + begin + fd.close + @logger.debug("Closed file #{path}", :fd => fd) + rescue Exception => e + @logger.error("Exception while flushing and closing files.", :exception => e) + end end end end @@ -150,7 +161,7 @@ def inside_file_root?(log_path) end private - def write_event(event, data) + def event_path(event) file_output_path = generate_filepath(event) if path_with_field_ref? && !inside_file_root?(file_output_path) @logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path) @@ -159,10 +170,8 @@ def write_event(event, data) file_output_path = @failure_path end @logger.debug("File, writing event to file.", :filename => file_output_path) - fd = open(file_output_path) - # TODO(sissel): Check if we should rotate the file. - fd.write(data) - flush(fd) + + file_output_path end private @@ -195,10 +204,12 @@ def flush(fd) def flush_pending_files return unless Time.now - @last_flush_cycle >= flush_interval @logger.debug("Starting flush cycle") + @files.each do |path, fd| @logger.debug("Flushing file", :path => path, :fd => fd) fd.flush end + @last_flush_cycle = Time.now end @@ -207,6 +218,7 @@ def flush_pending_files def close_stale_files now = Time.now return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval + @logger.info("Starting stale files cleanup cycle", :files => @files) inactive_files = @files.select { |path, fd| not fd.active } @logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files) @@ -222,7 +234,7 @@ def close_stale_files private def cached?(path) - @files.include?(path) && !@files[path].nil? + @files.include?(path) && !@files[path].nil? end private @@ -234,7 +246,9 @@ def deleted?(path) def open(path) if !deleted?(path) && cached?(path) return @files[path] - elsif deleted?(path) + end + + if deleted?(path) if @create_if_deleted @logger.debug("Required path was deleted, creating the file again", :path => path) @files.delete(path) @@ -242,8 +256,9 @@ def open(path) return @files[path] if cached?(path) end end - @logger.info("Opening file", :path => path) + @logger.info("Opening file", :path => path) + dir = File.dirname(path) if !Dir.exist?(dir) @logger.info("Creating directory", :directory => dir) @@ -253,6 +268,7 @@ def open(path) FileUtils.mkdir_p(dir) end end + # work around a bug opening fifos (bug JRUBY-6280) stat = File.stat(path) rescue nil if stat && stat.ftype == "fifo" && LogStash::Environment.jruby? @@ -288,6 +304,7 @@ def flush end def method_missing(method_name, *args, &block) if @io.respond_to?(method_name) + @io.send(method_name, *args, &block) else super diff --git a/logstash-output-file.gemspec b/logstash-output-file.gemspec index d257f8d..549e295 100644 --- a/logstash-output-file.gemspec +++ b/logstash-output-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-file' - s.version = '3.0.2' + s.version = '4.0.0' s.licenses = ['Apache License (2.0)'] s.summary = "This output will write events to files on disk" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies - s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.0.0", "< 2.99" s.add_runtime_dependency 'logstash-codec-json_lines' s.add_runtime_dependency 'logstash-codec-line' diff --git a/spec/outputs/file_spec.rb b/spec/outputs/file_spec.rb index f9d68ad..eee9739 100644 --- a/spec/outputs/file_spec.rb +++ b/spec/outputs/file_spec.rb @@ -47,7 +47,7 @@ describe "ship lots of events to a file gzipped" do Stud::Temporary.file('logstash-spec-output-file') do |tmp_file| - event_count = 10000 + rand(500) + event_count = 100000 + rand(500) config <<-CONFIG input { @@ -125,13 +125,14 @@ 10.times do |i| event = LogStash::Event.new("event_id" => i) - output.receive(event) + output.multi_receive([event]) end FileUtils.rm(temp_file) 10.times do |i| event = LogStash::Event.new("event_id" => i+10) - output.receive(event) + output.multi_receive([event]) end + expect(FileTest.size(temp_file.path)).to be > 0 end @@ -147,12 +148,12 @@ 10.times do |i| event = LogStash::Event.new("event_id" => i) - output.receive(event) + output.multi_receive([event]) end FileUtils.rm(temp_file) 10.times do |i| event = LogStash::Event.new("event_id" => i+10) - output.receive(event) + output.multi_receive([event]) end expect(FileTest.exist?(temp_file.path)).to be_falsey expect(FileTest.size(output.failure_path)).to be > 0 @@ -184,7 +185,7 @@ output = LogStash::Outputs::File.new(config) output.register - output.receive(bad_event) + output.multi_receive([bad_event]) error_file = File.join(path, config["filename_failure"]) @@ -202,10 +203,10 @@ output.register bad_event.set('error', encoded_once) - output.receive(bad_event) + output.multi_receive([bad_event]) bad_event.set('error', encoded_twice) - output.receive(bad_event) + output.multi_receive([bad_event]) expect(Dir.glob(File.join(path, "*")).size).to eq(2) output.close @@ -218,7 +219,7 @@ output.register bad_event.set('error', '../..//test') - output.receive(bad_event) + output.multi_receive([bad_event]) expect(Dir.glob(File.join(path, "*")).size).to eq(1) output.close @@ -235,7 +236,7 @@ config = { "path" => "#{path}/%{error}" } output = LogStash::Outputs::File.new(config) output.register - output.receive(good_event) + output.multi_receive([good_event]) good_file = File.join(path, good_event.get('error')) expect(File.exist?(good_file)).to eq(true) @@ -254,7 +255,7 @@ config = { "path" => dynamic_path } output = LogStash::Outputs::File.new(config) output.register - output.receive(good_event) + output.multi_receive([good_event]) expect(File.exist?(expected_path)).to eq(true) output.close @@ -276,7 +277,7 @@ output = LogStash::Outputs::File.new(config) output.register - output.receive(good_event) + output.multi_receive([good_event]) expect(File.exist?(expected_path)).to eq(true) output.close @@ -291,7 +292,7 @@ config = { "path" => "#{path}/%{error}" } output = LogStash::Outputs::File.new(config) output.register - output.receive(good_event) + output.multi_receive([good_event]) good_file = File.join(path, good_event.get('error')) expect(File.exist?(good_file)).to eq(true) @@ -310,7 +311,7 @@ config = { "path" => "#{path}/output.txt" } output = LogStash::Outputs::File.new(config) output.register - output.receive(good_event) + output.multi_receive([good_event]) good_file = File.join(path, 'output.txt') expect(File.exist?(good_file)).to eq(true) output.close #teardown first to allow reading the file @@ -328,30 +329,9 @@ Stud::Temporary.directory do |path| config = { "path" => "#{path}/output.txt" } - output = LogStash::Outputs::File.new(config) - output.codec = LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"}) - output.register - output.receive(good_event) - good_file = File.join(path, 'output.txt') - expect(File.exist?(good_file)).to eq(true) - output.close #teardown first to allow reading the file - File.open(good_file) {|f| - line = f.readline - expect(line).to eq("Custom format: hello world\n") - } - end - end - end - context "when using deprecated message_format config" do - it 'falls back to line codec' do - good_event = LogStash::Event.new - good_event.set('message', 'hello world') - - Stud::Temporary.directory do |path| - config = { "path" => "#{path}/output.txt", "message_format" => "Custom format: %{message}" } - output = LogStash::Outputs::File.new(config) + output = LogStash::Outputs::File.new(config.merge("codec" => LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"}))) output.register - output.receive(good_event) + output.multi_receive([good_event]) good_file = File.join(path, 'output.txt') expect(File.exist?(good_file)).to eq(true) output.close #teardown first to allow reading the file @@ -375,7 +355,7 @@ } output = LogStash::Outputs::File.new(config) output.register - output.receive(good_event) + output.multi_receive([good_event]) good_file = File.join(path, 'is/nested/output.txt') expect(File.exist?(good_file)).to eq(true) expect(File.stat(good_file).mode.to_s(8)[-3..-1]).to eq('610')