Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - add: fluvio-schema, batch id #3283

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 133 additions & 21 deletions crates/fluvio-protocol/src/record/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ use super::Record;
use super::Offset;
use super::Size;

pub const COMPRESSION_CODEC_MASK: i16 = 0x07;
const ATTR_SCHEMA_PRESENT: i16 = 0x10;
const ATTR_COMPRESSION_CODEC_MASK: i16 = 0x07;
pub const NO_TIMESTAMP: i64 = -1;

const SCHEMA_ID_NULL: SchemaId = SchemaId(0u32);

pub trait BatchRecords: Default + Debug + Encoder + Decoder + Send + Sync {
/// how many bytes does record wants to process
#[deprecated]
Expand Down Expand Up @@ -67,11 +70,25 @@ pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>() // Offset

pub const BATCH_FILE_HEADER_SIZE: usize = BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE;

#[derive(Clone, Default, Debug, Encoder, PartialEq)]
pub struct SchemaId(u32);

impl Decoder for SchemaId {
sehz marked this conversation as resolved.
Show resolved Hide resolved
fn decode<T: Buf>(&mut self, src: &mut T, version: Version) -> Result<(), Error> {
let mut sid: u32 = 0;
sid.decode(src, version)?;
self.0 = sid;
Ok(())
}
}

#[derive(Default, Debug)]
pub struct Batch<R = MemoryRecords> {
pub base_offset: Offset,
pub batch_len: i32, // only for decoding
pub header: BatchHeader,
// only encoded if schema_id is indicated in the header attr
pub schema_id: SchemaId,
records: R,
}

Expand Down Expand Up @@ -162,6 +179,15 @@ impl<R> Batch<R> {
pub fn batch_len(&self) -> i32 {
self.batch_len
}

pub fn schema_id(&self) -> SchemaId {
self.schema_id.clone()
}

pub fn set_schema_id(&mut self, sid: SchemaId) {
self.header.set_schema_id();
self.schema_id = sid;
}
}

#[cfg(feature = "compress")]
Expand All @@ -173,6 +199,7 @@ impl TryFrom<Batch<RawRecords>> for Batch {
base_offset: batch.base_offset,
batch_len: (BATCH_HEADER_SIZE + records.write_size(0)) as i32,
header: batch.header,
schema_id: SCHEMA_ID_NULL,
records,
})
}
Expand All @@ -189,11 +216,13 @@ impl TryFrom<Batch> for Batch<RawRecords> {
let compressed_records = compression.compress(&buf)?;
let compressed_records_len = compressed_records.len() as i32;
let records = RawRecords(compressed_records);
let schema_id = f.schema_id();

Ok(Batch {
base_offset: f.base_offset,
batch_len: compressed_records_len,
header: f.header,
schema_id,
records,
})
}
Expand All @@ -205,7 +234,16 @@ where
{
/// check if batch is valid after decoded
pub fn validate_decoding(&self) -> bool {
self.batch_len == (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32
self.batch_len == self.calc_batch_len()
}

fn calc_batch_len(&self) -> i32 {
let blen = if self.header.has_schema() {
BATCH_HEADER_SIZE + self.records.write_size(0) + size_of::<SchemaId>()
} else {
BATCH_HEADER_SIZE + self.records.write_size(0)
};
blen as i32
}
}

Expand Down Expand Up @@ -233,12 +271,12 @@ impl Batch {
/// add new record, this will update the offset to correct
pub fn add_record(&mut self, record: Record) {
self.add_records(&mut vec![record]);
self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
self.batch_len = self.calc_batch_len();
}

pub fn add_records(&mut self, records: &mut Vec<Record>) {
self.records.append(records);
self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
self.batch_len = self.calc_batch_len();
self.update_offset_deltas();
}

Expand Down Expand Up @@ -302,7 +340,7 @@ impl<T: Into<MemoryRecords>> From<T> for Batch {

batch.records = records;
let len = batch.records.len() as i32;
batch.batch_len = (BATCH_HEADER_SIZE + batch.records.write_size(0)) as i32;
batch.batch_len = batch.calc_batch_len();
batch.header.last_offset_delta = if len > 0 { len - 1 } else { len };
batch
}
Expand All @@ -318,15 +356,22 @@ where
{
trace!("decoding batch");
self.decode_from_file_buf(src, version)?;

let batch_len = self.batch_len as usize - BATCH_HEADER_SIZE;
let mut buf = src.take(batch_len);
if buf.remaining() < batch_len {
let rec_len = if self.header.has_schema() {
let mut sid = SchemaId(0);
sid.decode(src, version)?;
self.schema_id = sid;
trace!(schema_id=?self.schema_id);
self.batch_len as usize - BATCH_HEADER_SIZE - size_of::<SchemaId>()
} else {
self.batch_len as usize - BATCH_HEADER_SIZE
};
let mut buf = src.take(rec_len);
if buf.remaining() < rec_len {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"not enough buf records, expected: {}, found: {}",
batch_len,
rec_len,
buf.remaining()
),
));
Expand All @@ -352,7 +397,7 @@ where
{
trace!("Encoding Batch");
self.base_offset.encode(dest, version)?;
let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
let batch_len: i32 = self.calc_batch_len();
batch_len.encode(dest, version)?;

// encode parts of header
Expand All @@ -368,6 +413,9 @@ where
self.header.producer_id.encode(buf, version)?;
self.header.producer_epoch.encode(buf, version)?;
self.header.first_sequence.encode(buf, version)?;
if self.header.has_schema() {
self.schema_id.encode(buf, version)?;
}
self.records.encode(buf, version)?;

let crc = crc32c::crc32c(&out);
Expand All @@ -383,6 +431,7 @@ impl<R: Clone> Clone for Batch<R> {
base_offset: self.base_offset,
batch_len: self.batch_len,
header: self.header.clone(),
schema_id: self.schema_id.clone(),
records: self.records.clone(),
}
}
Expand All @@ -407,13 +456,23 @@ pub struct BatchHeader {

impl BatchHeader {
fn get_compression(&self) -> Result<Compression, CompressionError> {
let compression_bits = self.attributes & COMPRESSION_CODEC_MASK;
let compression_bits = self.attributes & ATTR_COMPRESSION_CODEC_MASK;
Compression::try_from(compression_bits as i8)
}

pub fn set_compression(&mut self, compression: Compression) {
let compression_bits = compression as i16 & COMPRESSION_CODEC_MASK;
self.attributes = (self.attributes & !COMPRESSION_CODEC_MASK) | compression_bits;
let compression_bits = compression as i16 & ATTR_COMPRESSION_CODEC_MASK;
self.attributes = (self.attributes & !ATTR_COMPRESSION_CODEC_MASK) | compression_bits;
}

pub fn has_schema(&self) -> bool {
let schema_present = self.attributes & ATTR_SCHEMA_PRESENT;
schema_present != 0
}

/// set has schema id attr flag
pub fn set_schema_id(&mut self) {
self.attributes |= ATTR_SCHEMA_PRESENT;
}
}
impl Default for BatchHeader {
Expand All @@ -435,16 +494,16 @@ impl Default for BatchHeader {

pub const BATCH_HEADER_SIZE: usize = size_of::<i32>() // partition leader epoch
+ size_of::<u8>() // magic
+ size_of::<i32>() //crc
+ size_of::<i16>() // i16
+ size_of::<i32>() // crc
+ size_of::<i16>() // i16 attributes
+ size_of::<i32>() // last offset delta
+ size_of::<i64>() // first_timestamp
+ size_of::<i64>() // max_time_stamp
+ size_of::<i64>() //producer id
+ size_of::<i64>() // producer id
+ size_of::<i16>() // produce_epoch
+ size_of::<i32>(); // first sequence

/// used for modifying timestamp as producer
/// used for modifying timestamp as producer
pub trait ProducerBatchHeader {
fn set_first_timestamp(&mut self, timestamp: Timestamp);

Expand Down Expand Up @@ -485,7 +544,7 @@ mod test {
}

#[test]
fn test_encode_and_decode_batch() -> Result<(), IoError> {
fn test_encode_and_decode_batch_basic() -> Result<(), IoError> {
let value = vec![0x74, 0x65, 0x73, 0x74];
let record = Record {
value: RecordData::from(value),
Expand All @@ -497,7 +556,11 @@ mod test {
batch.header.max_time_stamp = 1555478494747;

let bytes = batch.as_bytes(0)?;
println!("batch raw bytes: {:#X?}", bytes.as_ref());
println!(
"batch raw bytes (len {}): {:#X?}",
bytes.len(),
bytes.as_ref()
);

let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {batch:#?}");
Expand All @@ -508,7 +571,7 @@ mod test {
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
assert!(batch.validate_decoding());

assert!(!batch.header.has_schema());
Ok(())
}

Expand All @@ -529,6 +592,46 @@ mod test {
00c0 14 00 00 00 01 08 74 65 73 74 00
*/

#[test]
fn test_encode_and_decode_batch_w_schemaid() -> Result<(), IoError> {
let value = vec![0x74, 0x65, 0x73, 0x74];
let record = Record {
value: RecordData::from(value),
..Default::default()
};
let mut batch = Batch::<MemoryRecords>::default();
batch.records.push(record);
batch.header.first_timestamp = 1555478494747;
batch.header.max_time_stamp = 1555478494747;

const TEST_SCHEMA_ID: u32 = 42;
let sid = SchemaId(TEST_SCHEMA_ID);
batch.set_schema_id(sid);

let bytes = batch.as_bytes(0)?;
println!(
"batch raw bytes (len {}): {:#X?}",
bytes.len(),
bytes.as_ref()
);

let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {batch:#?}");

let decoded_record = batch.records.get(0).unwrap();
println!("record crc: {}", batch.header.crc);
assert_eq!(batch.header.crc, 2943551365);
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
assert!(batch.validate_decoding());

assert!(batch.header.has_schema());
let got_sid = batch.schema_id();
assert_eq!(SchemaId(TEST_SCHEMA_ID), got_sid);

Ok(())
}

#[test]
fn test_batch_offset_delta() {
let mut batch = Batch::<MemoryRecords>::default();
Expand Down Expand Up @@ -804,4 +907,13 @@ mod test {
assert_ne!(not_compressed.batch_len(), compressed.batch_len());
assert!(not_compressed.batch_len() > compressed.batch_len());
}

#[test]
fn batch_header_id_set() {
let mut batch = Batch::from(vec![Record::default(), Record::default()]);

assert!(!batch.header.has_schema());
batch.header.set_schema_id();
assert!(batch.header.has_schema());
}
}