Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): Handle legacy fingerprint checksums from < v0.14.0 #8225

Merged
merged 5 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/file-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ features = ["full"]
criterion = "0.3"
quickcheck = "1"
tempfile = "3.1.0"
pretty_assertions = "0.7.2"

[[bench]]
name = "buffer"
Expand Down
207 changes: 190 additions & 17 deletions lib/file-source/src/checkpointer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{fingerprinter::FileFingerprint, FilePosition};
use super::{
fingerprinter::{FileFingerprint, Fingerprinter},
FilePosition,
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use glob::glob;
Expand Down Expand Up @@ -154,13 +157,23 @@ impl CheckpointsView {
}
}

fn maybe_upgrade(&self, fresh: impl Iterator<Item = FileFingerprint>) {
for fng in fresh {
if let Some((_, pos)) = self
.checkpoints
.remove(&FileFingerprint::Unknown(fng.as_legacy()))
{
self.update(fng, pos);
fn maybe_upgrade(&self, path: &Path, fng: FileFingerprint, fingerprinter: &Fingerprinter) {
if let Ok(Some(old_checksum)) = fingerprinter.get_bytes_checksum(&path) {
self.update_key(old_checksum, fng)
}

if let Some((_, pos)) = self
.checkpoints
.remove(&FileFingerprint::Unknown(fng.as_legacy()))
{
self.update(fng, pos);
}

if self.checkpoints.get(&fng).is_none() {
if let Ok(Some(fingerprint)) = fingerprinter.get_legacy_checksum(&path) {
if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
self.update(fng, pos);
}
}
}
}
Expand Down Expand Up @@ -247,11 +260,15 @@ impl Checkpointer {
self.checkpoints.get(fng)
}

/// Scan through a given list of fresh fingerprints (i.e. not legacy
/// Unknown) to see if any match an existing legacy fingerprint. If so,
/// upgrade the existing fingerprint.
pub fn maybe_upgrade(&mut self, fresh: impl Iterator<Item = FileFingerprint>) {
self.checkpoints.maybe_upgrade(fresh)
/// Scan through a given list of fresh fingerprints to see if any match an existing legacy
/// fingerprint. If so, upgrade the existing fingerprint.
pub fn maybe_upgrade(
&mut self,
path: &Path,
fresh: FileFingerprint,
fingerprinter: &Fingerprinter,
) {
self.checkpoints.maybe_upgrade(path, fresh, fingerprinter)
}

/// Persist the current checkpoints state to disk, making our best effort to
Expand Down Expand Up @@ -380,9 +397,11 @@ impl Checkpointer {
#[cfg(test)]
mod test {
use super::{
super::{FingerprintStrategy, Fingerprinter},
Checkpoint, Checkpointer, FileFingerprint, FilePosition, STABLE_FILE_NAME, TMP_FILE_NAME,
};
use chrono::{Duration, Utc};
use pretty_assertions::assert_eq;
use tempfile::tempdir;

#[test]
Expand Down Expand Up @@ -483,10 +502,59 @@ mod test {
}

#[test]
fn test_checkpointer_fingerprint_upgrades() {
fn test_checkpointer_fingerprint_upgrades_unknown() {
let log_dir = tempdir().unwrap();
let path = log_dir.path().join("test.log");
let data = "hello\n";
std::fs::write(&path, data).unwrap();

let new_fingerprint = FileFingerprint::DevInode(1, 2);
let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy());
let position: FilePosition = 1234;
let fingerprinter = Fingerprinter {
strategy: FingerprintStrategy::DevInode,
max_line_length: 1000,
ignore_not_found: false,
};

let data_dir = tempdir().unwrap();
{
let mut chkptr = Checkpointer::new(&data_dir.path());
chkptr.update_checkpoint(old_fingerprint, position);
assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
chkptr.write_checkpoints().ok();
}
{
let mut chkptr = Checkpointer::new(&data_dir.path());
chkptr.read_checkpoints(None);
assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter);

assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
}
}

#[test]
fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
let log_dir = tempdir().unwrap();
let path = log_dir.path().join("test.log");
let data = "hello\n";
std::fs::write(&path, data).unwrap();

let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
let position: FilePosition = 6;

let fingerprinter = Fingerprinter {
strategy: FingerprintStrategy::FirstLinesChecksum {
ignored_header_bytes: 0,
lines: 1,
},
max_line_length: 102400,
ignore_not_found: false,
};

let data_dir = tempdir().unwrap();
{
Expand All @@ -500,7 +568,7 @@ mod test {
chkptr.read_checkpoints(None);
assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

chkptr.maybe_upgrade(std::iter::once(new_fingerprint));
chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter);

assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
Expand Down Expand Up @@ -607,7 +675,7 @@ mod test {

let mut buf = vec![0; 1024];
let old = fingerprinter
.get_bytes_checksum(&log_path, &mut buf)
.get_bytes_checksum(&log_path)
.expect("getting old checksum")
.expect("still getting old checksum");

Expand All @@ -630,10 +698,115 @@ mod test {

assert!(chkptr.checkpoints.contains_bytes_checksums());

chkptr.checkpoints.update_key(old, new);
chkptr.maybe_upgrade(&log_path, new, &fingerprinter);

assert!(!chkptr.checkpoints.contains_bytes_checksums());
assert_eq!(Some(1234), chkptr.get_checkpoint(new));
assert_eq!(None, chkptr.get_checkpoint(old));
}

// guards against accidental changes to the checkpoint serialization
#[test]
fn test_checkpointer_serialization() {
let fingerprints = vec![
(
FileFingerprint::DevInode(1, 2),
r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
),
(
FileFingerprint::BytesChecksum(3456),
r#"{"version":"1","checkpoints":[{"fingerprint":{"checksum":3456},"position":1234}]}"#,
),
(
FileFingerprint::FirstLinesChecksum(78910),
r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
),
(
FileFingerprint::Unknown(1337),
r#"{"version":"1","checkpoints":[{"fingerprint":{"unknown":1337},"position":1234}]}"#,
),
];
for (fingerprint, expected) in fingerprints {
let expected: serde_json::Value = serde_json::from_str(expected).unwrap();

let position: FilePosition = 1234;
let data_dir = tempdir().unwrap();
let mut chkptr = Checkpointer::new(&data_dir.path());

chkptr.update_checkpoint(fingerprint, position);
chkptr.write_checkpoints().unwrap();

let got: serde_json::Value = {
let s = std::fs::read_to_string(data_dir.path().join("checkpoints.json")).unwrap();
let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
checkpoint.as_object_mut().unwrap().remove("modified");
}
checkpoints
};

assert_eq!(expected, got);
}
}

// guards against accidental changes to the checkpoint deserialization and tests deserializing
// old checkpoint versions
#[test]
fn test_checkpointer_deserialization() {
let serialized_checkpoints = r#"
{
"version": "1",
"checkpoints": [
{
"fingerprint": { "dev_inode": [ 1, 2 ] },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "checksum": 3456 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "first_line_checksum": 1234 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "first_lines_checksum": 78910 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
},
{
"fingerprint": { "unknown": 1337 },
"position": 1234,
"modified": "2021-07-12T18:19:11.769003Z"
}
]
}
"#;
let fingerprints = vec![
FileFingerprint::DevInode(1, 2),
FileFingerprint::BytesChecksum(3456),
FileFingerprint::FirstLinesChecksum(1234),
FileFingerprint::FirstLinesChecksum(78910),
FileFingerprint::Unknown(1337),
];

let data_dir = tempdir().unwrap();

let mut chkptr = Checkpointer::new(&data_dir.path());

std::fs::write(
data_dir.path().join("checkpoints.json"),
serialized_checkpoints,
)
.unwrap();

chkptr.read_checkpoints(None);

for fingerprint in fingerprints {
assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
}
}
}
13 changes: 1 addition & 12 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,10 @@ where
.unwrap_or_else(|_| Utc::now())
});

checkpointer.maybe_upgrade(existing_files.iter().map(|(_, id)| id).cloned());

let checkpoints = checkpointer.view();

let needs_checksum_upgrade = checkpoints.contains_bytes_checksums();

for (path, file_id) in existing_files {
if needs_checksum_upgrade {
if let Ok(Some(old_checksum)) = self
.fingerprinter
.get_bytes_checksum(&path, &mut fingerprint_buffer)
{
checkpoints.update_key(old_checksum, file_id)
}
}
checkpointer.maybe_upgrade(&path, file_id, &self.fingerprinter);

self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, true);
}
Expand Down
38 changes: 31 additions & 7 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ use std::{
use tracing::trace_span;

const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
const LEGACY_FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_XZ);

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Fingerprinter {
pub strategy: FingerprintStrategy,
pub max_line_length: usize,
pub ignore_not_found: bool,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
Checksum {
bytes: usize,
Expand All @@ -37,6 +38,7 @@ pub enum FingerprintStrategy {
pub enum FileFingerprint {
#[serde(rename = "checksum")]
BytesChecksum(u64),
#[serde(alias = "first_line_checksum")]
FirstLinesChecksum(u64),
DevInode(u64, u64),
Unknown(u64),
Expand Down Expand Up @@ -136,17 +138,14 @@ impl Fingerprinter {
.flatten()
}

pub fn get_bytes_checksum(
&self,
path: &Path,
buffer: &mut Vec<u8>,
) -> Result<Option<FileFingerprint>, io::Error> {
pub fn get_bytes_checksum(&self, path: &Path) -> Result<Option<FileFingerprint>, io::Error> {
match self.strategy {
FingerprintStrategy::Checksum {
bytes,
ignored_header_bytes,
lines: _,
} => {
let mut buffer: Vec<u8> = Vec::with_capacity(bytes);
buffer.resize(bytes, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(io::SeekFrom::Start(ignored_header_bytes as u64))?;
Expand All @@ -157,6 +156,31 @@ impl Fingerprinter {
_ => Ok(None),
}
}

/// Calculates checksums using strategy pre-0.14.0
/// https://github.com/timberio/vector/issues/8182
pub fn get_legacy_checksum(&self, path: &Path) -> Result<Option<FileFingerprint>, io::Error> {
match self.strategy {
FingerprintStrategy::Checksum {
ignored_header_bytes,
bytes: _,
lines,
}
| FingerprintStrategy::FirstLinesChecksum {
ignored_header_bytes,
lines,
} => {
let mut buffer: Vec<u8> = Vec::with_capacity(self.max_line_length);
buffer.resize(self.max_line_length, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
fingerprinter_read_until(fp, b'\n', lines, &mut buffer)?;
let fingerprint = LEGACY_FINGERPRINT_CRC.checksum(&buffer[..]);
Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
}
_ => Ok(None),
}
}
}

fn fingerprinter_read_until(
Expand Down