Skip to content

Commit

Permalink
feat: reusable generator for sm input
Browse files Browse the repository at this point in the history
  • Loading branch information
EstebanBorai committed Jul 25, 2023
1 parent 8c0472f commit a2140e9
Show file tree
Hide file tree
Showing 18 changed files with 133 additions and 209 deletions.
16 changes: 16 additions & 0 deletions crates/fluvio-smartmodule-derive/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ pub enum SmartModuleKind {
FilterMap,
}

impl ToString for SmartModuleKind {
fn to_string(&self) -> String {
let string = match self {
SmartModuleKind::Init => "init",
SmartModuleKind::LookBack => "look_back",
SmartModuleKind::Aggregate => "aggregate",
SmartModuleKind::Filter => "filter",
SmartModuleKind::Map => "map",
SmartModuleKind::ArrayMap => "array_map",
SmartModuleKind::FilterMap => "filter_map",
};

string.to_string()
}
}

impl SmartModuleKind {
fn parse(meta: ParseNestedMeta) -> SynResult<Option<Self>> {
let maybee_ss_type = match &*meta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub fn generate_aggregate_smartmodule(sm_func: &SmartModuleFn) -> TokenStream {

let mut accumulator = smartmodule_input.accumulator;
let base_offset = smartmodule_input.base.base_offset();
let base_timestamp = smartmodule_input.base.base_timestamp();

#records_code

Expand Down
8 changes: 2 additions & 6 deletions crates/fluvio-smartmodule-derive/src/generator/array_map.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
use quote::quote;
use proc_macro2::TokenStream;

use crate::SmartModuleFn;
use crate::util::ident;
use crate::{SmartModuleFn, SmartModuleKind};

use super::transform::generate_transform;

pub fn generate_array_map_smartmodule(func: &SmartModuleFn) -> TokenStream {
let user_fn = &func.name;

let function_call = quote!(
super:: #user_fn(&record)
);

generate_transform(
ident("array_map"),
SmartModuleKind::ArrayMap,
func,
quote! {
for mut record in records.into_iter() {
use fluvio_smartmodule::SmartModuleRecord;

let result = #function_call;

match result {
Expand Down
8 changes: 3 additions & 5 deletions crates/fluvio-smartmodule-derive/src/generator/filter.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
use quote::quote;
use proc_macro2::TokenStream;

use crate::SmartModuleFn;
use crate::util::ident;
use crate::{SmartModuleFn, SmartModuleKind};

use super::transform::generate_transform;

pub fn generate_filter_smartmodule(func: &SmartModuleFn) -> TokenStream {
let user_fn = &func.name;

let function_call = quote!(
super:: #user_fn(&record)
);

generate_transform(
ident("filter"),
SmartModuleKind::Filter,
func,
quote! {
for mut record in records.into_iter() {
use fluvio_smartmodule::SmartModuleRecord;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleTransformRuntimeError;

let result = #function_call;

Expand Down
7 changes: 2 additions & 5 deletions crates/fluvio-smartmodule-derive/src/generator/filter_map.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use quote::quote;
use proc_macro2::TokenStream;

use crate::SmartModuleFn;
use crate::util::ident;
use crate::{SmartModuleFn, SmartModuleKind};

use super::transform::generate_transform;

Expand All @@ -14,12 +13,10 @@ pub fn generate_filter_map_smartmodule(func: &SmartModuleFn) -> TokenStream {
);

generate_transform(
ident("filter_map"),
SmartModuleKind::FilterMap,
func,
quote! {
for mut record in records.into_iter() {
use fluvio_smartmodule::SmartModuleRecord;

let result = #function_call;

match result {
Expand Down
3 changes: 0 additions & 3 deletions crates/fluvio-smartmodule-derive/src/generator/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ pub fn generate_init_smartmodule(func: &SmartModuleFn) -> TokenStream {
};
use fluvio_smartmodule::dataplane::core::{Decoder,Encoder};




let input_data = Vec::from_raw_parts(ptr, len, len);
let mut input = SmartModuleInitInput::default();
if let Err(_err) =
Expand Down
33 changes: 11 additions & 22 deletions crates/fluvio-smartmodule-derive/src/generator/look_back.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use quote::quote;
use proc_macro2::TokenStream;
use crate::SmartModuleFn;
use quote::quote;

use crate::generator::{SmartModuleFn, SmartModuleKind, generate_records_code};

// generate look_back
pub fn generate_look_back_smartmodule(func: &SmartModuleFn) -> TokenStream {
let user_fn = func.name;
let user_code = func.func;
pub fn generate_look_back_smartmodule(sm_func: &SmartModuleFn) -> TokenStream {
let user_fn = sm_func.name;
let user_code = sm_func.func;
let records_code = generate_records_code(sm_func, &SmartModuleKind::LookBack);

quote! {
#[allow(dead_code)]
Expand All @@ -18,36 +20,23 @@ pub fn generate_look_back_smartmodule(func: &SmartModuleFn) -> TokenStream {
#[allow(clippy::missing_safety_doc)]
pub unsafe fn look_back(ptr: *mut u8, len: usize, version: i16) -> i32 {
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleLookbackErrorStatus,
SmartModuleLookbackRuntimeError, SmartModuleKind, SmartModuleLookbackOutput,
SmartModuleLookbackErrorStatus,
SmartModuleLookbackRuntimeError, SmartModuleKind, SmartModuleLookbackOutput
};
use fluvio_smartmodule::dataplane::core::{Encoder, Decoder};
use fluvio_smartmodule::dataplane::record::{Record, RecordData};

// DECODING
extern "C" {
fn copy_records(putr: i32, len: i32);
}

let input_data = Vec::from_raw_parts(ptr, len, len);
let mut smartmodule_input = SmartModuleInput::default();
if let Err(_err) = Decoder::decode(&mut smartmodule_input, &mut std::io::Cursor::new(input_data), version) {
return SmartModuleLookbackErrorStatus::DecodingBaseInput as i32;
}

let base_offset = smartmodule_input.base_offset();
let base_timestamp = smartmodule_input.base_timestamp();
let records_input = smartmodule_input.into_raw_bytes();
let mut records: Vec<Record> = vec![];
#records_code

if let Err(_err) = Decoder::decode(&mut records, &mut std::io::Cursor::new(records_input), version) {
return SmartModuleLookbackErrorStatus::DecodingRecords as i32;
};
let base_offset = smartmodule_input.base_offset();

// PROCESSING
for record in records.into_iter() {
use fluvio_smartmodule::SmartModuleRecord;

let result = super:: #user_fn(&record);

if let Err(err) = result {
Expand Down
5 changes: 2 additions & 3 deletions crates/fluvio-smartmodule-derive/src/generator/map.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use quote::quote;
use proc_macro2::TokenStream;

use crate::SmartModuleFn;
use crate::util::ident;
use crate::{SmartModuleFn, SmartModuleKind};

use super::transform::generate_transform;

Expand All @@ -13,7 +12,7 @@ pub fn generate_map_smartmodule(func: &SmartModuleFn) -> TokenStream {
);

generate_transform(
ident("map"),
SmartModuleKind::Map,
func,
quote! {
for mut record in records.into_iter() {
Expand Down
75 changes: 72 additions & 3 deletions crates/fluvio-smartmodule-derive/src/generator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use proc_macro2::TokenStream;
use crate::{SmartModuleConfig, SmartModuleFn, SmartModuleKind};

mod filter;
mod map;
mod array_map;
Expand All @@ -9,8 +6,15 @@ mod aggregate;
mod init;
mod transform;
mod look_back;

pub mod opt;

use proc_macro2::TokenStream;
use quote::quote;

use crate::ast::RecordKind;
use crate::{SmartModuleConfig, SmartModuleFn, SmartModuleKind};

pub fn generate_smartmodule(config: &SmartModuleConfig, func: &SmartModuleFn) -> TokenStream {
match config.kind.as_ref().expect("Smartmodule type not set") {
SmartModuleKind::Filter => self::filter::generate_filter_smartmodule(func),
Expand All @@ -22,3 +26,68 @@ pub fn generate_smartmodule(config: &SmartModuleConfig, func: &SmartModuleFn) ->
SmartModuleKind::LookBack => self::look_back::generate_look_back_smartmodule(func),
}
}

/// Generates the `SmartModuleFn` records decoding code based on the `RecordKind`
/// provided by the `SmartModuleFn`. This generator needs at local `input_data`
/// variable to be in scope, which is the raw bytes of the `SmartModuleInput`.
///
/// This `TokenStream` expands to two variables:
///
/// - `smartmodule_input`: The decoded `SmartModuleInput` struct from `input_data`
/// - `records`: The decoded `Vec<Record>` or `Vec<SmartModuleRecord>` from `smartmodule_input`
///
pub fn generate_records_code(sm_func: &SmartModuleFn, kind: &SmartModuleKind) -> TokenStream {
// Provides the corresponding error enum for the SmartModuleKind
let sm_input_decode_error_code = match kind {
SmartModuleKind::Init
| SmartModuleKind::ArrayMap
| SmartModuleKind::FilterMap
| SmartModuleKind::Map
| SmartModuleKind::Filter
| SmartModuleKind::Aggregate => quote! {
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleTransformErrorStatus;

return SmartModuleTransformErrorStatus::DecodingBaseInput as i32;
},
SmartModuleKind::LookBack => quote! {
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleLookbackErrorStatus;

return SmartModuleLookbackErrorStatus::DecodingBaseInput as i32;
},
};

let sm_input_code = quote! {
use fluvio_smartmodule::dataplane::record::{Record, RecordData};
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleRecord};
use fluvio_smartmodule::dataplane::core::{Encoder, Decoder};

let mut smartmodule_input = SmartModuleInput::default();

if let Err(_err) = Decoder::decode(&mut smartmodule_input, &mut std::io::Cursor::new(input_data), version) {
#sm_input_decode_error_code
}
};

match sm_func.record_kind {
RecordKind::LegacyRecord => quote! {
#sm_input_code

let mut records: Vec<Record> = match smartmodule_input.clone().try_into_records(version) {
Ok(records) => records,
Err(_) => {
#sm_input_decode_error_code
}
};
},
RecordKind::SmartModuleRecord => quote! {
#sm_input_code

let mut records: Vec<SmartModuleRecord> = match smartmodule_input.clone().try_into_smartmodule_records(version) {
Ok(records) => records,
Err(_) => {
#sm_input_decode_error_code
}
};
},
}
}
44 changes: 11 additions & 33 deletions crates/fluvio-smartmodule-derive/src/generator/transform.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,21 @@
use quote::quote;
use proc_macro2::TokenStream;
use syn::Ident;

use crate::ast::{SmartModuleFn, RecordKind};
use crate::SmartModuleKind;
use crate::ast::SmartModuleFn;
use crate::generator::generate_records_code;
use crate::util::generate_ident;

pub(crate) fn generate_transform(
name: Ident,
sm_kind: SmartModuleKind,
sm_func: &SmartModuleFn,
transform: TokenStream,
) -> TokenStream {
let user_code = &sm_func.func;
let records_code = match sm_func.record_kind {
RecordKind::LegacyRecord => quote! {
let records: Vec<Record> = match smartmodule_input.try_into_records(version) {
Ok(records) => records,
Err(_) => {
return SmartModuleTransformErrorStatus::DecodingRecords as i32;
}
};
},
RecordKind::SmartModuleRecord => quote! {
let records: Vec<SmartModuleRecord> = match smartmodule_input.try_into_smartmodule_records(version) {
Ok(records) => records,
Err(_) => {
return SmartModuleTransformErrorStatus::DecodingRecords as i32;
}
};
},
};
let name = generate_ident(&sm_kind);
let records_code = generate_records_code(sm_func, &sm_kind);

quote! {

#[allow(dead_code)]
#user_code

Expand All @@ -39,29 +24,22 @@ pub(crate) fn generate_transform(
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe fn #name(ptr: *mut u8, len: usize, version: i16) -> i32 {
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleTransformErrorStatus,
SmartModuleTransformRuntimeError, SmartModuleKind, SmartModuleOutput, SmartModuleRecord
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleTransformErrorStatus,
SmartModuleTransformRuntimeError, SmartModuleKind, SmartModuleOutput
};
use fluvio_smartmodule::dataplane::core::{Encoder, Decoder};
use fluvio_smartmodule::dataplane::record::{Record, RecordData};

// DECODING
extern "C" {
fn copy_records(putr: i32, len: i32);
}

let input_data = Vec::from_raw_parts(ptr, len, len);
let mut smartmodule_input = SmartModuleInput::default();
if let Err(_err) = Decoder::decode(&mut smartmodule_input, &mut std::io::Cursor::new(input_data), version) {
return SmartModuleTransformErrorStatus::DecodingBaseInput as i32;
}

#records_code

let base_offset = smartmodule_input.base_offset();
let base_timestamp = smartmodule_input.base_timestamp();

#records_code

// PROCESSING
let mut output = SmartModuleOutput {
successes: Vec::with_capacity(records.len()),
Expand Down
8 changes: 5 additions & 3 deletions crates/fluvio-smartmodule-derive/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use proc_macro::TokenStream;
use syn::{DeriveInput, ItemFn, parse_macro_input};
use crate::ast::{SmartModuleConfig, SmartModuleFn, SmartModuleKind};
mod ast;
mod util;
mod generator;

use proc_macro::TokenStream;
use syn::{DeriveInput, ItemFn, parse_macro_input};

use crate::ast::{SmartModuleConfig, SmartModuleFn, SmartModuleKind};

#[proc_macro_attribute]
pub fn smartmodule(args: TokenStream, input: TokenStream) -> TokenStream {
use crate::generator::generate_smartmodule;
Expand Down
6 changes: 4 additions & 2 deletions crates/fluvio-smartmodule-derive/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use proc_macro2::Span;

pub(crate) fn ident(ident: &str) -> syn::Ident {
syn::Ident::new(ident, Span::call_site())
use crate::SmartModuleKind;

pub(crate) fn generate_ident(kind: &SmartModuleKind) -> syn::Ident {
syn::Ident::new(&kind.to_string(), Span::call_site())
}
Loading

0 comments on commit a2140e9

Please sign in to comment.