Skip to content

Commit

Permalink
fix(file source): Handle legacy fingerprint checksums from < v0.14.0 (#…
Browse files Browse the repository at this point in the history
…8225)

* fix(file source): Handle legacy fingerprint checksums from < v0.14.0

Adds logic to convert from checkpoints written by Vector before version 0.14.0 to 0.14.0 checkpoints by seeing if any checkpoints, that don't match an existing file, match if the old checkpoint strategy is used.

Also adds alias for `first_line_checksum` strategy which would have broken checkpoints if released.

Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
  • Loading branch information
jszwedko committed Jul 12, 2021
1 parent b977502 commit 9b7fabf
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/file-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ features = ["full"]
criterion = "0.3"
quickcheck = "1"
tempfile = "3.1.0"
pretty_assertions = "0.7.2"

[[bench]]
name = "buffer"
Expand Down
218 changes: 203 additions & 15 deletions lib/file-source/src/checkpointer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{fingerprinter::FileFingerprint, FilePosition};
use super::{
fingerprinter::{FileFingerprint, Fingerprinter},
FilePosition,
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use glob::glob;
Expand Down Expand Up @@ -154,13 +157,32 @@ impl CheckpointsView {
}
}

fn maybe_upgrade(&self, fresh: impl Iterator<Item = FileFingerprint>) {
for fng in fresh {
if let Some((_, pos)) = self
.checkpoints
.remove(&FileFingerprint::Unknown(fng.as_legacy()))
fn maybe_upgrade(
&self,
path: &Path,
fng: FileFingerprint,
fingerprinter: &Fingerprinter,
fingerprint_buffer: &mut Vec<u8>,
) {
if let Ok(Some(old_checksum)) = fingerprinter.get_bytes_checksum(&path, fingerprint_buffer)
{
self.update_key(old_checksum, fng)
}

if let Some((_, pos)) = self
.checkpoints
.remove(&FileFingerprint::Unknown(fng.as_legacy()))
{
self.update(fng, pos);
}

if self.checkpoints.get(&fng).is_none() {
if let Ok(Some(fingerprint)) =
fingerprinter.get_legacy_checksum(&path, fingerprint_buffer)
{
self.update(fng, pos);
if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
self.update(fng, pos);
}
}
}
}
Expand Down Expand Up @@ -247,11 +269,17 @@ impl Checkpointer {
self.checkpoints.get(fng)
}

/// Scan through a given list of fresh fingerprints (i.e. not legacy
/// Unknown) to see if any match an existing legacy fingerprint. If so,
/// upgrade the existing fingerprint.
pub fn maybe_upgrade(&mut self, fresh: impl Iterator<Item = FileFingerprint>) {
self.checkpoints.maybe_upgrade(fresh)
/// Scan through a given list of fresh fingerprints to see if any match an existing legacy
/// fingerprint. If so, upgrade the existing fingerprint.
pub fn maybe_upgrade(
&mut self,
path: &Path,
fresh: FileFingerprint,
fingerprinter: &Fingerprinter,
fingerprint_buffer: &mut Vec<u8>,
) {
self.checkpoints
.maybe_upgrade(path, fresh, fingerprinter, fingerprint_buffer)
}

/// Persist the current checkpoints state to disk, making our best effort to
Expand Down Expand Up @@ -380,9 +408,11 @@ impl Checkpointer {
#[cfg(test)]
mod test {
use super::{
super::{FingerprintStrategy, Fingerprinter},
Checkpoint, Checkpointer, FileFingerprint, FilePosition, STABLE_FILE_NAME, TMP_FILE_NAME,
};
use chrono::{Duration, Utc};
use pretty_assertions::assert_eq;
use tempfile::tempdir;

#[test]
Expand Down Expand Up @@ -483,10 +513,63 @@ mod test {
}

#[test]
fn test_checkpointer_fingerprint_upgrades() {
fn test_checkpointer_fingerprint_upgrades_unknown() {
let log_dir = tempdir().unwrap();
let path = log_dir.path().join("test.log");
let data = "hello\n";
std::fs::write(&path, data).unwrap();

let new_fingerprint = FileFingerprint::DevInode(1, 2);
let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy());
let position: FilePosition = 1234;
let fingerprinter = Fingerprinter {
strategy: FingerprintStrategy::DevInode,
max_line_length: 1000,
ignore_not_found: false,
};

let mut buf = Vec::new();

let data_dir = tempdir().unwrap();
{
let mut chkptr = Checkpointer::new(&data_dir.path());
chkptr.update_checkpoint(old_fingerprint, position);
assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
chkptr.write_checkpoints().ok();
}
{
let mut chkptr = Checkpointer::new(&data_dir.path());
chkptr.read_checkpoints(None);
assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
}
}

#[test]
fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
let log_dir = tempdir().unwrap();
let path = log_dir.path().join("test.log");
let data = "hello\n";
std::fs::write(&path, data).unwrap();

let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
let position: FilePosition = 6;

let fingerprinter = Fingerprinter {
strategy: FingerprintStrategy::FirstLinesChecksum {
ignored_header_bytes: 0,
lines: 1,
},
max_line_length: 102400,
ignore_not_found: false,
};

let mut buf = Vec::new();

let data_dir = tempdir().unwrap();
{
Expand All @@ -500,7 +583,7 @@ mod test {
chkptr.read_checkpoints(None);
assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

chkptr.maybe_upgrade(std::iter::once(new_fingerprint));
chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
Expand Down Expand Up @@ -630,10 +713,115 @@ mod test {

assert!(chkptr.checkpoints.contains_bytes_checksums());

chkptr.checkpoints.update_key(old, new);
chkptr.maybe_upgrade(&log_path, new, &fingerprinter, &mut buf);

assert!(!chkptr.checkpoints.contains_bytes_checksums());
assert_eq!(Some(1234), chkptr.get_checkpoint(new));
assert_eq!(None, chkptr.get_checkpoint(old));
}

// guards against accidental changes to the checkpoint serialization
#[test]
fn test_checkpointer_serialization() {
let fingerprints = vec![
(
FileFingerprint::DevInode(1, 2),
r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
),
(
FileFingerprint::BytesChecksum(3456),
r#"{"version":"1","checkpoints":[{"fingerprint":{"checksum":3456},"position":1234}]}"#,
),
(
FileFingerprint::FirstLinesChecksum(78910),
r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
),
(
FileFingerprint::Unknown(1337),
r#"{"version":"1","checkpoints":[{"fingerprint":{"unknown":1337},"position":1234}]}"#,
),
];
for (fingerprint, expected) in fingerprints {
let expected: serde_json::Value = serde_json::from_str(expected).unwrap();

let position: FilePosition = 1234;
let data_dir = tempdir().unwrap();
let mut chkptr = Checkpointer::new(&data_dir.path());

chkptr.update_checkpoint(fingerprint, position);
chkptr.write_checkpoints().unwrap();

let got: serde_json::Value = {
let s = std::fs::read_to_string(data_dir.path().join("checkpoints.json")).unwrap();
let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
checkpoint.as_object_mut().unwrap().remove("modified");
}
checkpoints
};

assert_eq!(expected, got);
}
}

// guards against accidental changes to the checkpoint deserialization and tests deserializing
// old checkpoint versions
#[test]
fn test_checkpointer_deserialization() {
let serialized_checkpoints = r#"
{
"version": "1",
"checkpoints": [
{
"fingerprint": { "dev_inode": [ 1, 2 ] },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "checksum": 3456 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "first_line_checksum": 1234 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "first_lines_checksum": 78910 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "unknown": 1337 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
}
]
}
"#;
let fingerprints = vec![
FileFingerprint::DevInode(1, 2),
FileFingerprint::BytesChecksum(3456),
FileFingerprint::FirstLinesChecksum(1234),
FileFingerprint::FirstLinesChecksum(78910),
FileFingerprint::Unknown(1337),
];

let data_dir = tempdir().unwrap();

let mut chkptr = Checkpointer::new(&data_dir.path());

std::fs::write(
data_dir.path().join("checkpoints.json"),
serialized_checkpoints,
)
.unwrap();

chkptr.read_checkpoints(None);

for fingerprint in fingerprints {
assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
}
}
}
18 changes: 6 additions & 12 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,15 @@ where
.unwrap_or_else(|_| Utc::now())
});

checkpointer.maybe_upgrade(existing_files.iter().map(|(_, id)| id).cloned());

let checkpoints = checkpointer.view();

let needs_checksum_upgrade = checkpoints.contains_bytes_checksums();

for (path, file_id) in existing_files {
if needs_checksum_upgrade {
if let Ok(Some(old_checksum)) = self
.fingerprinter
.get_bytes_checksum(&path, &mut fingerprint_buffer)
{
checkpoints.update_key(old_checksum, file_id)
}
}
checkpointer.maybe_upgrade(
&path,
file_id,
&self.fingerprinter,
&mut fingerprint_buffer,
);

self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, true);
}
Expand Down
34 changes: 32 additions & 2 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ use std::{
use tracing::trace_span;

const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
const LEGACY_FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_XZ);

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Fingerprinter {
pub strategy: FingerprintStrategy,
pub max_line_length: usize,
pub ignore_not_found: bool,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
Checksum {
bytes: usize,
Expand All @@ -37,6 +38,7 @@ pub enum FingerprintStrategy {
pub enum FileFingerprint {
#[serde(rename = "checksum")]
BytesChecksum(u64),
#[serde(alias = "first_line_checksum")]
FirstLinesChecksum(u64),
DevInode(u64, u64),
Unknown(u64),
Expand Down Expand Up @@ -157,6 +159,34 @@ impl Fingerprinter {
_ => Ok(None),
}
}

/// Calculates checksums using strategy pre-0.14.0
/// https://github.com/timberio/vector/issues/8182
pub fn get_legacy_checksum(
&self,
path: &Path,
buffer: &mut Vec<u8>,
) -> Result<Option<FileFingerprint>, io::Error> {
match self.strategy {
FingerprintStrategy::Checksum {
ignored_header_bytes,
bytes: _,
lines,
}
| FingerprintStrategy::FirstLinesChecksum {
ignored_header_bytes,
lines,
} => {
buffer.resize(self.max_line_length, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
fingerprinter_read_until(fp, b'\n', lines, buffer)?;
let fingerprint = LEGACY_FINGERPRINT_CRC.checksum(&buffer[..]);
Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
}
_ => Ok(None),
}
}
}

fn fingerprinter_read_until(
Expand Down

0 comments on commit 9b7fabf

Please sign in to comment.