Skip to content

Commit

Permalink
add: fluvio-schema, batch id
Browse files Browse the repository at this point in the history
Add ability to set, encode and decode a schema id to a record batch
  • Loading branch information
digikata committed May 25, 2023
1 parent 4fc5df5 commit 09b482c
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ fluvio-future = { workspace = true, features = [
"net",
] }
futures = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
167 changes: 146 additions & 21 deletions crates/fluvio-protocol/src/record/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use super::Record;
use super::Offset;
use super::Size;

pub const COMPRESSION_CODEC_MASK: i16 = 0x07;
const ATTR_SCHEMA_PRESENT: i16 = 0x10;
const ATTR_COMPRESSION_CODEC_MASK: i16 = 0x07;
pub const NO_TIMESTAMP: i64 = -1;

pub trait BatchRecords: Default + Debug + Encoder + Decoder + Send + Sync {
Expand Down Expand Up @@ -67,11 +68,24 @@ pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>() // Offset

pub const BATCH_FILE_HEADER_SIZE: usize = BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE;

/// Identifier of the schema associated with a record batch's payload.
///
/// Newtype over `u32` so the id carries its own Encoder/Decoder impls and
/// cannot be mixed up with other numeric wire fields.
/// NOTE(review): presumably references an entry in an external schema
/// registry (fluvio-schema) — confirm against callers.
#[derive(Clone, Default, Debug, Encoder, PartialEq)]
pub struct SchemaId(u32);

impl Decoder for SchemaId {
    /// Decode a `SchemaId` from the buffer as a plain `u32`.
    ///
    /// Delegates to the `u32` decoder, writing straight into the wrapped
    /// field; the intermediate local in the original added nothing.
    fn decode<T: Buf>(&mut self, src: &mut T, version: Version) -> Result<(), Error> {
        self.0.decode(src, version)?;
        Ok(())
    }
}

/// A batch of records together with the on-wire header that precedes them.
#[derive(Default, Debug)]
pub struct Batch<R = MemoryRecords> {
// starting offset of this batch within the partition log
pub base_offset: Offset,
// byte length of the batch body; populated when decoding
pub batch_len: i32, // only for decoding
// fixed-size header: attributes, timestamps, producer info, crc, ...
pub header: BatchHeader,
// optional schema id; present when the header's schema attribute bit is set
pub schema_id: Option<SchemaId>,
records: R,
}

Expand Down Expand Up @@ -162,6 +176,15 @@ impl<R> Batch<R> {
/// Byte length of the batch body (header + records, plus the schema id
/// field when present), as decoded from the wire or recomputed on mutation.
pub fn batch_len(&self) -> i32 {
self.batch_len
}

/// Returns a copy of the batch's schema id, if one has been set.
pub fn schema_id(&self) -> Option<SchemaId> {
    // `Option::clone` is the idiomatic equivalent of the previous
    // `.as_ref().cloned()` chain (same behavior, one call).
    self.schema_id.clone()
}

/// Attach a schema id to this batch and flag its presence in the header
/// attributes so that encode/decode account for the extra field.
pub fn set_schema_id(&mut self, id: SchemaId) {
    self.schema_id = Some(id);
    self.header.set_schema_id();
}
}

#[cfg(feature = "compress")]
Expand All @@ -173,6 +196,7 @@ impl TryFrom<Batch<RawRecords>> for Batch {
base_offset: batch.base_offset,
batch_len: (BATCH_HEADER_SIZE + records.write_size(0)) as i32,
header: batch.header,
schema_id: None,
records,
})
}
Expand All @@ -189,11 +213,13 @@ impl TryFrom<Batch> for Batch<RawRecords> {
let compressed_records = compression.compress(&buf)?;
let compressed_records_len = compressed_records.len() as i32;
let records = RawRecords(compressed_records);
let schema_id = f.schema_id();

Ok(Batch {
base_offset: f.base_offset,
batch_len: compressed_records_len,
header: f.header,
schema_id,
records,
})
}
Expand All @@ -205,7 +231,16 @@ where
{
/// check if batch is valid after decoded
pub fn validate_decoding(&self) -> bool {
self.batch_len == (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32
self.batch_len == self.calc_batch_len()
}

/// Compute the batch body length: fixed header plus record bytes, plus
/// the schema id field when the header flags one as present.
fn calc_batch_len(&self) -> i32 {
    let mut len = BATCH_HEADER_SIZE + self.records.write_size(0);
    if self.header.has_schema() {
        len += size_of::<SchemaId>();
    }
    len as i32
}
}

Expand Down Expand Up @@ -233,12 +268,12 @@ impl Batch {
/// add new record, this will update the offset to correct
pub fn add_record(&mut self, record: Record) {
self.add_records(&mut vec![record]);
self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
self.batch_len = self.calc_batch_len();
}

pub fn add_records(&mut self, records: &mut Vec<Record>) {
self.records.append(records);
self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
self.batch_len = self.calc_batch_len();
self.update_offset_deltas();
}

Expand Down Expand Up @@ -302,7 +337,7 @@ impl<T: Into<MemoryRecords>> From<T> for Batch {

batch.records = records;
let len = batch.records.len() as i32;
batch.batch_len = (BATCH_HEADER_SIZE + batch.records.write_size(0)) as i32;
batch.batch_len = batch.calc_batch_len();
batch.header.last_offset_delta = if len > 0 { len - 1 } else { len };
batch
}
Expand All @@ -318,15 +353,22 @@ where
{
trace!("decoding batch");
self.decode_from_file_buf(src, version)?;

let batch_len = self.batch_len as usize - BATCH_HEADER_SIZE;
let mut buf = src.take(batch_len);
if buf.remaining() < batch_len {
let rec_len = if self.header.has_schema() {
let mut sid = SchemaId(0);
sid.decode(src, version)?;
self.schema_id = Some(sid);
trace!("schema_id: {:?}", self.schema_id());
self.batch_len as usize - BATCH_HEADER_SIZE - size_of::<SchemaId>()
} else {
self.batch_len as usize - BATCH_HEADER_SIZE
};
let mut buf = src.take(rec_len);
if buf.remaining() < rec_len {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"not enough buf records, expected: {}, found: {}",
batch_len,
rec_len,
buf.remaining()
),
));
Expand All @@ -352,7 +394,7 @@ where
{
trace!("Encoding Batch");
self.base_offset.encode(dest, version)?;
let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
let batch_len: i32 = self.calc_batch_len();
batch_len.encode(dest, version)?;

// encode parts of header
Expand All @@ -368,6 +410,10 @@ where
self.header.producer_id.encode(buf, version)?;
self.header.producer_epoch.encode(buf, version)?;
self.header.first_sequence.encode(buf, version)?;
if let Some(ref schema_id) = self.schema_id {
// encode schema id
schema_id.encode(buf, version)?;
}
self.records.encode(buf, version)?;

let crc = crc32c::crc32c(&out);
Expand All @@ -383,6 +429,7 @@ impl<R: Clone> Clone for Batch<R> {
base_offset: self.base_offset,
batch_len: self.batch_len,
header: self.header.clone(),
schema_id: self.schema_id.clone(),
records: self.records.clone(),
}
}
Expand All @@ -407,13 +454,23 @@ pub struct BatchHeader {

impl BatchHeader {
    /// Decode the compression codec from the low attribute bits.
    fn get_compression(&self) -> Result<Compression, CompressionError> {
        Compression::try_from((self.attributes & ATTR_COMPRESSION_CODEC_MASK) as i8)
    }

    /// Store the compression codec in the low attribute bits, leaving all
    /// other attribute flags untouched.
    pub fn set_compression(&mut self, compression: Compression) {
        let cleared = self.attributes & !ATTR_COMPRESSION_CODEC_MASK;
        self.attributes = cleared | (compression as i16 & ATTR_COMPRESSION_CODEC_MASK);
    }

    /// Whether the schema-id-present attribute bit is set.
    pub fn has_schema(&self) -> bool {
        (self.attributes & ATTR_SCHEMA_PRESENT) != 0
    }

    /// Set the schema-id-present attribute bit.
    pub fn set_schema_id(&mut self) {
        self.attributes |= ATTR_SCHEMA_PRESENT;
    }
}
impl Default for BatchHeader {
Expand All @@ -435,16 +492,16 @@ impl Default for BatchHeader {

/// Size in bytes of the fixed batch header fields.
///
/// NOTE: this deliberately does NOT include the optional schema id field;
/// see `calc_batch_len` for the full body-size accounting.
pub const BATCH_HEADER_SIZE: usize = size_of::<i32>() // partition leader epoch
    + size_of::<u8>() // magic
    + size_of::<i32>() // crc
    + size_of::<i16>() // i16 attributes
    + size_of::<i32>() // last offset delta
    + size_of::<i64>() // first_timestamp
    + size_of::<i64>() // max_time_stamp
    + size_of::<i64>() // producer id
    + size_of::<i16>() // produce_epoch
    + size_of::<i32>(); // first sequence

/// used for modifying timestamp as producer
pub trait ProducerBatchHeader {
fn set_first_timestamp(&mut self, timestamp: Timestamp);

Expand Down Expand Up @@ -475,6 +532,16 @@ mod test {
use super::BatchHeader;
use super::BATCH_HEADER_SIZE;

/// Initialize tracing output for tests: honor `RUST_LOG` when set,
/// otherwise fall back to a fixed INFO-level subscriber.
fn rust_log_init() {
    let builder = tracing_subscriber::fmt().with_max_level(tracing::Level::INFO);
    match std::env::var("RUST_LOG") {
        Ok(_) => builder
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .init(),
        Err(_) => builder.init(),
    }
}

#[test]
// TODO(review): placeholder test with no body — either implement the
// compressed-size conversion check its name promises, or remove the stub
// so it does not report a false "pass".
fn test_batch_convert_compression_size() {}

Expand All @@ -485,7 +552,7 @@ mod test {
}

#[test]
fn test_encode_and_decode_batch() -> Result<(), IoError> {
fn test_encode_and_decode_batch_basic() -> Result<(), IoError> {
let value = vec![0x74, 0x65, 0x73, 0x74];
let record = Record {
value: RecordData::from(value),
Expand All @@ -497,7 +564,11 @@ mod test {
batch.header.max_time_stamp = 1555478494747;

let bytes = batch.as_bytes(0)?;
println!("batch raw bytes: {:#X?}", bytes.as_ref());
println!(
"batch raw bytes (len {}): {:#X?}",
bytes.len(),
bytes.as_ref()
);

let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {batch:#?}");
Expand All @@ -508,7 +579,7 @@ mod test {
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
assert!(batch.validate_decoding());

assert!(!batch.header.has_schema());
Ok(())
}

Expand All @@ -529,6 +600,51 @@ mod test {
00c0 14 00 00 00 01 08 74 65 73 74 00
*/

#[test]
// Round-trip a batch carrying a schema id: encode to bytes, decode back,
// and verify record payload, crc, length accounting, and the schema id.
fn test_encode_and_decode_batch_w_schemaid() -> Result<(), IoError> {
// flip to true locally to get trace output from encode/decode
const USE_RUST_LOG: bool = false;
if USE_RUST_LOG {
rust_log_init();
}

// one record whose payload is the ASCII bytes "test"
let value = vec![0x74, 0x65, 0x73, 0x74];
let record = Record {
value: RecordData::from(value),
..Default::default()
};
let mut batch = Batch::<MemoryRecords>::default();
batch.records.push(record);
batch.header.first_timestamp = 1555478494747;
batch.header.max_time_stamp = 1555478494747;

// setting the schema id must also flip the header attribute bit
const TEST_SCHEMA_ID: u32 = 42;
let sid = SchemaId(TEST_SCHEMA_ID);
batch.set_schema_id(sid);

let bytes = batch.as_bytes(0)?;
println!(
"batch raw bytes (len {}): {:#X?}",
bytes.len(),
bytes.as_ref()
);

// decode from the freshly encoded bytes at wire version 0
let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {batch:#?}");

let decoded_record = batch.records.get(0).unwrap();
println!("record crc: {}", batch.header.crc);
// NOTE(review): expected crc captured from a known-good run of this test;
// it pins the exact encoded byte layout (including the schema id field)
assert_eq!(batch.header.crc, 2943551365);
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
// batch_len must account for the extra schema id bytes
assert!(batch.validate_decoding());

assert!(batch.header.has_schema());
let got_sid = batch.schema_id();
assert_eq!(Some(SchemaId(TEST_SCHEMA_ID)), got_sid);

Ok(())
}

#[test]
fn test_batch_offset_delta() {
let mut batch = Batch::<MemoryRecords>::default();
Expand Down Expand Up @@ -804,4 +920,13 @@ mod test {
assert_ne!(not_compressed.batch_len(), compressed.batch_len());
assert!(not_compressed.batch_len() > compressed.batch_len());
}

#[test]
// The schema attribute bit starts clear on a fresh batch and flips once
// `set_schema_id` marks it.
fn batch_header_id_set() {
    let mut batch = Batch::from(vec![Record::default(), Record::default()]);

    // `assert!(!x)` instead of `assert_eq!(x, false)` (clippy: bool_assert_comparison)
    assert!(!batch.header.has_schema());
    batch.header.set_schema_id();
    assert!(batch.header.has_schema());
}
}

0 comments on commit 09b482c

Please sign in to comment.