Skip to content

Commit

Permalink
refactor: use common version for all SPU api (#3216)
Browse files Browse the repository at this point in the history
In the SPU Schema, the same version is used for all APIs similar to SC.   In the PR, the SPU schema version is bumped up to 19 so that deprecated fields can be in the next release.   Deprecated fields are hidden in the StreamFetchRequest and converted to the builder pattern.
  • Loading branch information
sehz committed May 5, 2023
1 parent 707d574 commit b155503
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 263 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ fluvio-hub-util = { path = "crates/fluvio-hub-util" }
fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false }
fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false }
fluvio-protocol = { version = "0.9.0", path = "crates/fluvio-protocol" }
fluvio-spu-schema = { version = "0.13.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-socket = { version = "0.14.2", path = "crates/fluvio-socket", default-features = false }
Expand Down
10 changes: 5 additions & 5 deletions crates/fluvio-spu-schema/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-spu-schema"
version = "0.13.2"
version = "0.14.0"
edition = "2021"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Fluvio API for SPU"
Expand All @@ -15,13 +15,13 @@ path = "src/lib.rs"
file = ["fluvio-future","fluvio-protocol/store"]

[dependencies]

tracing = { workspace = true }
bytes = { workspace = true }
serde = { workspace = true, features = ['derive'] }
static_assertions = { workspace = true }
derive_builder = { workspace = true }
educe = { version = "0.4.19", features = ["Debug"] }
flate2 = { workspace = true }
serde = { workspace = true, features = ['derive'] }
static_assertions = { workspace = true }
tracing = { workspace = true }

fluvio-types = { workspace = true }
fluvio-future = { workspace = true, optional = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/fluvio-spu-schema/src/client/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::record::ReplicaKey;
use fluvio_protocol::record::Offset;

use crate::COMMON_VERSION;

use super::SpuClientApiKey;

// -----------------------------------
Expand All @@ -29,7 +31,7 @@ pub struct ReplicaOffsetUpdate {

impl Request for ReplicaOffsetUpdateRequest {
const API_KEY: u16 = SpuClientApiKey::ReplicaOffsetUpdate as u16;
const DEFAULT_API_VERSION: i16 = 0;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
type Response = ReplicaOffsetUpdateResponse;
}

Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-spu-schema/src/fetch/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use fluvio_protocol::derive::FluvioDefault;
use fluvio_protocol::record::RecordSet;
use fluvio_types::PartitionId;

use crate::COMMON_VERSION;
use crate::isolation::Isolation;

use super::FetchResponse;
Expand Down Expand Up @@ -51,8 +52,7 @@ where
const API_KEY: u16 = 1;

const MIN_API_VERSION: i16 = 0;
const MAX_API_VERSION: i16 = 10;
const DEFAULT_API_VERSION: i16 = 10;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;

type Response = FetchResponse<R>;
}
Expand Down
3 changes: 3 additions & 0 deletions crates/fluvio-spu-schema/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ pub mod errors {

pub use fluvio_protocol::link::versions::{ApiVersions, ApiVersionsRequest, ApiVersionsResponse};
pub use isolation::*;

/// Default API version for all API
pub(crate) const COMMON_VERSION: i16 = 19;
1 change: 0 additions & 1 deletion crates/fluvio-spu-schema/src/produce/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ where
const API_KEY: u16 = 0;

const MIN_API_VERSION: i16 = 0;
const MAX_API_VERSION: i16 = PRODUCER_TRANSFORMATION_API_VERSION;
const DEFAULT_API_VERSION: i16 = PRODUCER_TRANSFORMATION_API_VERSION;

type Response = ProduceResponse;
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-spu-schema/src/server/fetch_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use fluvio_protocol::record::ReplicaKey;

use fluvio_types::PartitionId;

use crate::COMMON_VERSION;
use crate::errors::ErrorCode;
use super::SpuServerApiKey;

Expand All @@ -27,7 +28,7 @@ pub struct FetchOffsetsRequest {

impl Request for FetchOffsetsRequest {
const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;
const DEFAULT_API_VERSION: i16 = 0;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
type Response = FetchOffsetsResponse;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-spu-schema/src/server/smartmodule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
since = "0.10.0",
note = "will be removed in the next version. Use SmartModuleInvocation instead "
)]
pub struct LegacySmartModulePayload {
pub(crate) struct LegacySmartModulePayload {
pub wasm: SmartModuleWasmCompressed,
pub kind: SmartModuleKind,
pub params: SmartModuleExtraParams,
Expand Down
49 changes: 34 additions & 15 deletions crates/fluvio-spu-schema/src/server/stream_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use std::fmt::{Debug};
use std::marker::PhantomData;

use educe::Educe;
use derive_builder::Builder;

use fluvio_protocol::record::RawRecords;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::api::Request;

use fluvio_protocol::record::RecordSet;

use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
use fluvio_types::PartitionId;
use fluvio_types::{PartitionId, defaults::FLUVIO_CLIENT_MAX_FETCH_BYTES};

use crate::COMMON_VERSION;
use crate::fetch::FetchablePartitionResponse;
use crate::isolation::Isolation;

Expand Down Expand Up @@ -48,43 +49,61 @@ pub const CHAIN_SMARTMODULE_API: i16 = 18;
/// Fetch records continuously
/// Output will be send back as stream
#[allow(deprecated)]
#[derive(Decoder, Encoder, Default, Educe)]
#[derive(Decoder, Encoder, Builder, Default, Educe)]
#[builder(setter(into))]
#[educe(Debug)]
pub struct StreamFetchRequest<R> {
pub topic: String,
#[builder(default = "0")]
pub partition: PartitionId,
#[builder(default = "0")]
pub fetch_offset: i64,
#[builder(default = "FLUVIO_CLIENT_MAX_FETCH_BYTES")]
pub max_bytes: i32,
#[builder(default = "Isolation::ReadUncommitted")]
pub isolation: Isolation,
/// no longer used, but keep to avoid breaking compatibility, this will not be honored
// TODO: remove in 0.10
// these private fields will be removed
#[educe(Debug(ignore))]
#[fluvio(min_version = 11)]
pub wasm_module: Vec<u8>,
// TODO: remove in 0.10
#[builder(setter(skip))]
#[fluvio(min_version = 11, max_version = 18)]
wasm_module: Vec<u8>,
#[builder(setter(skip))]
#[fluvio(min_version = 12, max_version = 18)]
pub wasm_payload: Option<LegacySmartModulePayload>,
wasm_payload: Option<LegacySmartModulePayload>,
#[builder(setter(skip))]
#[fluvio(min_version = 16, max_version = 18)]
pub smartmodule: Option<SmartModuleInvocation>,
smartmodule: Option<SmartModuleInvocation>,
#[builder(setter(skip))]
#[fluvio(min_version = 16, max_version = 18)]
pub derivedstream: Option<DerivedStreamInvocation>,
derivedstream: Option<DerivedStreamInvocation>,
#[builder(default)]
#[fluvio(min_version = 18)]
pub smartmodules: Vec<SmartModuleInvocation>,
pub data: PhantomData<R>,
#[builder(setter(skip))]
data: PhantomData<R>,
}

impl<R> StreamFetchRequest<R>
where
R: Clone,
{
pub fn builder() -> StreamFetchRequestBuilder<R> {
StreamFetchRequestBuilder::default()
}
}

impl<R> Request for StreamFetchRequest<R>
where
R: Debug + Decoder + Encoder,
{
const API_KEY: u16 = SpuServerApiKey::StreamFetch as u16;
const DEFAULT_API_VERSION: i16 = CHAIN_SMARTMODULE_API;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
type Response = StreamFetchResponse<R>;
}

///
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct DerivedStreamInvocation {
pub(crate) struct DerivedStreamInvocation {
pub stream: String,
pub params: SmartModuleExtraParams,
}
Expand Down
Loading

0 comments on commit b155503

Please sign in to comment.