Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change read mode to immediately stop consuming lines when shutting down #322

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.4.6
- Change read mode to immediately stop consuming buffered lines when shutdown is requested [#322](https://github.com/logstash-plugins/logstash-input-file/pull/322)

## 4.4.5
- Handle EOF when checking archive validity [#321](https://github.com/logstash-plugins/logstash-input-file/pull/321)

Expand Down
1 change: 1 addition & 0 deletions lib/filewatch/read_mode/handlers/read_file.rb
Expand Up @@ -54,6 +54,7 @@ def controlled_read(watched_file, loop_control)
# sincedb position is independent from the watched_file bytes_read
delta = line.bytesize + @settings.delimiter_byte_size
sincedb_collection.increment(watched_file.sincedb_key, delta)
break if quit?
end
rescue EOFError => e
log_error("controlled_read: eof error reading file", watched_file, e)
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-file.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '4.4.5'
s.version = '4.4.6'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
46 changes: 19 additions & 27 deletions spec/inputs/file_read_spec.rb
Expand Up @@ -181,25 +181,27 @@
end

context "for a compressed file" do
let(:tmp_directory) { Stud::Temporary.directory }
let(:all_files_path) { fixture_dir.join("compressed.*.*") }
let(:gz_file_path) { fixture_dir.join('compressed.log.gz') }
let(:gzip_file_path) { fixture_dir.join('compressed.log.gzip') }
let(:sincedb_path) { ::File.join(tmp_directory, "sincedb.db") }
let(:log_completed_path) { ::File.join(tmp_directory, "completed.log") }

it "the file is read" do
file_path = fixture_dir.join('compressed.log.gz')
file_path2 = fixture_dir.join('compressed.log.gzip')
FileInput.make_fixture_current(file_path.to_path)
FileInput.make_fixture_current(file_path2.to_path)
tmpfile_path = fixture_dir.join("compressed.*.*")
directory = Stud::Temporary.directory
sincedb_path = ::File.join(directory, "readmode_C_sincedb.txt")
log_completed_path = ::File.join(directory, "C_completed.txt")
FileInput.make_fixture_current(gz_file_path.to_path)
FileInput.make_fixture_current(gzip_file_path.to_path)

conf = <<-CONFIG
input {
file {
type => "blah"
path => "#{tmpfile_path}"
path => "#{all_files_path}"
sincedb_path => "#{sincedb_path}"
mode => "read"
file_completed_action => "log"
file_completed_log_path => "#{log_completed_path}"
exit_after_read => true
}
}
CONFIG
Expand All @@ -216,17 +218,11 @@
end

it "the corrupted file is untouched" do
directory = Stud::Temporary.directory
file_path = fixture_dir.join('compressed.log.gz')
corrupted_file_path = ::File.join(directory, 'corrupted.gz')
FileUtils.cp(file_path, corrupted_file_path)
corrupted_file_path = ::File.join(tmp_directory, 'corrupted.gz')
FileUtils.cp(gz_file_path, corrupted_file_path)

FileInput.corrupt_gzip(corrupted_file_path)

log_completed_path = ::File.join(directory, "C_completed.txt")
f = File.new(log_completed_path, "w")
f.close()

conf = <<-CONFIG
input {
file {
Expand All @@ -236,28 +232,23 @@
file_completed_action => "log_and_delete"
file_completed_log_path => "#{log_completed_path}"
check_archive_validity => true
exit_after_read => true
}
}
CONFIG

events = input(conf) do |pipeline, queue|
input(conf) do |pipeline, queue|
wait(1)
expect(IO.read(log_completed_path)).to be_empty
end
end

it "the truncated file is untouched" do
directory = Stud::Temporary.directory
file_path = fixture_dir.join('compressed.log.gz')
truncated_file_path = ::File.join(directory, 'truncated.gz')
FileUtils.cp(file_path, truncated_file_path)
truncated_file_path = ::File.join(tmp_directory, 'truncated.gz')
FileUtils.cp(gz_file_path, truncated_file_path)

FileInput.truncate_gzip(truncated_file_path)

log_completed_path = ::File.join(directory, "C_completed.txt")
f = File.new(log_completed_path, "w")
f.close()

conf = <<-CONFIG
input {
file {
Expand All @@ -267,11 +258,12 @@
file_completed_action => "log_and_delete"
file_completed_log_path => "#{log_completed_path}"
check_archive_validity => true
exit_after_read => true
}
}
CONFIG

events = input(conf) do |pipeline, queue|
input(conf) do |pipeline, queue|
wait(1)
expect(IO.read(log_completed_path)).to be_empty
end
Expand Down