Skip to content

Commit

Permalink
fix: provide base timestamp getters
Browse files Browse the repository at this point in the history
  • Loading branch information
EstebanBorai committed Jul 13, 2023
1 parent e917651 commit 61d854f
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 18 deletions.
6 changes: 4 additions & 2 deletions crates/fluvio-smartengine/src/engine/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::time::Duration;

use derive_builder::Builder;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams, SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP,
};

const DEFAULT_SMARTENGINE_VERSION: i16 = 17;
pub const DEFAULT_SMARTENGINE_VERSION: i16 = SMARTMODULE_LOOKBACK_WITH_AGE_AND_BASE_TIMESTAMP;

/// Initial seed data to passed, this will be send back as part of the output
#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-smartengine/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod metrics;
mod config;
pub use config::{
SmartModuleConfig, SmartModuleConfigBuilder, SmartModuleConfigBuilderError,
SmartModuleInitialData, Lookback,
SmartModuleInitialData, Lookback, DEFAULT_SMARTENGINE_VERSION,
};
mod error;

Expand Down
18 changes: 14 additions & 4 deletions crates/fluvio-smartengine/src/engine/wasmtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use wasmtime::{Engine, Module};
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};

use crate::SmartModuleConfig;
use crate::engine::config::Lookback;
use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};

use super::init::SmartModuleInit;
use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};
Expand Down Expand Up @@ -138,7 +138,10 @@ impl SmartModuleChainInstance {
// encountered error, we stop processing and return partial output
return Ok(output);
} else {
next_input = output.successes.try_into()?;
next_input = SmartModuleInput::try_from_records(
output.successes,
DEFAULT_SMARTENGINE_VERSION,
)?;
next_input.set_base_offset(base_offset);
}
}
Expand All @@ -153,7 +156,9 @@ impl SmartModuleChainInstance {
debug!(records_out, "sm records out");
Ok(output)
} else {
Ok(SmartModuleOutput::new(input.try_into()?))
Ok(SmartModuleOutput::new(
input.try_into_records(DEFAULT_SMARTENGINE_VERSION)?,
))
}
}

Expand All @@ -170,12 +175,17 @@ impl SmartModuleChainInstance {
for instance in self.instances.iter_mut() {
if let Some(lookback) = instance.lookback() {
debug!("look_back on instance");

let records: Vec<Record> = read_fn(lookback).await?;
let input: SmartModuleInput = SmartModuleInput::try_from(records)?;
let input: SmartModuleInput =
SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION)?;

metrics.add_bytes_in(input.raw_bytes().len() as u64);
self.store.top_up_fuel();

let result = instance.call_look_back(input, &mut self.store);
let fuel_used = self.store.get_used_fuel();

debug!(fuel_used, "fuel used");
metrics.add_fuel_used(fuel_used);
result?;
Expand Down
8 changes: 7 additions & 1 deletion crates/fluvio-spu/src/smartengine/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub(crate) trait SmartModuleInputBatch {
fn offset_delta(&self) -> i32;

fn get_compression(&self) -> Result<Compression, CompressionError>;

fn base_timestamp(&self) -> i64;
}

#[instrument(skip(sm_chain_instance, input_batches, max_bytes, metric))]
Expand Down Expand Up @@ -51,7 +53,11 @@ pub(crate) fn process_batch<R: SmartModuleInputBatch>(

let now = Instant::now();

let input = SmartModuleInput::new(input_batch.records().clone(), input_batch.base_offset());
let input = SmartModuleInput::new(
input_batch.records().clone(),
input_batch.base_offset(),
input_batch.base_timestamp(),
);

let output = sm_chain_instance.process(input, metric)?;

Expand Down
16 changes: 10 additions & 6 deletions crates/fluvio-spu/src/smartengine/file_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::collections::VecDeque;
use std::os::unix::io::RawFd;
use std::io::{Error as IoError, ErrorKind, Cursor};

use fluvio_protocol::{Decoder, Version};
use fluvio_smartmodule::Record;
use fluvio_types::Timestamp;
use tracing::{warn, debug};
use nix::sys::uio::pread;
use tracing::{warn, debug};

use fluvio_protocol::record::{Batch, Offset, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE};
use fluvio_future::file_slice::AsyncFileSlice;
use fluvio_compression::{Compression, CompressionError};
use fluvio_future::file_slice::AsyncFileSlice;
use fluvio_protocol::{Decoder, Version};
use fluvio_protocol::record::{Batch, Offset, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE};
use fluvio_smartmodule::Record;
use fluvio_types::Timestamp;

use super::batch::SmartModuleInputBatch;

Expand All @@ -36,6 +36,10 @@ impl SmartModuleInputBatch for FileBatch {
fn get_compression(&self) -> Result<Compression, CompressionError> {
self.batch.get_compression()
}

fn base_timestamp(&self) -> i64 {
self.batch.get_base_timestamp()
}
}

/// Iterator that returns batch from file
Expand Down
7 changes: 6 additions & 1 deletion crates/fluvio-spu/src/smartengine/produce_batch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::io::{Error as IoError, ErrorKind};

use fluvio_compression::{Compression, CompressionError};
use fluvio_protocol::record::{Batch, RawRecords, Offset};

use super::batch::SmartModuleInputBatch;
use fluvio_compression::{Compression, CompressionError};

#[derive(Debug)]
pub struct ProduceBatch<'a> {
Expand Down Expand Up @@ -32,6 +33,10 @@ impl<'a> SmartModuleInputBatch for ProduceBatch<'a> {
fn get_compression(&self) -> Result<Compression, CompressionError> {
self.batch.get_compression()
}

fn base_timestamp(&self) -> i64 {
self.batch.get_base_timestamp()
}
}

impl<'a> ProduceBatchIterator<'a> {
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio/src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ impl TopicProducer {
if #[cfg(feature = "smartengine")] {
let mut entries = vec![record];

use std::convert::TryFrom;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;
use fluvio_smartengine::DEFAULT_SMARTENGINE_VERSION;

let metrics = self.metrics.chain_metrics();

Expand All @@ -449,7 +449,7 @@ impl TopicProducer {
) = &self.sm_chain {
let mut sm_chain = smart_chain_ref.write().await;

let output = sm_chain.process(SmartModuleInput::try_from(entries)?,metrics).map_err(|e| FluvioError::Other(format!("SmartEngine - {e:?}")))?;
let output = sm_chain.process(SmartModuleInput::try_from_records(entries, DEFAULT_SMARTENGINE_VERSION)?,metrics).map_err(|e| FluvioError::Other(format!("SmartEngine - {e:?}")))?;
entries = output.successes;
}
} else {
Expand Down
6 changes: 5 additions & 1 deletion crates/smartmodule-development-kit/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use fluvio_smartengine::transformation::TransformationConfig;
use fluvio_smartengine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleChainInstance, Lookback,
DEFAULT_SMARTENGINE_VERSION,
};
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;
use fluvio_protocol::record::Record;
Expand Down Expand Up @@ -157,7 +158,10 @@ impl TestCmd {

let test_records: Vec<Record> = test_data.into();

let output = chain.process(SmartModuleInput::try_from(test_records)?, &metrics)?;
let output = chain.process(
SmartModuleInput::try_from_records(test_records, DEFAULT_SMARTENGINE_VERSION)?,
&metrics,
)?;

if self.verbose {
println!("{:?} records outputed", output.successes.len());
Expand Down

0 comments on commit 61d854f

Please sign in to comment.