From e917651cae688ccea7f049fc92559ccdeb801059 Mon Sep 17 00:00:00 2001 From: Esteban Borai Date: Tue, 11 Jul 2023 18:11:55 -0400 Subject: [PATCH] feat: batch timestamp for `SmartModuleInput` --- crates/fluvio-smartmodule/src/input.rs | 67 ++++++++++--------- .../src/server/stream_fetch.rs | 3 + 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/crates/fluvio-smartmodule/src/input.rs b/crates/fluvio-smartmodule/src/input.rs index f94706deff..f3ae398748 100644 --- a/crates/fluvio-smartmodule/src/input.rs +++ b/crates/fluvio-smartmodule/src/input.rs @@ -6,7 +6,8 @@ use std::io::Cursor; use fluvio_protocol::record::Offset; use fluvio_protocol::{Encoder, Decoder, record::Record}; -pub const SMARTMODULE_LOOKBACK_WITH_AGE: i16 = 21; +/// SmartModule Version with support for Lookback and Batch Base Timestamp +pub const SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP: i16 = 21; #[derive(Debug, Default, Clone, Encoder, Decoder)] pub struct SmartModuleExtraParams { @@ -52,7 +53,7 @@ impl SmartModuleExtraParams { #[derive(Debug, Default, Clone, Encoder, Decoder, PartialEq, Eq)] pub struct Lookback { pub last: u64, - #[fluvio(min_version = 21)] + #[fluvio(min_version = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP)] pub age: Option, } @@ -75,22 +76,27 @@ impl Lookback { /// A single SmartModule input record #[derive(Debug, Default, Clone, Encoder, Decoder)] pub struct SmartModuleInput { - /// The base offset of this batch of records - base_offset: Offset, /// encoded version of Record raw_bytes: Vec, /// This is deprecrated, extra parameters should not be passed, they will be removed in the future #[deprecated] + #[fluvio(max_version = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP)] params: SmartModuleExtraParams, - #[fluvio(min_version = 16)] + #[fluvio(min_version = 16, max_version = 21)] join_record: Vec, + /// The base offset of this batch of records + base_offset: Offset, + /// The creation timestamp of this batch of records + #[fluvio(min_version = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP)] + base_timestamp: i64, } impl SmartModuleInput { - pub fn new(raw_bytes: Vec, base_offset: Offset) -> Self { + pub fn new(raw_bytes: Vec, base_offset: Offset, base_timestamp: i64) -> Self { Self { - base_offset, raw_bytes, + base_offset, + base_timestamp, ..Default::default() } } @@ -103,6 +109,14 @@ impl SmartModuleInput { self.base_offset = base_offset; } + pub fn base_timestamp(&self) -> i64 { + self.base_timestamp + } + + pub fn set_base_timestamp(&mut self, base_timestamp: i64) { + self.base_timestamp = base_timestamp; + } + pub fn raw_bytes(&self) -> &[u8] { &self.raw_bytes } @@ -114,13 +128,16 @@ impl SmartModuleInput { pub fn parts(self) -> (Vec, Vec) { (self.raw_bytes, self.join_record) } -} -impl TryFrom> for SmartModuleInput { - type Error = std::io::Error; - fn try_from(records: Vec) -> Result { + pub fn try_into_records(mut self, version: i16) -> Result, std::io::Error> { + Decoder::decode_from(&mut Cursor::new(&mut self.raw_bytes), version) + } + + pub fn try_from_records(records: Vec, version: i16) -> Result { let mut raw_bytes = Vec::new(); - records.encode(&mut raw_bytes, SMARTMODULE_LOOKBACK_WITH_AGE)?; + + records.encode(&mut raw_bytes, version)?; + Ok(SmartModuleInput { raw_bytes, ..Default::default() @@ -128,25 +145,15 @@ impl TryFrom> for SmartModuleInput { } } -impl TryInto> for SmartModuleInput { - type Error = std::io::Error; - - fn try_into(mut self) -> Result, Self::Error> { - Decoder::decode_from( - &mut Cursor::new(&mut self.raw_bytes), - SMARTMODULE_LOOKBACK_WITH_AGE, - ) - } -} - impl Display for SmartModuleInput { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "SmartModuleInput {{ base_offset: {:?}, record_data: {:?}, join_data: {:#?} }}", - self.base_offset, + "SmartModuleInput {{ record_data: {:?}, join_data: {:#?}, base_offset: {:?}, base_timestamp: {:?}, }}", self.raw_bytes.len(), - self.join_record.len() + self.join_record.len(), + self.base_offset, + self.base_timestamp, ) } } @@ -180,12 +187,12 @@ mod tests { ]; //when - let sm_input: SmartModuleInput = records - .try_into() - .expect("records to input conversion failed"); + let sm_input: SmartModuleInput = + SmartModuleInput::try_from_records(records, SMARTMODULE_BASE_TIMESTAMP) + .expect("records to input conversion failed"); let records_decoded: Vec = sm_input - .try_into() + .try_into_records(SMARTMODULE_BASE_TIMESTAMP) .expect("input to records conversion failed"); //then diff --git a/crates/fluvio-spu-schema/src/server/stream_fetch.rs b/crates/fluvio-spu-schema/src/server/stream_fetch.rs index 0031550ca6..3767f72b87 100644 --- a/crates/fluvio-spu-schema/src/server/stream_fetch.rs +++ b/crates/fluvio-spu-schema/src/server/stream_fetch.rs @@ -48,6 +48,9 @@ pub const CHAIN_SMARTMODULE_API: i16 = 18; pub const SMARTMODULE_LOOKBACK: i16 = 20; +/// Version with support for Lookback and Batch Base Timestamp +pub const SMARTMODULE_BASE_TIMESTMAP: i16 = 21; + /// Fetch records continuously /// Output will be send back as stream #[allow(deprecated)]