Skip to content

Commit

Permalink
feat: Store precision in WAL for replayability (#24966)
Browse files Browse the repository at this point in the history
Up to this point we assumed that the precision for everything was nanoseconds.
While we do write and persist data as nanoseconds we made this assumption for
the WAL. However, we store the original line protocol data. If we want it to be
replayable we would need to include the precision and use that when loading the
WAL from disk. This commit changes the code to do that, and we can see that the
data is definitely persisted, as the WAL is now bigger in the tests.
  • Loading branch information
mgattozzi committed May 8, 2024
1 parent 9354c22 commit 7a2867b
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 6 deletions.
5 changes: 3 additions & 2 deletions influxdb3_write/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ pub struct LpWriteOp {
pub db_name: String,
pub lp: String,
pub default_time: i64,
pub precision: Precision,
}

/// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while
Expand Down Expand Up @@ -533,8 +534,8 @@ impl ParquetFile {
}
}

/// The summary data for a persisted parquet file in a segment.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
/// The precision of the timestamp
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Precision {
Auto,
Expand Down
94 changes: 94 additions & 0 deletions influxdb3_write/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,9 @@ fn segment_id_from_file_name(name: &str) -> Result<SegmentId> {
#[cfg(test)]
mod tests {
use super::*;
use crate::Catalog;
use crate::LpWriteOp;
use crate::Precision;

#[test]
fn segment_writer_reader() {
Expand All @@ -673,6 +675,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
});
writer.write_batch(vec![wal_op.clone()]).unwrap();

Expand All @@ -690,6 +693,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
});

// open the file, write and close it
Expand Down Expand Up @@ -729,6 +733,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
});

let wal = WalImpl::new(dir.clone()).unwrap();
Expand All @@ -752,4 +757,93 @@ mod tests {
assert_eq!(batch.ops, vec![wal_op.clone()]);
assert_eq!(batch.sequence_number, SequenceNumber::new(1));
}

/// Verify that the precision stored in the WAL round-trips: writes made with
/// every supported `Precision` variant must normalize to the same nanosecond
/// timestamp after the WAL is closed, reopened, and replayed through
/// `load_buffer_from_segment`.
#[test]
fn wal_written_and_read_with_different_precisions() {
    let dir = test_helpers::tmp_dir().unwrap().into_path();
    let wal = WalImpl::new(dir.clone()).unwrap();

    // Every op targets the same database with the same default time and
    // differs only in line protocol text and precision, so build them
    // through a helper instead of repeating the struct literal five times.
    let lp_write = |lp: &str, precision: Precision| {
        WalOp::LpWrite(LpWriteOp {
            db_name: "foo".to_string(),
            lp: lp.to_string(),
            default_time: 1,
            precision,
        })
    };

    // Each timestamp encodes "1 second past the epoch" in the unit of its
    // precision. Auto is expected to infer seconds from the magnitude of
    // the raw value (see the expected JSON below: every row lands on
    // 1970-01-01T00:00:01).
    let wal_ops = vec![
        lp_write("cpu,host=a val=1i 1", Precision::Second),
        lp_write("cpu,host=b val=2i 1000", Precision::Millisecond),
        lp_write("cpu,host=c val=3i 1000000", Precision::Microsecond),
        lp_write("cpu,host=d val=4i 1000000000", Precision::Nanosecond),
        lp_write("cpu,host=e val=5i 1", Precision::Auto),
    ];

    let segment = SegmentId::new(0);
    // Open the segment, write the batch, then drop the handles so the
    // segment file is fully closed before we reopen it below.
    {
        let mut writer = wal
            .new_segment_writer(segment, SegmentRange::test_range())
            .unwrap();
        writer.write_batch(wal_ops).unwrap();
        // close the wal
        drop(wal);
    }

    // Reopen the wal and make sure it loads the precision via
    // `load_buffer_from_segment`
    let catalog = Catalog::default();
    let wal = WalImpl::new(dir).unwrap();
    let schema = schema::SchemaBuilder::new()
        .tag("host")
        .influx_column(
            "val",
            schema::InfluxColumnType::Field(schema::InfluxFieldType::Integer),
        )
        .timestamp()
        .build()
        .unwrap();

    // Load the data into a buffer.
    let buffer = crate::write_buffer::buffer_segment::load_buffer_from_segment(
        &catalog,
        wal.open_segment_reader(segment).unwrap(),
    )
    .unwrap()
    .0;

    // Get the buffer data as record batches and render them as
    // line-delimited JSON for a readable, order-stable comparison.
    let batch = buffer
        .table_record_batches("foo", "cpu", schema.as_arrow(), &[])
        .unwrap()
        .unwrap();
    let mut writer = arrow::json::LineDelimitedWriter::new(Vec::new());
    writer.write_batches(&[&batch]).unwrap();
    writer.finish().unwrap();

    // All five rows must resolve to the identical nanosecond timestamp,
    // proving the replay honored each op's stored precision.
    pretty_assertions::assert_eq!(
        "{\"host\":\"a\",\"time\":\"1970-01-01T00:00:01\",\"val\":1}\n\
         {\"host\":\"b\",\"time\":\"1970-01-01T00:00:01\",\"val\":2}\n\
         {\"host\":\"c\",\"time\":\"1970-01-01T00:00:01\",\"val\":3}\n\
         {\"host\":\"d\",\"time\":\"1970-01-01T00:00:01\",\"val\":4}\n\
         {\"host\":\"e\",\"time\":\"1970-01-01T00:00:01\",\"val\":5}\n",
        String::from_utf8(writer.into_inner()).unwrap()
    )
}
}
7 changes: 4 additions & 3 deletions influxdb3_write/src/write_buffer/buffer_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::write_buffer::{
};
use crate::{
wal, write_buffer, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment,
Persister, Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber,
TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter,
Persister, SegmentDuration, SegmentId, SegmentRange, SequenceNumber, TableParquetFiles, WalOp,
WalSegmentReader, WalSegmentWriter,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -196,7 +196,7 @@ pub(crate) fn load_buffer_from_segment(
Time::from_timestamp_nanos(write.default_time),
segment_duration,
false,
Precision::Nanosecond,
write.precision,
)?;

let db_name = &write.db_name;
Expand Down Expand Up @@ -736,6 +736,7 @@ pub(crate) mod tests {
db_name: "db1".to_string(),
lp: lp.to_string(),
default_time: 0,
precision: crate::Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, "db1", lp);
Expand Down
9 changes: 8 additions & 1 deletion influxdb3_write/src/write_buffer/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ mod tests {
use crate::persister::PersisterImpl;
use crate::test_helpers::lp_to_write_batch;
use crate::wal::{WalImpl, WalSegmentWriterNoopImpl};
use crate::Precision;
use crate::{
DatabaseTables, LpWriteOp, ParquetFile, SegmentRange, SequenceNumber, TableParquetFiles,
WalOp,
Expand Down Expand Up @@ -180,6 +181,7 @@ mod tests {
db_name: "db1".to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, "db1", lp);
Expand Down Expand Up @@ -267,6 +269,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, db_name, lp);
Expand Down Expand Up @@ -352,6 +355,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, db_name, lp);
Expand Down Expand Up @@ -379,6 +383,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, db_name, lp);
Expand Down Expand Up @@ -417,7 +422,7 @@ mod tests {
loaded_state.persisted_segments[0],
PersistedSegment {
segment_id,
segment_wal_size_bytes: 227,
segment_wal_size_bytes: 252,
segment_parquet_size_bytes: 3458,
segment_row_count: 3,
segment_min_time: 10,
Expand Down Expand Up @@ -526,6 +531,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, db_name, lp);
Expand All @@ -546,6 +552,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});

let write_batch = lp_to_write_batch(&catalog, db_name, lp);
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ pub(crate) fn validate_or_insert_schema_and_partitions(
db_name: db_name.to_string(),
lp: table_batches.lines.join("\n"),
default_time: ingest_time.timestamp_nanos(),
precision,
}),
starting_catalog_sequence_number,
})
Expand Down Expand Up @@ -928,6 +929,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu bar=1 10".to_string(),
default_time: 123,
precision: Precision::Nanosecond,
})],
};
assert_eq!(batch, expected_batch);
Expand Down

0 comments on commit 7a2867b

Please sign in to comment.