diff --git a/lib/logstash/outputs/file.rb b/lib/logstash/outputs/file.rb index ede10a5..f5aaf12 100644 --- a/lib/logstash/outputs/file.rb +++ b/lib/logstash/outputs/file.rb @@ -207,13 +207,14 @@ def extract_file_root # the back-bone of @flusher, our periodic-flushing interval. private def flush_pending_files - @io_mutex.synchronize do + pending_files = @io_mutex.synchronize do @logger.debug("Starting flush cycle") - - @files.each do |path, fd| - @logger.debug("Flushing file", :path => path, :fd => fd) - fd.flush - end + # flushing all the files is slow, so we should do that without blocking + @files.clone + end + pending_files.each do |path, fd| + @logger.debug("Flushing file", :path => path, :fd => fd) + fd.flush end rescue => e # squash exceptions caught while flushing after logging them @@ -374,6 +375,7 @@ def run class IOWriter def initialize(io, buffer_size, recreate) @io = io + @mutex = Mutex.new @buffers = [] @size = 0 @limit = buffer_size @@ -395,14 +397,16 @@ def reopen public def write(*args) - args.each do |arg| - @size += arg.bytesize - @buffers.push(arg) - end - if (@limit >= 0) and (@size > @limit) - do_write + @mutex.synchronize do + args.each do |arg| + @size += arg.bytesize + @buffers.push(arg) + end + if (@limit >= 0) and (@size > @limit) + do_write + end + @active = true end - @active = true end private @@ -417,23 +421,29 @@ def do_write public def flush - do_write - @io.flush - if @io.class == Zlib::GzipWriter - @io.to_io.flush + @mutex.synchronize do + do_write + @io.flush + if @io.class == Zlib::GzipWriter + @io.to_io.flush + end end end public def truncate(length) - reopen - @io.truncate(length) + @mutex.synchronize do + reopen + @io.truncate(length) + end end public def close flush - @io.close + @mutex.synchronize do + @io.close + end end def method_missing(method_name, *args, &block) if @io.respond_to?(method_name)