Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
enhancement(file source): Option to remove file after some time of re…
…aching `eof` (vectordotdev#2908)

* Add remove_after option

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add test

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add website docs

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add rate limit

Signed-off-by: ktf <krunotf@gmail.com>

* Add warning

Signed-off-by: ktf <krunotf@gmail.com>
  • Loading branch information
ktff committed Jul 5, 2020
1 parent 02c3022 commit 01b3778
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 6 deletions.
12 changes: 12 additions & 0 deletions .meta/sources/file.toml.erb
Expand Up @@ -240,6 +240,18 @@ Instead of balancing read capacity fairly across all watched files, prioritize \
draining the oldest files before moving on to read data from younger files.\
"""

[sources.file.options.remove_after]
type = "uint"
unit = "seconds"
common = false
examples = [0, 5, 60]
description = """\
Timeout from reaching `eof` after which file will be removed from filesystem, \
unless new data is written in the meantime. \
If not specified, files will not be removed.\
"""
warnings = [{visibility_level = "option", text = "Vector's process must have permission to delete files."}]

[sources.file.fields.log.fields.file]
type = "string"
examples = ["/var/log/nginx.log"]
Expand Down
12 changes: 12 additions & 0 deletions config/vector.spec.toml
Expand Up @@ -207,6 +207,18 @@ dns_servers = ["0.0.0.0:53"]
# * unit: bytes
max_line_bytes = 102400

# Timeout from reaching `eof` after which file will be removed from filesystem,
# unless new data is written in the meantime. If not specified, files will not
# be removed.
#
# * optional
# * no default
# * type: uint
# * unit: seconds
remove_after = 0
remove_after = 5
remove_after = 60

# For files with a stored checkpoint at startup, setting this option to `true`
# will tell Vector to read from the beginning of the file instead of the stored
# checkpoint.
Expand Down
26 changes: 22 additions & 4 deletions lib/file-source/src/file_server.rs
Expand Up @@ -8,10 +8,10 @@ use futures::{
use glob::glob;
use indexmap::IndexMap;
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::fs::{self, remove_file, File};
use std::io::{self, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::time;
use std::time::{self, Duration};
use tokio::time::delay_for;
use tracing::field;

Expand All @@ -37,9 +37,10 @@ where
pub ignore_before: Option<time::SystemTime>,
pub max_line_bytes: usize,
pub data_dir: PathBuf,
pub glob_minimum_cooldown: time::Duration,
pub glob_minimum_cooldown: Duration,
pub fingerprinter: Fingerprinter,
pub oldest_first: bool,
pub remove_after: Option<Duration>,
}

/// `FileServer` as Source
Expand Down Expand Up @@ -216,6 +217,23 @@ where
line_buffer.clear();
}
} else {
// Should the file be removed
if let Some(grace_period) = self.remove_after {
if watcher.last_read_success().elapsed() >= grace_period {
// Try to remove
match remove_file(&watcher.path) {
Ok(()) => {
info!(message = "Log file deleted.", path = ?watcher.path);
watcher.set_dead();
}
Err(error) => {
// We will try again after some time.
warn!(message = "Failed deleting log file.",path = ?watcher.path, ?error, rate_limit_secs = 1);
}
}
}
}

break;
}
if bytes_read > self.max_read_bytes {
Expand Down Expand Up @@ -270,7 +288,7 @@ where
// all of these requirements.
match block_on(select(
shutdown,
delay_for(time::Duration::from_millis(backoff as u64)),
delay_for(Duration::from_millis(backoff as u64)),
)) {
Either::Left((_, _)) => return Ok(Shutdown),
Either::Right((_, future)) => shutdown = future,
Expand Down
4 changes: 4 additions & 0 deletions lib/file-source/src/file_watcher.rs
Expand Up @@ -180,6 +180,10 @@ impl FileWatcher {
self.last_read_success = Instant::now();
}

pub fn last_read_success(&self) -> Instant {
self.last_read_success
}

pub fn should_read(&self) -> bool {
self.last_read_success.elapsed() < Duration::from_secs(10)
|| self.last_read_attempt.elapsed() > Duration::from_secs(10)
Expand Down
50 changes: 49 additions & 1 deletion src/sources/file/mod.rs
Expand Up @@ -92,6 +92,7 @@ pub struct FileConfig {
pub multiline: Option<MultilineConfig>,
pub max_read_bytes: usize,
pub oldest_first: bool,
pub remove_after: Option<u64>,
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
Expand Down Expand Up @@ -181,6 +182,7 @@ impl Default for FileConfig {
multiline: None,
max_read_bytes: 2048,
oldest_first: false,
remove_after: None,
}
}
}
Expand Down Expand Up @@ -248,6 +250,7 @@ pub fn file_source(
glob_minimum_cooldown,
fingerprinter: config.fingerprinting.clone().into(),
oldest_first: config.oldest_first,
remove_after: config.remove_after.map(Duration::from_secs),
};

let file_key = config.file_key.clone();
Expand Down Expand Up @@ -352,7 +355,7 @@ mod tests {
event,
shutdown::ShutdownSignal,
sources::file,
test_util::{block_on, runtime, shutdown_on_idle},
test_util::{block_on, runtime, shutdown_on_idle, trace_init},
topology::Config,
};
use futures01::{Future, Stream};
Expand Down Expand Up @@ -1483,4 +1486,49 @@ mod tests {
]
);
}

#[test]
fn remove_file() {
trace_init();
let n = 5;
let remove_after = 2;

let (tx, rx) = futures01::sync::mpsc::channel(2 * n);
let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired();

let dir = tempdir().unwrap();
let config = file::FileConfig {
include: vec![dir.path().join("*")],
remove_after: Some(remove_after),
..test_default_file_config(&dir)
};

let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx);
let mut rt = runtime();
rt.spawn(source);

let path = dir.path().join("file");
let mut file = File::create(&path).unwrap();

sleep(); // The files must be observed at their original lengths before writing to them

for i in 0..n {
writeln!(&mut file, "{}", i).unwrap();
}
std::mem::drop(file);

// Wait for remove grace period to end.
std::thread::sleep(Duration::from_secs(remove_after + 1));

drop(trigger_shutdown);
shutdown_on_idle(rt);

let received = wait_with_timeout(rx.collect());
assert_eq!(received.len(), n);

match File::open(&path) {
Ok(_) => panic!("File wasn't removed"),
Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
}
}
}
28 changes: 27 additions & 1 deletion website/docs/reference/sources/file.md
@@ -1,5 +1,5 @@
---
last_modified_on: "2020-05-21"
last_modified_on: "2020-07-03"
delivery_guarantee: "best_effort"
component_title: "File"
description: "The Vector [`file`](#file) source ingests data through one or more local files and outputs `log` events."
Expand Down Expand Up @@ -73,6 +73,7 @@ ingests data through one or more local files and outputs
ignore_older = 86400 # optional, no default, seconds
include = ["/var/log/nginx/*.log"] # required
max_line_bytes = 102400 # optional, default, bytes
remove_after = 0 # optional, no default, seconds
start_at_beginning = false # optional, default

# Context
Expand Down Expand Up @@ -553,6 +554,31 @@ draining the oldest files before moving on to read data from younger files.
See [File Read Order](#file-read-order) for more info.


</Field>
<Field
common={false}
defaultValue={null}
enumValues={null}
examples={[0,5,60]}
groups={[]}
name={"remove_after"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"uint"}
unit={"seconds"}
warnings={[{"visibility_level":"option","text":"Vector's process must have permission to delete files.","option_name":"remove_after"}]}
>
### remove_after

Timeout from reaching `eof` after which file will be removed from filesystem,
unless new data is written in the meantime. If not specified, files will not be
removed.



</Field>
<Field
common={true}
Expand Down

0 comments on commit 01b3778

Please sign in to comment.