Skip to content

Commit

Permalink
Merge pull request #1 from juicedata/write-buffer
Browse files Browse the repository at this point in the history
add write buffer
  • Loading branch information
davies authored Sep 17, 2018
2 parents 9317d03 + 10c4f93 commit 08f233c
Showing 1 changed file with 67 additions and 12 deletions.
79 changes: 67 additions & 12 deletions lib/logstash/outputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
# 0 will flush on every message.
config :flush_interval, :validate => :number, :default => 2

config :buffer_size, :validate => :number, :default => 1048576

# Gzip the output stream before writing to disk.
config :gzip, :validate => :boolean, :default => false

Expand Down Expand Up @@ -140,7 +142,7 @@ def multi_receive_encoded(events_and_encoded)
fd.write(chunks.last)
else
# append to the file
chunks.each {|chunk| fd.write(chunk) }
fd.write(chunks.join(""))
end
fd.flush unless @flusher && @flusher.alive?
end
Expand Down Expand Up @@ -249,19 +251,22 @@ def deleted?(path)

private
def open(path)
if !deleted?(path) && cached?(path)
if cached?(path)
return @files[path]
end
@files[path] = IOWriter.new(open_fd(path), @buffer_size, lambda { recreate(path) })
end

if deleted?(path)
if @create_if_deleted
@logger.debug("Required path was deleted, creating the file again", :path => path)
@files.delete(path)
else
return @files[path] if cached?(path)
end
private
def recreate(path)
if @create_if_deleted && deleted?(path)
@logger.debug("Required path was deleted, creating the file again", :path => path)
open_fd(path)
end
end

private
def open_fd(path)
@logger.info("Opening file", :path => path)

dir = File.dirname(path)
Expand All @@ -288,7 +293,7 @@ def open(path)
if gzip
fd = Zlib::GzipWriter.new(fd)
end
@files[path] = IOWriter.new(fd)
fd
end

##
Expand Down Expand Up @@ -367,19 +372,69 @@ def run

# wrapper class
class IOWriter
def initialize(io)
def initialize(io, buffer_size, recreate)
@io = io
@buffers = []
@size = 0
@limit = buffer_size
@recreate = recreate
end

private
def reopen
fd = @recreate.call
if !(fd.nil?)
@io.flush
if @io.class == Zlib::GzipWriter
@io.to_io.flush
end
@io.close
@io = fd
end
end

public
def write(*args)
@io.write(*args)
args.each do |arg|
@size += arg.bytesize
@buffers.push(arg)
end
if (@limit >= 0) and (@size > @limit)
do_write
end
@active = true
end

private
def do_write
if @size > 0
reopen
@io.write(@buffers.join(""))
@buffers = []
@size = 0
end
end

public
def flush
do_write
@io.flush
if @io.class == Zlib::GzipWriter
@io.to_io.flush
end
end

public
def truncate(length)
reopen
@io.truncate(length)
end

public
def close
flush
@io.close
end
def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)

Expand Down

0 comments on commit 08f233c

Please sign in to comment.