Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: added topic deduplication mechanism 1/2 #3392

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions crates/fluvio-cli/src/client/topic/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mod display {

use fluvio::metadata::topic::ReplicaSpec;
use comfy_table::Row;
use humantime::format_duration;
use serde::Serialize;

use fluvio::metadata::objects::Metadata;
Expand Down Expand Up @@ -145,6 +146,24 @@ mod display {
}
}

if let Some(dedup) = spec.get_deduplication() {
key_values.push((
"Deduplication Filter".to_owned(),
Some(dedup.filter.transform.uses.clone()),
));
key_values.push((
"Deduplication Count Bound".to_owned(),
Some(dedup.bounds.count)
.filter(|c| *c != 0)
.as_ref()
.map(ToString::to_string),
));
key_values.push((
"Deduplication Age Bound".to_owned(),
dedup.bounds.age.map(|a| format_duration(a).to_string()),
));
};

key_values.push((
"Status".to_owned(),
Some(status.resolution.resolution_label().to_string()),
Expand Down
7 changes: 7 additions & 0 deletions crates/fluvio-cli/src/client/topic/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod display {
"REPLICAS",
"RETENTION TIME",
"COMPRESSION",
"DEDUPLICATION",
"STATUS",
"REASON",
])
Expand All @@ -113,6 +114,12 @@ mod display {
topic.retention_secs() as u64
))),
Cell::new(topic.get_compression_type()),
Cell::new(
topic
.get_deduplication()
.map(|d| d.filter.transform.uses.as_str())
.unwrap_or("none"),
),
Cell::new(metadata.status.resolution.to_string()),
Cell::new(metadata.status.reason.to_string()),
])
Expand Down
15 changes: 14 additions & 1 deletion crates/fluvio-connector-package/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ mod tests {
TopicConfigBuilder, MetaConfigBuilder, MetaConfig, PartitionConfig, RetentionConfig,
CompressionConfig,
},
CompressionAlgorithm,
CompressionAlgorithm, Deduplication, Bounds, Filter, Transform,
};
use fluvio_smartengine::transformation::{TransformationStep, Lookback};
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -556,6 +556,19 @@ mod tests {
compression: CompressionConfig {
type_: CompressionAlgorithm::Lz4,
},
deduplication: Some(Deduplication {
bounds: Bounds {
count: 100,
age: Some(Duration::from_secs(60)),
},
filter: Filter {
transform: Transform {
uses: "infinyon/fluvio-smartmodule-filter-lookback@0.1.0"
.to_string(),
with: Default::default(),
},
},
}),
},
version: "0.1.0".to_string(),
producer: Some(ProducerParameters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ meta:
segment-size: 2.0 KB
compression:
type: Lz4
deduplication:
bounds:
count: 100
age: 1m
filter:
transform:
uses: infinyon/fluvio-smartmodule-filter-lookback@0.1.0
producer:
linger: 1ms
batch-size: "44.0 MB"
Expand Down
6 changes: 4 additions & 2 deletions crates/fluvio-controlplane-metadata/src/partition/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use fluvio_protocol::{Encoder, Decoder};
use fluvio_types::SpuId;

use crate::partition::ReplicaKey;
use crate::core::{MetadataItem};
use crate::core::MetadataItem;
use crate::store::MetadataStoreObject;
use crate::partition::PartitionSpec;
use crate::topic::{CleanupPolicy, TopicStorageConfig, CompressionAlgorithm};
use crate::topic::{CleanupPolicy, TopicStorageConfig, CompressionAlgorithm, Deduplication};
use super::store::*;

/// Metadata about Replica send from SC
Expand All @@ -22,6 +22,7 @@ pub struct Replica {
pub cleanup_policy: Option<CleanupPolicy>,
pub storage: Option<TopicStorageConfig>,
pub compression_type: CompressionAlgorithm,
pub deduplication: Option<Deduplication>,
}

impl Replica {
Expand Down Expand Up @@ -64,6 +65,7 @@ where
cleanup_policy: spec.cleanup_policy,
storage: spec.storage,
compression_type: spec.compression_type,
deduplication: spec.deduplication,
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use fluvio_types::SpuId;
use fluvio_protocol::{Encoder, Decoder};

use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm};
use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm, Deduplication};

/// Spec for Partition
/// Each partition has replicas spread among SPU
Expand All @@ -28,6 +28,8 @@ pub struct PartitionSpec {
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 6)]
pub compression_type: CompressionAlgorithm,
#[fluvio(min_version = 12)]
pub deduplication: Option<Deduplication>,
}

impl PartitionSpec {
Expand All @@ -49,6 +51,7 @@ impl PartitionSpec {
cleanup_policy: topic.get_clean_policy().cloned(),
storage: topic.get_storage().cloned(),
compression_type: topic.get_compression_type().clone(),
deduplication: topic.get_deduplication().cloned(),
}
}

Expand Down
36 changes: 35 additions & 1 deletion crates/fluvio-controlplane-metadata/src/topic/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::topic::{
ReplicaSpec, TopicReplicaParam, SegmentBasedPolicy, CleanupPolicy, TopicStorageConfig,
};

use super::{TopicSpec, PartitionMap, CompressionAlgorithm};
use super::{TopicSpec, PartitionMap, CompressionAlgorithm, deduplication::Deduplication};

const DEFAULT_PARTITION_COUNT: PartitionCount = 1;
const DEFAULT_REPLICATION_FACTOR: ReplicationFactor = 1;
Expand Down Expand Up @@ -38,6 +38,13 @@ pub struct TopicConfig {
#[builder(default)]
#[cfg_attr(feature = "use_serde", serde(default))]
pub compression: CompressionConfig,

#[builder(default)]
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "Option::is_none")
)]
pub deduplication: Option<Deduplication>,
}

#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -156,6 +163,7 @@ impl From<TopicConfig> for TopicSpec {
};

topic_spec.set_compression_type(config.compression.type_);
topic_spec.set_deduplication(config.deduplication);

if segment_size.is_some() || max_partition_size.is_some() {
topic_spec.set_storage(TopicStorageConfig {
Expand All @@ -174,6 +182,8 @@ fn default_version() -> String {

#[cfg(test)]
mod tests {
use crate::topic::deduplication::{Bounds, Filter, Transform};

use super::*;

#[cfg(feature = "use_serde")]
Expand All @@ -198,6 +208,13 @@ retention:
segment-size: 2.0 KB
compression:
type: Lz4
deduplication:
bounds:
count: 100
age: 1m
filter:
transform:
uses: infinyon/dedup-filter@0.1.0
"#;

//when
Expand Down Expand Up @@ -292,6 +309,7 @@ compression:
segment_size: Some(2000),
max_partition_size: Some(1000),
});
test_spec.set_deduplication(Some(test_deduplication()));

assert_eq!(spec, test_spec);
}
Expand Down Expand Up @@ -319,6 +337,22 @@ compression:
compression: CompressionConfig {
type_: CompressionAlgorithm::Lz4,
},
deduplication: Some(test_deduplication()),
}
}

fn test_deduplication() -> Deduplication {
Deduplication {
bounds: Bounds {
count: 100,
age: Some(Duration::from_secs(60)),
},
filter: Filter {
transform: Transform {
uses: "infinyon/dedup-filter@0.1.0".to_string(),
with: Default::default(),
},
},
}
}
}
65 changes: 65 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/deduplication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::{time::Duration, collections::BTreeMap};

use derive_builder::Builder;
use fluvio_protocol::{Encoder, Decoder};

#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Deduplication {
pub bounds: Bounds,
pub filter: Filter,
}

#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Bounds {
#[cfg_attr(feature = "use_serde", serde(default, skip_serializing_if = "is_zero"))]
pub count: u64,
#[cfg_attr(
feature = "use_serde",
serde(
default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde"
)
)]
pub age: Option<Duration>,
}
Comment on lines +23 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something to change for this PR, but I wonder if there is some commonality with Lookback logic we can reuse between the two (slightly different) uses as a generic "record (pre-?)selection"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although Bounds and Lookback are common now, they might differ later. Bound covers both preselection(Lookback) and limits for runtime stream processing (via SmartModule init params).


#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Filter {
pub transform: Transform,
}

#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Transform {
pub uses: String,
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "BTreeMap::is_empty")
)]
pub with: BTreeMap<String, String>,
}

#[cfg(feature = "use_serde")]
fn is_zero(val: &u64) -> bool {
*val == 0
}
2 changes: 2 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod spec;
mod status;
mod deduplication;
pub mod store;
pub mod config;

pub use self::spec::*;
pub use self::status::*;
pub use self::deduplication::*;

pub const PENDING_REASON: &str = "waiting for live spus";

Expand Down
Loading
Loading