Skip to content

Commit

Permalink
fine-gained locks while flushing (#2)
Browse files Browse the repository at this point in the history
* fine-gained locks

* fix flushing
  • Loading branch information
davies committed Sep 18, 2018
1 parent 08f233c commit ce8804d
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions lib/logstash/outputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,14 @@ def extract_file_root
# the back-bone of @flusher, our periodic-flushing interval.
private
def flush_pending_files
@io_mutex.synchronize do
pending_files = @io_mutex.synchronize do
@logger.debug("Starting flush cycle")

@files.each do |path, fd|
@logger.debug("Flushing file", :path => path, :fd => fd)
fd.flush
end
# flushing all the files is slow, so we should do that without blocking
@files.clone
end
pending_files.each do |path, fd|
@logger.debug("Flushing file", :path => path, :fd => fd)
fd.flush
end
rescue => e
# squash exceptions caught while flushing after logging them
Expand Down Expand Up @@ -374,6 +375,7 @@ def run
class IOWriter
def initialize(io, buffer_size, recreate)
@io = io
@mutex = Mutex.new
@buffers = []
@size = 0
@limit = buffer_size
Expand All @@ -395,14 +397,16 @@ def reopen

public
def write(*args)
args.each do |arg|
@size += arg.bytesize
@buffers.push(arg)
end
if (@limit >= 0) and (@size > @limit)
do_write
@mutex.synchronize do
args.each do |arg|
@size += arg.bytesize
@buffers.push(arg)
end
if (@limit >= 0) and (@size > @limit)
do_write
end
@active = true
end
@active = true
end

private
Expand All @@ -417,23 +421,29 @@ def do_write

public
def flush
do_write
@io.flush
if @io.class == Zlib::GzipWriter
@io.to_io.flush
@mutex.synchronize do
do_write
@io.flush
if @io.class == Zlib::GzipWriter
@io.to_io.flush
end
end
end

public
def truncate(length)
reopen
@io.truncate(length)
@mutex.synchronize do
reopen
@io.truncate(length)
end
end

public
def close
flush
@io.close
@mutex.synchronize do
@io.close
end
end
def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)
Expand Down

0 comments on commit ce8804d

Please sign in to comment.