Skip to content

Commit

Permalink
feat: inject timestamp to SmartModuleInput
Browse files Browse the repository at this point in the history
  • Loading branch information
EstebanBorai committed Jul 15, 2023
1 parent 6c08774 commit 6834a09
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 9 deletions.
23 changes: 22 additions & 1 deletion crates/fluvio-smartmodule/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,17 @@ pub struct SmartModuleInput {
params: SmartModuleExtraParams,
#[fluvio(min_version = 16)]
join_record: Vec<u8>,
/// The base timestamp of this batch of records
#[fluvio(min_version = 21)]
base_timestamp: i64,
}

impl SmartModuleInput {
pub fn new(raw_bytes: Vec<u8>, base_offset: Offset) -> Self {
pub fn new(raw_bytes: Vec<u8>, base_offset: Offset, base_timestamp: i64) -> Self {
Self {
base_offset,
raw_bytes,
base_timestamp,
..Default::default()
}
}
Expand All @@ -103,6 +107,14 @@ impl SmartModuleInput {
self.base_offset = base_offset;
}

pub fn base_timestamp(&self) -> Offset {
self.base_timestamp
}

pub fn set_base_timestamp(&mut self, base_timestamp: i64) {
self.base_timestamp = base_timestamp;
}

pub fn raw_bytes(&self) -> &[u8] {
&self.raw_bytes
}
Expand Down Expand Up @@ -193,4 +205,13 @@ mod tests {
assert_eq!(records_decoded[1].value.as_ref(), b"fruit");
assert_eq!(records_decoded[2].value.as_ref(), b"banana");
}

#[test]
fn sets_the_provided_value_as_timestamp() {
let mut sm_input = SmartModuleInput::new(vec![0, 1, 2, 3], 0);

sm_input.set_base_timestamp(1234);
assert_eq!(sm_input.base_timestamp, 1234);
assert_eq!(sm_input.base_timestamp(), 1234);
}
}
16 changes: 10 additions & 6 deletions crates/fluvio-spu/src/smartengine/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use std::time::Instant;
use std::io::Error as IoError;

use anyhow::Error;
use fluvio_compression::{Compression, CompressionError};
use fluvio_protocol::record::{RecordSet, RawRecords};
use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use tracing::{instrument, debug, trace};

use fluvio_compression::{Compression, CompressionError};
use fluvio_protocol::record::{RecordSet, RawRecords};
use fluvio_protocol::Encoder;
use fluvio_protocol::{
record::{Batch, MemoryRecords, Offset},
link::smartmodule::SmartModuleTransformRuntimeError,
};
use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use fluvio_smartengine::SmartModuleChainInstance;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;

Expand All @@ -22,6 +22,8 @@ pub(crate) trait SmartModuleInputBatch {

fn base_offset(&self) -> Offset;

fn base_timestamp(&self) -> i64;

fn offset_delta(&self) -> i32;

fn get_compression(&self) -> Result<Compression, CompressionError>;
Expand Down Expand Up @@ -62,9 +64,11 @@ pub(crate) fn process_batch<R: SmartModuleInputBatch>(
);

let now = Instant::now();

let input = SmartModuleInput::new(input_batch.records().clone(), input_batch.base_offset());

let input = SmartModuleInput::new(
input_batch.records().clone(),
input_batch.base_offset(),
input_batch.base_timestamp(),
);
let output = sm_chain_instance.process(input, metric)?;

debug!(smartmodule_execution_time = %now.elapsed().as_millis());
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-spu/src/smartengine/file_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ impl SmartModuleInputBatch for FileBatch {
self.batch.base_offset
}

fn base_timestamp(&self) -> i64 {
self.batch.get_base_timestamp()
}

fn offset_delta(&self) -> i32 {
self.batch.header.last_offset_delta
}
Expand Down
7 changes: 6 additions & 1 deletion crates/fluvio-spu/src/smartengine/produce_batch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::io::{Error as IoError, ErrorKind};

use fluvio_protocol::record::{Batch, RawRecords, Offset};
use fluvio_compression::{Compression, CompressionError};

use super::batch::SmartModuleInputBatch;
use fluvio_compression::{Compression, CompressionError};

#[derive(Debug)]
pub struct ProduceBatch<'a> {
Expand All @@ -25,6 +26,10 @@ impl<'a> SmartModuleInputBatch for ProduceBatch<'a> {
self.batch.base_offset
}

fn base_timestamp(&self) -> i64 {
self.batch.get_base_timestamp()
}

fn offset_delta(&self) -> i32 {
self.batch.header.last_offset_delta
}
Expand Down
10 changes: 9 additions & 1 deletion crates/fluvio/src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,16 +440,24 @@ impl TopicProducer {
let mut entries = vec![record];

use std::convert::TryFrom;

use chrono::Utc;

use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;


let metrics = self.metrics.chain_metrics();

if let Some(
smart_chain_ref
) = &self.sm_chain {
let mut sm_chain = smart_chain_ref.write().await;
let mut sm_input = SmartModuleInput::try_from(entries)?;
let current_time = Utc::now().timestamp_millis();

sm_input.set_base_timestamp(current_time);

let output = sm_chain.process(SmartModuleInput::try_from(entries)?,metrics).map_err(|e| FluvioError::Other(format!("SmartEngine - {e:?}")))?;
let output = sm_chain.process(sm_input,metrics).map_err(|e| FluvioError::Other(format!("SmartEngine - {e:?}")))?;
entries = output.successes;
}
} else {
Expand Down

0 comments on commit 6834a09

Please sign in to comment.