[Merged by Bors] - add: fluvio-schema, batch id #3283

Closed · wants to merge 4 commits · Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/fluvio-protocol/Cargo.toml
@@ -72,3 +72,4 @@ fluvio-future = { workspace = true, features = [
"net",
] }
futures = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
167 changes: 146 additions & 21 deletions crates/fluvio-protocol/src/record/batch.rs
@@ -19,7 +19,8 @@ use super::Record;
use super::Offset;
use super::Size;

pub const COMPRESSION_CODEC_MASK: i16 = 0x07;
const ATTR_SCHEMA_PRESENT: i16 = 0x10;
const ATTR_COMPRESSION_CODEC_MASK: i16 = 0x07;
pub const NO_TIMESTAMP: i64 = -1;

pub trait BatchRecords: Default + Debug + Encoder + Decoder + Send + Sync {
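The two new ATTR_ constants partition the header's i16 attributes word: the low three bits (mask 0x07) select the compression codec, and bit 4 (0x10) flags that a schema id follows the header. A minimal standalone sketch of that bit arithmetic (the codec value 2 is an arbitrary example):

const ATTR_COMPRESSION_CODEC_MASK: i16 = 0x07; // bits 0-2: compression codec
const ATTR_SCHEMA_PRESENT: i16 = 0x10; // bit 4: schema id present

let mut attributes: i16 = 0;
attributes = (attributes & !ATTR_COMPRESSION_CODEC_MASK) | 0x02; // store a codec
attributes |= ATTR_SCHEMA_PRESENT; // mark schema id present
assert_eq!(attributes & ATTR_COMPRESSION_CODEC_MASK, 0x02); // codec bits intact
assert_ne!(attributes & ATTR_SCHEMA_PRESENT, 0); // schema flag set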
@@ -67,11 +68,24 @@ pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>() // Offset

pub const BATCH_FILE_HEADER_SIZE: usize = BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE;

#[derive(Clone, Default, Debug, Encoder, PartialEq)]
pub struct SchemaId(u32);

impl Decoder for SchemaId {
fn decode<T: Buf>(&mut self, src: &mut T, version: Version) -> Result<(), Error> {
let mut sid: u32 = 0;
sid.decode(src, version)?;
self.0 = sid;
Ok(())
}
}

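SchemaId derives Encoder but implements Decoder by hand, delegating to the u32 decoder. A round-trip sketch, assuming the derived Encoder writes the inner u32 as four bytes (which is what the manual decode expects):

#[test]
fn schema_id_round_trip() -> Result<(), Error> {
    use std::io::Cursor;

    let sid = SchemaId(7);
    let mut out: Vec<u8> = Vec::new();
    sid.encode(&mut out, 0)?; // derived Encoder
    assert_eq!(out.len(), 4); // a single u32 on the wire

    let mut back = SchemaId::default();
    back.decode(&mut Cursor::new(out), 0)?; // the manual Decoder above
    assert_eq!(back, SchemaId(7));
    Ok(())
}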
#[derive(Default, Debug)]
pub struct Batch<R = MemoryRecords> {
pub base_offset: Offset,
pub batch_len: i32, // only for decoding
pub header: BatchHeader,
pub schema_id: Option<SchemaId>,
records: R,
}

@@ -162,6 +176,15 @@ impl<R> Batch<R> {
pub fn batch_len(&self) -> i32 {
self.batch_len
}

pub fn schema_id(&self) -> Option<SchemaId> {
self.schema_id.as_ref().cloned()
}

pub fn set_schema_id(&mut self, sid: SchemaId) {
self.header.set_schema_id();
self.schema_id = Some(sid);
}
}

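On the producer side, set_schema_id both stores the id and flips the header attribute bit, so the accessors and the encoder stay in agreement. A short usage sketch (the id value 21 is arbitrary):

let mut batch = Batch::<MemoryRecords>::default();
batch.set_schema_id(SchemaId(21));
assert_eq!(batch.schema_id(), Some(SchemaId(21)));
assert!(batch.header.has_schema());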
#[cfg(feature = "compress")]
@@ -173,6 +196,7 @@ impl TryFrom<Batch<RawRecords>> for Batch {
base_offset: batch.base_offset,
batch_len: (BATCH_HEADER_SIZE + records.write_size(0)) as i32,
header: batch.header,
schema_id: None,
records,
})
}
@@ -189,11 +213,13 @@ impl TryFrom<Batch> for Batch<RawRecords> {
let compressed_records = compression.compress(&buf)?;
let compressed_records_len = compressed_records.len() as i32;
let records = RawRecords(compressed_records);
let schema_id = f.schema_id();

Ok(Batch {
base_offset: f.base_offset,
batch_len: compressed_records_len,
header: f.header,
schema_id,
records,
})
}
@@ -205,7 +231,16 @@ where
{
/// check if batch is valid after decoding
pub fn validate_decoding(&self) -> bool {
self.batch_len == (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32
self.batch_len == self.calc_batch_len()
}

fn calc_batch_len(&self) -> i32 {
let blen = if self.header.has_schema() {
BATCH_HEADER_SIZE + self.records.write_size(0) + size_of::<SchemaId>()
} else {
BATCH_HEADER_SIZE + self.records.write_size(0)
};
blen as i32
}
}

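Worked out, calc_batch_len differs by exactly four bytes between its two branches: BATCH_HEADER_SIZE is 45 bytes (4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4, per the constant further down) and size_of::<SchemaId>() is the 4 bytes of its inner u32. For example, with a 15-byte record section:

// without schema id: 45 + 15 = 60
// with schema id: 45 + 15 + 4 = 64
assert_eq!(size_of::<SchemaId>(), 4); // newtype over u32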
@@ -233,12 +268,12 @@ impl Batch {
/// add a new record; this will update the offsets accordingly
pub fn add_record(&mut self, record: Record) {
self.add_records(&mut vec![record]);
self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
self.batch_len = self.calc_batch_len();
}

pub fn add_records(&mut self, records: &mut Vec<Record>) {
self.records.append(records);
self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
self.batch_len = self.calc_batch_len();
self.update_offset_deltas();
}

@@ -302,7 +337,7 @@ impl<T: Into<MemoryRecords>> From<T> for Batch {

batch.records = records;
let len = batch.records.len() as i32;
batch.batch_len = (BATCH_HEADER_SIZE + batch.records.write_size(0)) as i32;
batch.batch_len = batch.calc_batch_len();
batch.header.last_offset_delta = if len > 0 { len - 1 } else { len };
batch
}
@@ -318,15 +353,22 @@ where
{
trace!("decoding batch");
self.decode_from_file_buf(src, version)?;

let batch_len = self.batch_len as usize - BATCH_HEADER_SIZE;
let mut buf = src.take(batch_len);
if buf.remaining() < batch_len {
let rec_len = if self.header.has_schema() {
let mut sid = SchemaId(0);
sid.decode(src, version)?;
self.schema_id = Some(sid);
trace!("schema_id: {:?}", self.schema_id());
self.batch_len as usize - BATCH_HEADER_SIZE - size_of::<SchemaId>()
} else {
self.batch_len as usize - BATCH_HEADER_SIZE
};
let mut buf = src.take(rec_len);
if buf.remaining() < rec_len {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"not enough buf records, expected: {}, found: {}",
batch_len,
rec_len,
buf.remaining()
),
));
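On decode, the optional schema id sits between the fixed header and the records, so the record region shrinks by size_of::<SchemaId>() when the attribute bit is set. A standalone sketch of just that branch, assuming big-endian integers as in the Kafka-style wire format (the caller must supply at least four bytes when the flag is set):

fn read_optional_schema_id(attributes: i16, src: &mut &[u8]) -> Option<u32> {
    const ATTR_SCHEMA_PRESENT: i16 = 0x10;
    if attributes & ATTR_SCHEMA_PRESENT == 0 {
        return None; // no schema id on the wire; records start immediately
    }
    let (id_bytes, rest) = src.split_at(4);
    *src = rest; // consume the four schema-id bytes
    Some(u32::from_be_bytes(id_bytes.try_into().expect("4 bytes")))
}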
@@ -352,7 +394,7 @@ where
{
trace!("Encoding Batch");
self.base_offset.encode(dest, version)?;
let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
let batch_len: i32 = self.calc_batch_len();
batch_len.encode(dest, version)?;

// encode parts of header
@@ -368,6 +410,10 @@
self.header.producer_id.encode(buf, version)?;
self.header.producer_epoch.encode(buf, version)?;
self.header.first_sequence.encode(buf, version)?;
if let Some(ref schema_id) = self.schema_id {
// encode schema id
schema_id.encode(buf, version)?;
}
self.records.encode(buf, version)?;

let crc = crc32c::crc32c(&out);
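Note the write order: when present, the schema id lands after first_sequence and before the records, inside the buffer the checksum is taken over, so the CRC covers it. The resulting wire layout, as this encoder and the header-size constant below suggest (field sizes in bytes):

base_offset (8) | batch_len (4) | partition_leader_epoch (4) | magic (1) | crc (4)
| attributes (2) | last_offset_delta (4) | first_timestamp (8) | max_time_stamp (8)
| producer_id (8) | producer_epoch (2) | first_sequence (4)
| schema_id (4, only when ATTR_SCHEMA_PRESENT is set) | records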
@@ -383,6 +429,7 @@ impl<R: Clone> Clone for Batch<R> {
base_offset: self.base_offset,
batch_len: self.batch_len,
header: self.header.clone(),
schema_id: self.schema_id.clone(),
records: self.records.clone(),
}
}
@@ -407,13 +454,23 @@ pub struct BatchHeader {

impl BatchHeader {
fn get_compression(&self) -> Result<Compression, CompressionError> {
let compression_bits = self.attributes & COMPRESSION_CODEC_MASK;
let compression_bits = self.attributes & ATTR_COMPRESSION_CODEC_MASK;
Compression::try_from(compression_bits as i8)
}

pub fn set_compression(&mut self, compression: Compression) {
let compression_bits = compression as i16 & COMPRESSION_CODEC_MASK;
self.attributes = (self.attributes & !COMPRESSION_CODEC_MASK) | compression_bits;
let compression_bits = compression as i16 & ATTR_COMPRESSION_CODEC_MASK;
self.attributes = (self.attributes & !ATTR_COMPRESSION_CODEC_MASK) | compression_bits;
}

pub fn has_schema(&self) -> bool {
let schema_present = self.attributes & ATTR_SCHEMA_PRESENT;
schema_present != 0
}

/// set the schema-id-present attribute flag
pub fn set_schema_id(&mut self) {
self.attributes |= ATTR_SCHEMA_PRESENT;
}
}
impl Default for BatchHeader {
@@ -435,16 +492,16 @@

pub const BATCH_HEADER_SIZE: usize = size_of::<i32>() // partition leader epoch
+ size_of::<u8>() // magic
+ size_of::<i32>() //crc
+ size_of::<i16>() // i16
+ size_of::<i32>() // crc
+ size_of::<i16>() // i16 attributes
+ size_of::<i32>() // last offset delta
+ size_of::<i64>() // first_timestamp
+ size_of::<i64>() // max_time_stamp
+ size_of::<i64>() //producer id
+ size_of::<i64>() // producer id
+ size_of::<i16>() // producer epoch
+ size_of::<i32>(); // first sequence

/// used for modifying timestamp as producer
pub trait ProducerBatchHeader {
fn set_first_timestamp(&mut self, timestamp: Timestamp);

@@ -475,6 +532,16 @@ mod test {
use super::BatchHeader;
use super::BATCH_HEADER_SIZE;

fn rust_log_init() {
let trs = tracing_subscriber::fmt().with_max_level(tracing::Level::INFO);
if std::env::var("RUST_LOG").is_ok() {
trs.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
} else {
trs.init();
}
}

#[test]
fn test_batch_convert_compression_size() {}

@@ -485,7 +552,7 @@
}

#[test]
fn test_encode_and_decode_batch() -> Result<(), IoError> {
fn test_encode_and_decode_batch_basic() -> Result<(), IoError> {
let value = vec![0x74, 0x65, 0x73, 0x74];
let record = Record {
value: RecordData::from(value),
Expand All @@ -497,7 +564,11 @@ mod test {
batch.header.max_time_stamp = 1555478494747;

let bytes = batch.as_bytes(0)?;
println!("batch raw bytes: {:#X?}", bytes.as_ref());
println!(
"batch raw bytes (len {}): {:#X?}",
bytes.len(),
bytes.as_ref()
);

let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {batch:#?}");
@@ -508,7 +579,7 @@
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
assert!(batch.validate_decoding());

assert!(!batch.header.has_schema());
Ok(())
}

@@ -529,6 +600,51 @@
00c0 14 00 00 00 01 08 74 65 73 74 00
*/

#[test]
fn test_encode_and_decode_batch_w_schemaid() -> Result<(), IoError> {
const USE_RUST_LOG: bool = false;
if USE_RUST_LOG {
rust_log_init();
}

let value = vec![0x74, 0x65, 0x73, 0x74];
let record = Record {
value: RecordData::from(value),
..Default::default()
};
let mut batch = Batch::<MemoryRecords>::default();
batch.records.push(record);
batch.header.first_timestamp = 1555478494747;
batch.header.max_time_stamp = 1555478494747;

const TEST_SCHEMA_ID: u32 = 42;
let sid = SchemaId(TEST_SCHEMA_ID);
batch.set_schema_id(sid);

let bytes = batch.as_bytes(0)?;
println!(
"batch raw bytes (len {}): {:#X?}",
bytes.len(),
bytes.as_ref()
);

let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {batch:#?}");

let decoded_record = batch.records.get(0).unwrap();
println!("record crc: {}", batch.header.crc);
assert_eq!(batch.header.crc, 2943551365);
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
assert!(batch.validate_decoding());

assert!(batch.header.has_schema());
let got_sid = batch.schema_id();
assert_eq!(Some(SchemaId(TEST_SCHEMA_ID)), got_sid);

Ok(())
}
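A natural cross-check (not part of this diff) is that, with identical records and timestamps, the schema-tagged encoding comes out exactly size_of::<SchemaId>() = 4 bytes longer than the plain one. A sketch, where batch_plain and batch_tagged are hypothetical copies of the batches from the two tests above:

let plain = batch_plain.as_bytes(0)?;
let tagged = batch_tagged.as_bytes(0)?;
assert_eq!(tagged.len(), plain.len() + 4); // the encoded SchemaId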

#[test]
fn test_batch_offset_delta() {
let mut batch = Batch::<MemoryRecords>::default();
@@ -804,4 +920,13 @@ mod test {
assert_ne!(not_compressed.batch_len(), compressed.batch_len());
assert!(not_compressed.batch_len() > compressed.batch_len());
}

#[test]
fn batch_header_id_set() {
let mut batch = Batch::from(vec![Record::default(), Record::default()]);

assert!(!batch.header.has_schema());
batch.header.set_schema_id();
assert!(batch.header.has_schema());
}
}