Skip to content

Commit

Permalink
feat: batch timestamp for SmartModuleInput
Browse files Browse the repository at this point in the history
  • Loading branch information
EstebanBorai committed Jul 13, 2023
1 parent 3aba4f3 commit e917651
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
67 changes: 37 additions & 30 deletions crates/fluvio-smartmodule/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::io::Cursor;
use fluvio_protocol::record::Offset;
use fluvio_protocol::{Encoder, Decoder, record::Record};

pub const SMARTMODULE_LOOKBACK_WITH_AGE: i16 = 21;
/// SmartModule Version with support for Lookback and Batch Base Timestamp
pub const SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP: i16 = 21;

#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleExtraParams {
Expand Down Expand Up @@ -52,7 +53,7 @@ impl SmartModuleExtraParams {
#[derive(Debug, Default, Clone, Encoder, Decoder, PartialEq, Eq)]
pub struct Lookback {
pub last: u64,
#[fluvio(min_version = 21)]
#[fluvio(min_version = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP)]
pub age: Option<Duration>,
}

Expand All @@ -75,22 +76,27 @@ impl Lookback {
/// A single SmartModule input record
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInput {
/// The base offset of this batch of records
base_offset: Offset,
/// encoded version of Record
raw_bytes: Vec<u8>,
/// This is deprecrated, extra parameters should not be passed, they will be removed in the future
#[deprecated]
#[fluvio(max_version = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP)]
params: SmartModuleExtraParams,
#[fluvio(min_version = 16)]
#[fluvio(min_version = 16, max_version = 21)]
join_record: Vec<u8>,
/// The base offset of this batch of records
base_offset: Offset,
/// The creation timestamp of this batch of records
#[fluvio(min_version = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP)]
base_timestamp: i64,
}

impl SmartModuleInput {
pub fn new(raw_bytes: Vec<u8>, base_offset: Offset) -> Self {
pub fn new(raw_bytes: Vec<u8>, base_offset: Offset, base_timestamp: i64) -> Self {
Self {
base_offset,
raw_bytes,
base_offset,
base_timestamp,
..Default::default()
}
}
Expand All @@ -103,6 +109,14 @@ impl SmartModuleInput {
self.base_offset = base_offset;
}

pub fn base_timestamp(&self) -> i64 {
self.base_timestamp
}

pub fn set_base_timestamp(&mut self, base_timestamp: i64) {
self.base_timestamp = base_timestamp;
}

pub fn raw_bytes(&self) -> &[u8] {
&self.raw_bytes
}
Expand All @@ -114,39 +128,32 @@ impl SmartModuleInput {
pub fn parts(self) -> (Vec<u8>, Vec<u8>) {
(self.raw_bytes, self.join_record)
}
}

impl TryFrom<Vec<Record>> for SmartModuleInput {
type Error = std::io::Error;
fn try_from(records: Vec<Record>) -> Result<Self, Self::Error> {
pub fn try_into_records(mut self, version: i16) -> Result<Vec<Record>, std::io::Error> {
Decoder::decode_from(&mut Cursor::new(&mut self.raw_bytes), version)
}

pub fn try_from_records(records: Vec<Record>, version: i16) -> Result<Self, std::io::Error> {
let mut raw_bytes = Vec::new();
records.encode(&mut raw_bytes, SMARTMODULE_LOOKBACK_WITH_AGE)?;

records.encode(&mut raw_bytes, version)?;

Ok(SmartModuleInput {
raw_bytes,
..Default::default()
})
}
}

impl TryInto<Vec<Record>> for SmartModuleInput {
type Error = std::io::Error;

fn try_into(mut self) -> Result<Vec<Record>, Self::Error> {
Decoder::decode_from(
&mut Cursor::new(&mut self.raw_bytes),
SMARTMODULE_LOOKBACK_WITH_AGE,
)
}
}

impl Display for SmartModuleInput {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"SmartModuleInput {{ base_offset: {:?}, record_data: {:?}, join_data: {:#?} }}",
self.base_offset,
"SmartModuleInput {{ record_data: {:?}, join_data: {:#?}, base_offset: {:?}, base_timestamp: {:?}, }}",
self.raw_bytes.len(),
self.join_record.len()
self.join_record.len(),
self.base_offset,
self.base_timestamp,
)
}
}
Expand Down Expand Up @@ -180,12 +187,12 @@ mod tests {
];

//when
let sm_input: SmartModuleInput = records
.try_into()
.expect("records to input conversion failed");
let sm_input: SmartModuleInput =
SmartModuleInput::try_from_records(records, SMARTMODULE_BASE_TIMESTAMP)
.expect("records to input conversion failed");

let records_decoded: Vec<Record> = sm_input
.try_into()
.try_into_records(SMARTMODULE_BASE_TIMESTAMP)
.expect("input to records conversion failed");

//then
Expand Down
3 changes: 3 additions & 0 deletions crates/fluvio-spu-schema/src/server/stream_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub const CHAIN_SMARTMODULE_API: i16 = 18;

pub const SMARTMODULE_LOOKBACK: i16 = 20;

/// Version with support for Lookback and Batch Base Timestamp
pub const SMARTMODULE_BASE_TIMESTMAP: i16 = 21;

/// Fetch records continuously
/// Output will be send back as stream
#[allow(deprecated)]
Expand Down

0 comments on commit e917651

Please sign in to comment.