Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): Handle legacy fingerprint checksums from < v0.14.0 #8225

Merged
merged 5 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 102 additions & 20 deletions lib/file-source/src/checkpointer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{fingerprinter::FileFingerprint, FilePosition};
use super::{
fingerprinter::{FileFingerprint, Fingerprinter},
FilePosition,
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use glob::glob;
Expand Down Expand Up @@ -80,12 +83,6 @@ impl CheckpointsView {
}
}

pub fn contains_bytes_checksums(&self) -> bool {
self.checkpoints
.iter()
.any(|entry| matches!(entry.key(), FileFingerprint::BytesChecksum(_)))
}

pub fn remove_expired(&self) {
let now = Utc::now();

Expand All @@ -110,6 +107,12 @@ impl CheckpointsView {
}
}

pub fn contains_bytes_checksums(&self) -> bool {
self.checkpoints
.iter()
.any(|entry| matches!(entry.key(), FileFingerprint::BytesChecksum(_)))
}

jszwedko marked this conversation as resolved.
Show resolved Hide resolved
fn load(&self, checkpoint: Checkpoint) {
self.checkpoints
.insert(checkpoint.fingerprint, checkpoint.position);
Expand Down Expand Up @@ -154,13 +157,32 @@ impl CheckpointsView {
}
}

fn maybe_upgrade(&self, fresh: impl Iterator<Item = FileFingerprint>) {
for fng in fresh {
if let Some((_, pos)) = self
.checkpoints
.remove(&FileFingerprint::Unknown(fng.as_legacy()))
fn maybe_upgrade(
&self,
path: &Path,
fng: FileFingerprint,
fingerprinter: &Fingerprinter,
fingerprint_buffer: &mut Vec<u8>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This buffer does not appear to be used by any of the callers after calling the function. Could it be created within this function to avoid the additional parameter?

In fact, it doesn't look like the contents are used within this function either. I don't think this function is on the hot path (startup only) so reusing the buffer shouldn't be critical to performance. Am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I was following the pattern of the other functions, but think we can just create the buffer in here since it is only called once at start-up.

Copy link
Member Author

@jszwedko jszwedko Jul 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I realized the default buffer size 1 MB so I may replace this. I can see why we'd want to reuse them in-case there are a lot of files to avoid allocating 1 MB each time.

) {
if let Ok(Some(old_checksum)) = fingerprinter.get_bytes_checksum(&path, fingerprint_buffer)
{
self.update_key(old_checksum, fng)
}

if let Some((_, pos)) = self
.checkpoints
.remove(&FileFingerprint::Unknown(fng.as_legacy()))
{
self.update(fng, pos);
}

if self.checkpoints.get(&fng).is_none() {
if let Ok(Some(fingerprint)) =
fingerprinter.get_legacy_checksum(&path, fingerprint_buffer)
{
self.update(fng, pos);
if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
self.update(fng, pos);
}
}
}
}
Expand Down Expand Up @@ -247,11 +269,17 @@ impl Checkpointer {
self.checkpoints.get(fng)
}

/// Scan through a given list of fresh fingerprints (i.e. not legacy
/// Unknown) to see if any match an existing legacy fingerprint. If so,
/// upgrade the existing fingerprint.
pub fn maybe_upgrade(&mut self, fresh: impl Iterator<Item = FileFingerprint>) {
self.checkpoints.maybe_upgrade(fresh)
/// Scan through a given list of fresh fingerprints to see if any match an existing legacy
/// fingerprint. If so, upgrade the existing fingerprint.
pub fn maybe_upgrade(
&mut self,
path: &Path,
fresh: FileFingerprint,
fingerprinter: &Fingerprinter,
fingerprint_buffer: &mut Vec<u8>,
) {
self.checkpoints
.maybe_upgrade(path, fresh, fingerprinter, fingerprint_buffer)
}

/// Persist the current checkpoints state to disk, making our best effort to
Expand Down Expand Up @@ -380,6 +408,7 @@ impl Checkpointer {
#[cfg(test)]
mod test {
use super::{
super::{FingerprintStrategy, Fingerprinter},
Checkpoint, Checkpointer, FileFingerprint, FilePosition, STABLE_FILE_NAME, TMP_FILE_NAME,
};
use chrono::{Duration, Utc};
Expand Down Expand Up @@ -484,9 +513,62 @@ mod test {

#[test]
fn test_checkpointer_fingerprint_upgrades() {
let log_dir = tempdir().unwrap();
let path = log_dir.path().join("test.log");
let data = "hello\n";
std::fs::write(&path, data).unwrap();

let new_fingerprint = FileFingerprint::DevInode(1, 2);
let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy());
let position: FilePosition = 1234;
let fingerprinter = Fingerprinter {
strategy: FingerprintStrategy::DevInode,
max_line_length: 1000,
ignore_not_found: false,
};

let mut buf = Vec::new();

let data_dir = tempdir().unwrap();
{
let mut chkptr = Checkpointer::new(&data_dir.path());
chkptr.update_checkpoint(old_fingerprint, position);
assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
chkptr.write_checkpoints().ok();
}
{
let mut chkptr = Checkpointer::new(&data_dir.path());
chkptr.read_checkpoints(None);
assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
}
}

#[test]
fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
let log_dir = tempdir().unwrap();
let path = log_dir.path().join("test.log");
let data = "hello\n";
std::fs::write(&path, data).unwrap();

let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
let position: FilePosition = 6;

let fingerprinter = Fingerprinter {
strategy: FingerprintStrategy::FirstLinesChecksum {
ignored_header_bytes: 0,
lines: 1,
},
max_line_length: 1000,
ignore_not_found: false,
};

let mut buf = Vec::new();

let data_dir = tempdir().unwrap();
{
Expand All @@ -500,7 +582,7 @@ mod test {
chkptr.read_checkpoints(None);
assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

chkptr.maybe_upgrade(std::iter::once(new_fingerprint));
chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
Expand Down Expand Up @@ -630,7 +712,7 @@ mod test {

assert!(chkptr.checkpoints.contains_bytes_checksums());

chkptr.checkpoints.update_key(old, new);
chkptr.maybe_upgrade(&log_path, new, &fingerprinter, &mut buf);

assert!(!chkptr.checkpoints.contains_bytes_checksums());
assert_eq!(Some(1234), chkptr.get_checkpoint(new));
Expand Down
18 changes: 6 additions & 12 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,15 @@ where
.unwrap_or_else(|_| Utc::now())
});

checkpointer.maybe_upgrade(existing_files.iter().map(|(_, id)| id).cloned());

let checkpoints = checkpointer.view();

let needs_checksum_upgrade = checkpoints.contains_bytes_checksums();

for (path, file_id) in existing_files {
if needs_checksum_upgrade {
if let Ok(Some(old_checksum)) = self
.fingerprinter
.get_bytes_checksum(&path, &mut fingerprint_buffer)
{
checkpoints.update_key(old_checksum, file_id)
}
}
checkpointer.maybe_upgrade(
&path,
file_id,
&self.fingerprinter,
&mut fingerprint_buffer,
);

self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, true);
}
Expand Down
33 changes: 31 additions & 2 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ use std::{
use tracing::trace_span;

const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
const LEGACY_FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_XZ);

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Fingerprinter {
pub strategy: FingerprintStrategy,
pub max_line_length: usize,
pub ignore_not_found: bool,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
Checksum {
bytes: usize,
Expand All @@ -37,6 +38,7 @@ pub enum FingerprintStrategy {
pub enum FileFingerprint {
#[serde(rename = "checksum")]
BytesChecksum(u64),
#[serde(alias = "first_line_checksum")]
FirstLinesChecksum(u64),
DevInode(u64, u64),
Unknown(u64),
Expand Down Expand Up @@ -157,6 +159,33 @@ impl Fingerprinter {
_ => Ok(None),
}
}

/// Calculates checksums using strategy pre-0.14.0
pub fn get_legacy_checksum(
&self,
path: &Path,
buffer: &mut Vec<u8>,
) -> Result<Option<FileFingerprint>, io::Error> {
match self.strategy {
FingerprintStrategy::Checksum {
ignored_header_bytes,
bytes: _,
lines,
}
| FingerprintStrategy::FirstLinesChecksum {
ignored_header_bytes,
lines,
} => {
buffer.resize(self.max_line_length, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
fingerprinter_read_until(fp, b'\n', lines, buffer)?;
let fingerprint = LEGACY_FINGERPRINT_CRC.checksum(&buffer[..]);
Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
}
_ => Ok(None),
}
}
}

fn fingerprinter_read_until(
Expand Down