Skip to content

Commit

Permalink
feat: use version from SmartModuleInstance
Browse files Browse the repository at this point in the history
  • Loading branch information
EstebanBorai committed Jul 25, 2023
1 parent ae33a22 commit e0041f3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
10 changes: 4 additions & 6 deletions crates/fluvio-smartengine/src/engine/wasmtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl SmartModuleChainBuilder {
let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
let look_back = SmartModuleLookBack::try_instantiate(&ctx, &mut state)?;
let transform = create_transform(&ctx, config.initial_data, &mut state)?;
let mut instance = SmartModuleInstance::new(ctx, init, look_back, transform);
let mut instance = SmartModuleInstance::new(ctx, init, look_back, transform, version);

instance.call_init(&mut state)?;
instances.push(instance);
Expand Down Expand Up @@ -160,10 +160,8 @@ impl SmartModuleChainInstance {
// encountered error, we stop processing and return partial output
return Ok(output);
} else {
next_input = SmartModuleInput::try_from_records(
output.successes,
DEFAULT_SMARTENGINE_VERSION,
)?;
next_input =
SmartModuleInput::try_from_records(output.successes, instance.version())?;
next_input.set_base_offset(base_offset);
next_input.set_base_timestamp(base_timestamp);
}
Expand Down Expand Up @@ -202,7 +200,7 @@ impl SmartModuleChainInstance {
debug!("look_back on instance");
let records: Vec<Record> = read_fn(lookback).await?;
let input: SmartModuleInput =
SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION)?;
SmartModuleInput::try_from_records(records, instance.version())?;

metrics.add_bytes_in(input.raw_bytes().len() as u64);
self.store.top_up_fuel();
Expand Down
14 changes: 11 additions & 3 deletions crates/fluvio-smartengine/src/engine/wasmtime/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::debug;
use anyhow::{Error, Result};
use wasmtime::{Memory, Module, Caller, Extern, Instance, Func, AsContextMut, AsContext};

use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::{Encoder, Decoder, Version};

use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams, SmartModuleInput, SmartModuleOutput, SmartModuleInitInput,
Expand All @@ -25,6 +25,7 @@ pub(crate) struct SmartModuleInstance {
init: Option<SmartModuleInit>,
look_back: Option<SmartModuleLookBack>,
transform: Box<dyn DowncastableTransform>,
version: Version,
}

impl SmartModuleInstance {
Expand All @@ -44,12 +45,14 @@ impl SmartModuleInstance {
init: Option<SmartModuleInit>,
look_back: Option<SmartModuleLookBack>,
transform: Box<dyn DowncastableTransform>,
version: Version,
) -> Self {
Self {
ctx,
init,
look_back,
transform,
version,
}
}

Expand Down Expand Up @@ -90,13 +93,18 @@ impl SmartModuleInstance {
self.look_back.as_ref()?; // return None if there is no function
self.ctx.lookback
}

/// Retrieves SmartModule Version
pub fn version(&self) -> Version {
self.version
}
}

pub(crate) struct SmartModuleInstanceContext {
instance: Instance,
records_cb: Arc<RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
version: Version,
lookback: Option<Lookback>,
}

Expand All @@ -113,7 +121,7 @@ impl SmartModuleInstanceContext {
state: &mut WasmState,
module: Module,
params: SmartModuleExtraParams,
version: i16,
version: Version,
lookback: Option<Lookback>,
) -> Result<Self, EngineError> {
debug!("creating WasmModuleInstance");
Expand Down

0 comments on commit e0041f3

Please sign in to comment.