diff --git a/Cargo.lock b/Cargo.lock index 4fc55e8b5c..4e47668b61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2925,7 +2925,7 @@ dependencies = [ [[package]] name = "fluvio-sc-schema" -version = "0.19.1" +version = "0.19.2" dependencies = [ "anyhow", "fluvio-controlplane-metadata", diff --git a/crates/fluvio-cli/src/client/topic/describe.rs b/crates/fluvio-cli/src/client/topic/describe.rs index c0afc60251..432fdfb77d 100644 --- a/crates/fluvio-cli/src/client/topic/describe.rs +++ b/crates/fluvio-cli/src/client/topic/describe.rs @@ -48,6 +48,7 @@ mod display { use fluvio::metadata::topic::ReplicaSpec; use comfy_table::Row; + use humantime::format_duration; use serde::Serialize; use fluvio::metadata::objects::Metadata; @@ -145,6 +146,24 @@ mod display { } } + if let Some(dedup) = spec.get_deduplication() { + key_values.push(( + "Deduplication Filter".to_owned(), + Some(dedup.filter.transform.uses.clone()), + )); + key_values.push(( + "Deduplication Count Bound".to_owned(), + Some(dedup.bounds.count) + .filter(|c| *c != 0) + .as_ref() + .map(ToString::to_string), + )); + key_values.push(( + "Deduplication Age Bound".to_owned(), + dedup.bounds.age.map(|a| format_duration(a).to_string()), + )); + }; + key_values.push(( "Status".to_owned(), Some(status.resolution.resolution_label().to_string()), diff --git a/crates/fluvio-cli/src/client/topic/list.rs b/crates/fluvio-cli/src/client/topic/list.rs index 8ed127ef1f..0af1e6188b 100644 --- a/crates/fluvio-cli/src/client/topic/list.rs +++ b/crates/fluvio-cli/src/client/topic/list.rs @@ -87,6 +87,7 @@ mod display { "REPLICAS", "RETENTION TIME", "COMPRESSION", + "DEDUPLICATION", "STATUS", "REASON", ]) @@ -113,6 +114,12 @@ mod display { topic.retention_secs() as u64 ))), Cell::new(topic.get_compression_type()), + Cell::new( + topic + .get_deduplication() + .map(|d| d.filter.transform.uses.as_str()) + .unwrap_or("none"), + ), Cell::new(metadata.status.resolution.to_string()), Cell::new(metadata.status.reason.to_string()), ]) diff --git a/crates/fluvio-connector-package/src/config/mod.rs b/crates/fluvio-connector-package/src/config/mod.rs index 48b8a8598e..6b3a7d2961 100644 --- a/crates/fluvio-connector-package/src/config/mod.rs +++ b/crates/fluvio-connector-package/src/config/mod.rs @@ -478,7 +478,7 @@ mod tests { TopicConfigBuilder, MetaConfigBuilder, MetaConfig, PartitionConfig, RetentionConfig, CompressionConfig, }, - CompressionAlgorithm, + CompressionAlgorithm, Deduplication, Bounds, Filter, Transform, }; use fluvio_smartengine::transformation::{TransformationStep, Lookback}; use pretty_assertions::assert_eq; @@ -556,6 +556,19 @@ mod tests { compression: CompressionConfig { type_: CompressionAlgorithm::Lz4, }, + deduplication: Some(Deduplication { + bounds: Bounds { + count: 100, + age: Some(Duration::from_secs(60)), + }, + filter: Filter { + transform: Transform { + uses: "infinyon/fluvio-smartmodule-filter-lookback@0.1.0" + .to_string(), + with: Default::default(), + }, + }, + }), }, version: "0.1.0".to_string(), producer: Some(ProducerParameters { diff --git a/crates/fluvio-connector-package/test-data/connectors/full-config-v2.yaml b/crates/fluvio-connector-package/test-data/connectors/full-config-v2.yaml index 13a0701e29..a741862fcf 100644 --- a/crates/fluvio-connector-package/test-data/connectors/full-config-v2.yaml +++ b/crates/fluvio-connector-package/test-data/connectors/full-config-v2.yaml @@ -17,6 +17,13 @@ meta: segment-size: 2.0 KB compression: type: Lz4 + deduplication: + bounds: + count: 100 + age: 1m + filter: + transform: + uses: infinyon/fluvio-smartmodule-filter-lookback@0.1.0 producer: linger: 1ms batch-size: "44.0 MB" diff --git a/crates/fluvio-controlplane-metadata/src/partition/replica.rs b/crates/fluvio-controlplane-metadata/src/partition/replica.rs index bfed5dea5e..339f3a537f 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/replica.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/replica.rs @@ -6,10 +6,10 @@ use fluvio_protocol::{Encoder, Decoder}; use fluvio_types::SpuId; use crate::partition::ReplicaKey; -use crate::core::{MetadataItem}; +use crate::core::MetadataItem; use crate::store::MetadataStoreObject; use crate::partition::PartitionSpec; -use crate::topic::{CleanupPolicy, TopicStorageConfig, CompressionAlgorithm}; +use crate::topic::{CleanupPolicy, TopicStorageConfig, CompressionAlgorithm, Deduplication}; use super::store::*; /// Metadata about Replica send from SC @@ -22,6 +22,7 @@ pub struct Replica { pub cleanup_policy: Option, pub storage: Option, pub compression_type: CompressionAlgorithm, + pub deduplication: Option, } impl Replica { @@ -64,6 +65,7 @@ where cleanup_policy: spec.cleanup_policy, storage: spec.storage, compression_type: spec.compression_type, + deduplication: spec.deduplication, } } } diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 54b591edc6..317db7e8a4 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -7,7 +7,7 @@ use fluvio_types::SpuId; use fluvio_protocol::{Encoder, Decoder}; -use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm}; +use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm, Deduplication}; /// Spec for Partition /// Each partition has replicas spread among SPU @@ -28,6 +28,8 @@ pub struct PartitionSpec { #[cfg_attr(feature = "use_serde", serde(default))] #[fluvio(min_version = 6)] pub compression_type: CompressionAlgorithm, + #[fluvio(min_version = 12)] + pub deduplication: Option, } impl PartitionSpec { @@ -49,6 +51,7 @@ impl PartitionSpec { cleanup_policy: topic.get_clean_policy().cloned(), storage: topic.get_storage().cloned(), compression_type: topic.get_compression_type().clone(), + deduplication: topic.get_deduplication().cloned(), } } diff --git a/crates/fluvio-controlplane-metadata/src/topic/config.rs b/crates/fluvio-controlplane-metadata/src/topic/config.rs index 2af7bc0906..159e57fa10 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/config.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/config.rs @@ -7,7 +7,7 @@ use crate::topic::{ ReplicaSpec, TopicReplicaParam, SegmentBasedPolicy, CleanupPolicy, TopicStorageConfig, }; -use super::{TopicSpec, PartitionMap, CompressionAlgorithm}; +use super::{TopicSpec, PartitionMap, CompressionAlgorithm, deduplication::Deduplication}; const DEFAULT_PARTITION_COUNT: PartitionCount = 1; const DEFAULT_REPLICATION_FACTOR: ReplicationFactor = 1; @@ -38,6 +38,13 @@ pub struct TopicConfig { #[builder(default)] #[cfg_attr(feature = "use_serde", serde(default))] pub compression: CompressionConfig, + + #[builder(default)] + #[cfg_attr( + feature = "use_serde", + serde(default, skip_serializing_if = "Option::is_none") + )] + pub deduplication: Option, } #[derive(Debug, Default, Builder, Clone, PartialEq, Eq)] @@ -156,6 +163,7 @@ impl From for TopicSpec { }; topic_spec.set_compression_type(config.compression.type_); + topic_spec.set_deduplication(config.deduplication); if segment_size.is_some() || max_partition_size.is_some() { topic_spec.set_storage(TopicStorageConfig { @@ -174,6 +182,8 @@ fn default_version() -> String { #[cfg(test)] mod tests { + use crate::topic::deduplication::{Bounds, Filter, Transform}; + use super::*; #[cfg(feature = "use_serde")] @@ -198,6 +208,13 @@ retention: segment-size: 2.0 KB compression: type: Lz4 +deduplication: + bounds: + count: 100 + age: 1m + filter: + transform: + uses: infinyon/dedup-filter@0.1.0 "#; //when @@ -292,6 +309,7 @@ compression: segment_size: Some(2000), max_partition_size: Some(1000), }); + test_spec.set_deduplication(Some(test_deduplication())); assert_eq!(spec, test_spec); } @@ -319,6 +337,22 @@ compression: compression: CompressionConfig { type_: CompressionAlgorithm::Lz4, }, + deduplication: Some(test_deduplication()), + } + } + + fn test_deduplication() -> Deduplication { + Deduplication { + bounds: Bounds { + count: 100, + age: Some(Duration::from_secs(60)), + }, + filter: Filter { + transform: Transform { + uses: "infinyon/dedup-filter@0.1.0".to_string(), + with: Default::default(), + }, + }, } } } diff --git a/crates/fluvio-controlplane-metadata/src/topic/deduplication.rs b/crates/fluvio-controlplane-metadata/src/topic/deduplication.rs new file mode 100644 index 0000000000..852e648027 --- /dev/null +++ b/crates/fluvio-controlplane-metadata/src/topic/deduplication.rs @@ -0,0 +1,65 @@ +use std::{time::Duration, collections::BTreeMap}; + +use derive_builder::Builder; +use fluvio_protocol::{Encoder, Decoder}; + +#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "kebab-case") +)] +pub struct Deduplication { + pub bounds: Bounds, + pub filter: Filter, +} + +#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "kebab-case") +)] +pub struct Bounds { + #[cfg_attr(feature = "use_serde", serde(default, skip_serializing_if = "is_zero"))] + pub count: u64, + #[cfg_attr( + feature = "use_serde", + serde( + default, + skip_serializing_if = "Option::is_none", + with = "humantime_serde" + ) + )] + pub age: Option, +} + +#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "kebab-case") +)] +pub struct Filter { + pub transform: Transform, +} + +#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "kebab-case") +)] +pub struct Transform { + pub uses: String, + #[cfg_attr( + feature = "use_serde", + serde(default, skip_serializing_if = "BTreeMap::is_empty") + )] + pub with: BTreeMap, +} + +#[cfg(feature = "use_serde")] +fn is_zero(val: &u64) -> bool { + *val == 0 +} diff --git a/crates/fluvio-controlplane-metadata/src/topic/mod.rs b/crates/fluvio-controlplane-metadata/src/topic/mod.rs index 2ae2ccfc9b..f71f8e59e3 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/mod.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/mod.rs @@ -1,10 +1,12 @@ mod spec; mod status; +mod deduplication; pub mod store; pub mod config; pub use self::spec::*; pub use self::status::*; +pub use self::deduplication::*; pub const PENDING_REASON: &str = "waiting for live spus"; diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 795768b124..8904c88ecb 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -19,6 +19,8 @@ use fluvio_types::{ReplicaMap, SpuId}; use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment}; use fluvio_protocol::{Encoder, Decoder}; +use super::deduplication::Deduplication; + #[derive(Debug, Clone, PartialEq, Default, Encoder, Decoder)] #[cfg_attr( feature = "use_serde", @@ -34,6 +36,9 @@ pub struct TopicSpec { #[cfg_attr(feature = "use_serde", serde(default))] #[fluvio(min_version = 6)] compression_type: CompressionAlgorithm, + #[cfg_attr(feature = "use_serde", serde(default))] + #[fluvio(min_version = 12)] + deduplication: Option, } impl From for TopicSpec { @@ -108,6 +113,14 @@ impl TopicSpec { self.storage = Some(storage); } + pub fn get_deduplication(&self) -> Option<&Deduplication> { + self.deduplication.as_ref() + } + + pub fn set_deduplication(&mut self, deduplication: Option) { + self.deduplication = deduplication; + } + /// get retention secs that can be displayed pub fn retention_secs(&self) -> u32 { self.get_clean_policy() @@ -660,6 +673,8 @@ pub mod test { use std::io::Cursor; + use crate::topic::{Bounds, Filter, Transform}; + use super::*; #[test] @@ -889,6 +904,36 @@ pub mod test { } } + #[test] + fn test_topic_with_dedup_prev_version_compatibility() { + //given + let prev_version = 11; + let mut topic_spec: TopicSpec = ReplicaSpec::Computed((2, 3, true).into()).into(); + topic_spec.set_deduplication(Some(Deduplication { + bounds: Bounds { + count: 1, + age: None, + }, + filter: Filter { + transform: Transform { + uses: "filter".to_string(), + ..Default::default() + }, + }, + })); + + //when + let mut dest = vec![]; + topic_spec.encode(&mut dest, prev_version).expect("encoded"); + let mut topic_spec_decoded = TopicSpec::default(); + topic_spec_decoded + .decode(&mut Cursor::new(&dest), prev_version) + .expect("decoded"); + + //then + assert!(topic_spec_decoded.deduplication.is_none()); + } + #[test] fn test_partition_map_str() { // Test multiple diff --git a/crates/fluvio-protocol/src/link/error_code.rs b/crates/fluvio-protocol/src/link/error_code.rs index b1b1b60d8d..f5180d939d 100644 --- a/crates/fluvio-protocol/src/link/error_code.rs +++ b/crates/fluvio-protocol/src/link/error_code.rs @@ -182,6 +182,14 @@ pub enum ErrorCode { #[fluvio(tag = 9000)] #[error("a compression error occurred in the SPU")] CompressionError, + + // Deduplication + #[fluvio(tag = 10000)] + #[error("Deduplication SmartModule is not loaded into the cluster")] + DeduplicationSmartModuleNotLoaded, + #[fluvio(tag = 10001)] + #[error("Deduplication SmartModule name is invalid: {0}")] + DeduplicationSmartModuleNameInvalid(String), } impl ErrorCode { diff --git a/crates/fluvio-sc-schema/Cargo.toml b/crates/fluvio-sc-schema/Cargo.toml index 024a6da760..dd55d698c7 100644 --- a/crates/fluvio-sc-schema/Cargo.toml +++ b/crates/fluvio-sc-schema/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-sc-schema" -version = "0.19.1" +version = "0.19.2" edition = "2021" authors = ["Fluvio Contributors "] description = "Fluvio API for SC" diff --git a/crates/fluvio-sc-schema/src/objects/mod.rs b/crates/fluvio-sc-schema/src/objects/mod.rs index e961601525..91515d3341 100644 --- a/crates/fluvio-sc-schema/src/objects/mod.rs +++ b/crates/fluvio-sc-schema/src/objects/mod.rs @@ -13,7 +13,7 @@ pub use list::*; pub use watch::*; pub use metadata::*; -pub(crate) const COMMON_VERSION: i16 = 11; // from now, we use a single version for all objects +pub(crate) const COMMON_VERSION: i16 = 12; // from now, we use a single version for all objects pub(crate) const DYN_OBJ: i16 = 11; // version indicate dynamic object #[cfg(test)] diff --git a/crates/fluvio-sc-schema/src/objects/test.rs b/crates/fluvio-sc-schema/src/objects/test.rs index 2cc6590018..4ee7b5d722 100644 --- a/crates/fluvio-sc-schema/src/objects/test.rs +++ b/crates/fluvio-sc-schema/src/objects/test.rs @@ -9,7 +9,7 @@ use crate::TryEncodableFrom; use crate::objects::{ Metadata, MetadataUpdate, ListResponse, ObjectApiWatchRequest, ObjectApiListResponse, - ClassicObjectApiListRequest, + ClassicObjectApiListRequest, DYN_OBJ, }; use crate::topic::TopicSpec; @@ -22,17 +22,17 @@ fn test_encoding_compatibility() { let raw_req: ListRequest = ListRequest::new("test", false); // upcast let list_request = - ObjectApiListRequest::try_encode_from(raw_req, COMMON_VERSION - 1).expect("encoded"); + ObjectApiListRequest::try_encode_from(raw_req, DYN_OBJ - 1).expect("encoded"); let mut new_dest = vec![]; list_request - .encode(&mut new_dest, COMMON_VERSION - 1) + .encode(&mut new_dest, DYN_OBJ - 1) .expect("encoding"); let raw_req2: ListRequest = ListRequest::new("test", false); let old_topic_request = ClassicObjectApiListRequest::Topic(raw_req2); let mut old_dest: Vec = vec![]; old_topic_request - .encode(&mut old_dest, COMMON_VERSION - 1) + .encode(&mut old_dest, DYN_OBJ - 1) .expect("encoding"); // assert_eq!(new_dest.len(),20); @@ -66,12 +66,11 @@ fn test_req_old_to_new() { let old_topic_request = ClassicObjectApiListRequest::Topic(raw_req); let mut dest = vec![]; old_topic_request - .encode(&mut dest, COMMON_VERSION - 1) + .encode(&mut dest, DYN_OBJ - 1) .expect("encoding"); let new_topic_request = - ObjectApiListRequest::decode_from(&mut Cursor::new(dest), COMMON_VERSION - 1) - .expect("decode"); + ObjectApiListRequest::decode_from(&mut Cursor::new(dest), DYN_OBJ - 1).expect("decode"); let downcast = new_topic_request.downcast().expect("downcast") as Option>; diff --git a/crates/fluvio-sc/src/services/private_api/private_server.rs b/crates/fluvio-sc/src/services/private_api/private_server.rs index 083b26f9dc..543d2dbcd6 100644 --- a/crates/fluvio-sc/src/services/private_api/private_server.rs +++ b/crates/fluvio-sc/src/services/private_api/private_server.rs @@ -122,8 +122,8 @@ async fn dispatch_loop( use futures_util::stream::StreamExt; send_spu_spec_changes(&mut spu_spec_listener, &mut sink, spu_id).await?; - send_replica_spec_changes(&mut partition_spec_listener, &mut sink, spu_id).await?; send_smartmodule_changes(&mut sm_spec_listener, &mut sink, spu_id).await?; + send_replica_spec_changes(&mut partition_spec_listener, &mut sink, spu_id).await?; trace!(spu_id, "waiting for SPU channel"); diff --git a/crates/fluvio-sc/src/services/public_api/topic/create.rs b/crates/fluvio-sc/src/services/public_api/topic/create.rs index e355c3404a..7033e5ad3d 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/create.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/create.rs @@ -9,6 +9,7 @@ //! Assigned Topics allow the users to apply their custom-defined replica assignment. //! +use fluvio_controlplane_metadata::smartmodule::SmartModulePackageKey; use fluvio_sc_schema::objects::CreateRequest; use tracing::{info, debug, trace, instrument}; use anyhow::{anyhow, Result}; @@ -103,6 +104,31 @@ async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &C ); } + // check if deduplication filter is present + if let Some(deduplication) = topic_spec.get_deduplication() { + let sm_name = deduplication.filter.transform.uses.as_str(); + let sm_fqdn = match SmartModulePackageKey::from_qualified_name(sm_name) { + Ok(fqdn) => fqdn.store_id(), + Err(err) => { + return Status::new( + sm_name.to_string(), + ErrorCode::DeduplicationSmartModuleNameInvalid(err.to_string()), + Some(err.to_string()), + ) + } + }; + if !metadata.smartmodules().store().contains_key(&sm_fqdn).await { + return Status::new( + sm_name.to_string(), + ErrorCode::DeduplicationSmartModuleNotLoaded, + Some(format!( + "{}\nHint: try `fluvio hub download {sm_name}` and repeat this operation", + ErrorCode::DeduplicationSmartModuleNotLoaded + )), + ); + } + } + match topic_spec.replicas() { ReplicaSpec::Computed(param) => { let next_state = validate_computed_topic_parameters(param); diff --git a/crates/fluvio-spu-schema/src/lib.rs b/crates/fluvio-spu-schema/src/lib.rs index b5634a65c8..5c16ed9321 100644 --- a/crates/fluvio-spu-schema/src/lib.rs +++ b/crates/fluvio-spu-schema/src/lib.rs @@ -17,4 +17,4 @@ pub use fluvio_protocol::link::versions::{ApiVersions, ApiVersionsRequest, ApiVe pub use isolation::*; /// Default API version for all API -pub(crate) const COMMON_VERSION: i16 = 21; +pub const COMMON_VERSION: i16 = 21; diff --git a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml index 0bd3f6fd6e..b89de6ae1f 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml @@ -54,6 +54,33 @@ spec: - Snappy - Lz4 - Zstd + deduplication: + type: object + nullable: true + properties: + filter: + type: object + nullable: false + properties: + transform: + type: object + properties: + uses: + type: string + nullable: false + with: + type: object + x-kubernetes-preserve-unknown-fields: true + bounds: + type: object + nullable: false + properties: + count: + type: integer + minimum: 0 + age: + type: string + nullable: true status: type: object x-kubernetes-preserve-unknown-fields: true @@ -100,4 +127,7 @@ spec: type: string description: Size jsonPath: .status.size - + - name: Deduplication Filter + type: string + description: Deduplication + jsonPath: .spec.deduplication.filter.transform.uses diff --git a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml index 060310a0db..efc4985852 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml @@ -90,6 +90,33 @@ spec: maxPartitionSize: type: integer minimum: 2048 + deduplication: + type: object + nullable: true + properties: + filter: + type: object + nullable: false + properties: + transform: + type: object + properties: + uses: + type: string + nullable: false + with: + type: object + x-kubernetes-preserve-unknown-fields: true + bounds: + type: object + nullable: false + properties: + count: + type: integer + minimum: 0 + age: + type: string + nullable: true subresources: status: {} additionalPrinterColumns: @@ -117,6 +144,10 @@ spec: type: integer description: Max Partition Size jsonPath: .spec.storage.maxPartitionSize + - name: Deduplication Filter + type: string + description: Deduplication + jsonPath: .spec.deduplication.filter.transform.uses conversion: # None conversion assumes the same schema for all versions and only sets the apiVersion # field of custom resources to the proper value diff --git a/smartmodule/examples/Cargo.lock b/smartmodule/examples/Cargo.lock index 3885b56f19..ee830b3d53 100644 --- a/smartmodule/examples/Cargo.lock +++ b/smartmodule/examples/Cargo.lock @@ -176,7 +176,7 @@ dependencies = [ [[package]] name = "fluvio-protocol" -version = "0.10.2" +version = "0.10.4" dependencies = [ "bytes", "content_inspector", @@ -227,11 +227,11 @@ dependencies = [ [[package]] name = "fluvio-smartmodule" -version = "0.7.0" +version = "0.7.1" dependencies = [ "eyre", - "fluvio-protocol 0.10.2", - "fluvio-smartmodule-derive 0.5.1", + "fluvio-protocol 0.10.4", + "fluvio-smartmodule-derive 0.6.0", "thiserror", "tracing", ] @@ -240,14 +240,14 @@ dependencies = [ name = "fluvio-smartmodule-aggregate" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-smartmodule-array-map-array" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde_json", ] @@ -255,7 +255,7 @@ dependencies = [ name = "fluvio-smartmodule-array-map-object" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde_json", ] @@ -272,7 +272,7 @@ dependencies = [ [[package]] name = "fluvio-smartmodule-derive" -version = "0.5.1" +version = "0.6.0" dependencies = [ "proc-macro2", "quote", @@ -283,44 +283,42 @@ dependencies = [ name = "fluvio-smartmodule-filter" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-smartmodule-filter-init" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", - "once_cell", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-smartmodule-filter-lookback" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-smartmodule-filter-map" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-smartmodule-filter-param" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", - "once_cell", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-smartmodule-map" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] @@ -345,7 +343,7 @@ dependencies = [ name = "fluvio-wasm-aggregate-average" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde", "serde_json", ] @@ -354,7 +352,7 @@ dependencies = [ name = "fluvio-wasm-aggregate-json" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde", "serde_json", ] @@ -363,14 +361,14 @@ dependencies = [ name = "fluvio-wasm-aggregate-sum" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-wasm-array-map-reddit" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde", "serde_json", ] @@ -379,7 +377,7 @@ dependencies = [ name = "fluvio-wasm-filter-json" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde", "serde_json", ] @@ -388,7 +386,7 @@ dependencies = [ name = "fluvio-wasm-filter-odd" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "thiserror", ] @@ -396,7 +394,7 @@ dependencies = [ name = "fluvio-wasm-filter-regex" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "regex", ] @@ -411,14 +409,14 @@ dependencies = [ name = "fluvio-wasm-map-double" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", ] [[package]] name = "fluvio-wasm-map-json" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "serde", "serde_json", "serde_yaml", @@ -428,7 +426,7 @@ dependencies = [ name = "fluvio-wasm-map-regex" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", + "fluvio-smartmodule 0.7.1", "once_cell", "regex", ] @@ -832,8 +830,7 @@ dependencies = [ name = "sm-integer-sum-aggegrate" version = "0.0.0" dependencies = [ - "fluvio-smartmodule 0.7.0", - "once_cell", + "fluvio-smartmodule 0.7.1", ] [[package]] diff --git a/tests/cli/fluvio_smoke_tests/topic-basic.bats b/tests/cli/fluvio_smoke_tests/topic-basic.bats index d1cd65e85d..a6bef28d03 100644 --- a/tests/cli/fluvio_smoke_tests/topic-basic.bats +++ b/tests/cli/fluvio_smoke_tests/topic-basic.bats @@ -17,6 +17,9 @@ setup_file() { TOPIC_CONFIG_PATH="$TEST_DIR/$TOPIC_NAME.yaml" export TOPIC_CONFIG_PATH + DEDUP_FILTER_NAME="dedup-filter" + export DEDUP_FILTER_NAME + cat <$TOPIC_CONFIG_PATH version: 0.1.0 meta: @@ -31,8 +34,17 @@ retention: segment_size: 2.0 KB compression: type: Lz4 +deduplication: + bounds: + count: 100 + age: 1m + filter: + transform: + uses: $DEDUP_FILTER_NAME EOF + run timeout 15s "$FLUVIO_BIN" sm create --wasm-file smartmodule/examples/target/wasm32-unknown-unknown/release/fluvio_smartmodule_filter.wasm "$DEDUP_FILTER_NAME" + assert_success } # Create topic @@ -154,3 +166,20 @@ EOF assert_success } +# Describe topic contains Deduplication info +@test "Describe a topic with deduplication info" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + debug_msg "Topic name: $TOPIC_NAME" + run timeout 15s "$FLUVIO_BIN" topic describe "$TOPIC_NAME" + debug_msg "status: $status" + debug_msg "output: ${lines[0]}" + assert_success + assert_output --partial "Deduplication Filter:$DEDUP_FILTER_NAME" + assert_output --partial "Deduplication Count Bound:10" + assert_output --partial "Deduplication Age Bound:1" +}