Skip to content

Commit

Permalink
feat: added topic deduplication mechanism 1/2
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Galibey committed Jul 12, 2023
1 parent 86797cd commit 86f762f
Show file tree
Hide file tree
Showing 22 changed files with 363 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions crates/fluvio-cli/src/client/topic/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mod display {

use fluvio::metadata::topic::ReplicaSpec;
use comfy_table::Row;
use humantime::format_duration;
use serde::Serialize;

use fluvio::metadata::objects::Metadata;
Expand Down Expand Up @@ -145,6 +146,24 @@ mod display {
}
}

if let Some(dedup) = spec.get_deduplication() {
key_values.push((
"Deduplication Filter".to_owned(),
Some(dedup.filter.transform.uses.clone()),
));
key_values.push((
"Deduplication Count Bound".to_owned(),
Some(dedup.bounds.count)
.filter(|c| *c != 0)
.as_ref()
.map(ToString::to_string),
));
key_values.push((
"Deduplication Age Bound".to_owned(),
dedup.bounds.age.map(|a| format_duration(a).to_string()),
));
};

key_values.push((
"Status".to_owned(),
Some(status.resolution.resolution_label().to_string()),
Expand Down
7 changes: 7 additions & 0 deletions crates/fluvio-cli/src/client/topic/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod display {
"REPLICAS",
"RETENTION TIME",
"COMPRESSION",
"DEDUPLICATION",
"STATUS",
"REASON",
])
Expand All @@ -113,6 +114,12 @@ mod display {
topic.retention_secs() as u64
))),
Cell::new(topic.get_compression_type()),
Cell::new(
topic
.get_deduplication()
.map(|d| d.filter.transform.uses.as_str())
.unwrap_or("none"),
),
Cell::new(metadata.status.resolution.to_string()),
Cell::new(metadata.status.reason.to_string()),
])
Expand Down
15 changes: 14 additions & 1 deletion crates/fluvio-connector-package/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ mod tests {
TopicConfigBuilder, MetaConfigBuilder, MetaConfig, PartitionConfig, RetentionConfig,
CompressionConfig,
},
CompressionAlgorithm,
CompressionAlgorithm, Deduplication, Bounds, Filter, Transform,
};
use fluvio_smartengine::transformation::{TransformationStep, Lookback};
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -556,6 +556,19 @@ mod tests {
compression: CompressionConfig {
type_: CompressionAlgorithm::Lz4,
},
deduplication: Some(Deduplication {
bounds: Bounds {
count: 100,
age: Some(Duration::from_secs(60)),
},
filter: Filter {
transform: Transform {
uses: "infinyon/fluvio-smartmodule-filter-lookback@0.1.0"
.to_string(),
with: Default::default(),
},
},
}),
},
version: "0.1.0".to_string(),
producer: Some(ProducerParameters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ meta:
segment-size: 2.0 KB
compression:
type: Lz4
deduplication:
bounds:
count: 100
age: 1m
filter:
transform:
uses: infinyon/fluvio-smartmodule-filter-lookback@0.1.0
producer:
linger: 1ms
batch-size: "44.0 MB"
Expand Down
6 changes: 4 additions & 2 deletions crates/fluvio-controlplane-metadata/src/partition/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use fluvio_protocol::{Encoder, Decoder};
use fluvio_types::SpuId;

use crate::partition::ReplicaKey;
use crate::core::{MetadataItem};
use crate::core::MetadataItem;
use crate::store::MetadataStoreObject;
use crate::partition::PartitionSpec;
use crate::topic::{CleanupPolicy, TopicStorageConfig, CompressionAlgorithm};
use crate::topic::{CleanupPolicy, TopicStorageConfig, CompressionAlgorithm, Deduplication};
use super::store::*;

/// Metadata about Replica sent from SC
Expand All @@ -22,6 +22,7 @@ pub struct Replica {
pub cleanup_policy: Option<CleanupPolicy>,
pub storage: Option<TopicStorageConfig>,
pub compression_type: CompressionAlgorithm,
pub deduplication: Option<Deduplication>,
}

impl Replica {
Expand Down Expand Up @@ -64,6 +65,7 @@ where
cleanup_policy: spec.cleanup_policy,
storage: spec.storage,
compression_type: spec.compression_type,
deduplication: spec.deduplication,
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use fluvio_types::SpuId;
use fluvio_protocol::{Encoder, Decoder};

use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm};
use crate::topic::{CleanupPolicy, TopicStorageConfig, TopicSpec, CompressionAlgorithm, Deduplication};

/// Spec for Partition
/// Each partition has replicas spread among SPU
Expand All @@ -28,6 +28,8 @@ pub struct PartitionSpec {
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 6)]
pub compression_type: CompressionAlgorithm,
#[fluvio(min_version = 12)]
pub deduplication: Option<Deduplication>,
}

impl PartitionSpec {
Expand All @@ -49,6 +51,7 @@ impl PartitionSpec {
cleanup_policy: topic.get_clean_policy().cloned(),
storage: topic.get_storage().cloned(),
compression_type: topic.get_compression_type().clone(),
deduplication: topic.get_deduplication().cloned(),
}
}

Expand Down
36 changes: 35 additions & 1 deletion crates/fluvio-controlplane-metadata/src/topic/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::topic::{
ReplicaSpec, TopicReplicaParam, SegmentBasedPolicy, CleanupPolicy, TopicStorageConfig,
};

use super::{TopicSpec, PartitionMap, CompressionAlgorithm};
use super::{TopicSpec, PartitionMap, CompressionAlgorithm, deduplication::Deduplication};

const DEFAULT_PARTITION_COUNT: PartitionCount = 1;
const DEFAULT_REPLICATION_FACTOR: ReplicationFactor = 1;
Expand Down Expand Up @@ -38,6 +38,13 @@ pub struct TopicConfig {
#[builder(default)]
#[cfg_attr(feature = "use_serde", serde(default))]
pub compression: CompressionConfig,

#[builder(default)]
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "Option::is_none")
)]
pub deduplication: Option<Deduplication>,
}

#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -156,6 +163,7 @@ impl From<TopicConfig> for TopicSpec {
};

topic_spec.set_compression_type(config.compression.type_);
topic_spec.set_deduplication(config.deduplication);

if segment_size.is_some() || max_partition_size.is_some() {
topic_spec.set_storage(TopicStorageConfig {
Expand All @@ -174,6 +182,8 @@ fn default_version() -> String {

#[cfg(test)]
mod tests {
use crate::topic::deduplication::{Bounds, Filter, Transform};

use super::*;

#[cfg(feature = "use_serde")]
Expand All @@ -198,6 +208,13 @@ retention:
segment-size: 2.0 KB
compression:
type: Lz4
deduplication:
bounds:
count: 100
age: 1m
filter:
transform:
uses: infinyon/dedup-filter@0.1.0
"#;

//when
Expand Down Expand Up @@ -292,6 +309,7 @@ compression:
segment_size: Some(2000),
max_partition_size: Some(1000),
});
test_spec.set_deduplication(Some(test_deduplication()));

assert_eq!(spec, test_spec);
}
Expand Down Expand Up @@ -319,6 +337,22 @@ compression:
compression: CompressionConfig {
type_: CompressionAlgorithm::Lz4,
},
deduplication: Some(test_deduplication()),
}
}

/// Builds the `Deduplication` fixture shared by the tests in this module:
/// a count bound of 100, an age bound of 60 seconds and the
/// `infinyon/dedup-filter@0.1.0` transform with no extra parameters.
fn test_deduplication() -> Deduplication {
    // Bounds: count of 100 and an age of one minute.
    let bounds = Bounds {
        count: 100,
        age: Some(Duration::from_secs(60)),
    };

    // Transform reference with an empty (default) `with` map.
    let transform = Transform {
        uses: "infinyon/dedup-filter@0.1.0".to_string(),
        with: Default::default(),
    };

    Deduplication {
        bounds,
        filter: Filter { transform },
    }
}
}
65 changes: 65 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/deduplication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::{time::Duration, collections::BTreeMap};

use derive_builder::Builder;
use fluvio_protocol::{Encoder, Decoder};

/// Deduplication settings that can be attached to a topic.
///
/// Combines the [`Bounds`] with the [`Filter`] to apply. When the
/// `use_serde` feature is enabled the type is (de)serializable and its keys
/// are written in kebab-case.
#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Deduplication {
/// Count and age bounds of the deduplication.
pub bounds: Bounds,
/// Filter used for deduplication; wraps the transform to run.
pub filter: Filter,
}

/// Count and age bounds of a [`Deduplication`] configuration.
///
/// NOTE(review): the exact semantics of the bounds (presumably how many
/// records and how far back in time duplicates are tracked) are not visible
/// in this file; confirm against the consumer of this type.
#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Bounds {
/// Count bound. With `use_serde`, a missing value deserializes to `0`
/// and a value of `0` is left out of the serialized output.
#[cfg_attr(feature = "use_serde", serde(default, skip_serializing_if = "is_zero"))]
pub count: u64,
/// Optional age bound. With `use_serde`, it is (de)serialized through
/// `humantime_serde`, defaults to `None` when missing and is left out of
/// the serialized output when `None`.
#[cfg_attr(
feature = "use_serde",
serde(
default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde"
)
)]
pub age: Option<Duration>,
}

/// Filter part of a [`Deduplication`] configuration.
///
/// Currently a thin wrapper around the [`Transform`] to use. With the
/// `use_serde` feature its keys are written in kebab-case.
#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Filter {
/// Transform that implements the filter.
pub transform: Transform,
}

/// Reference to a transform together with its string key/value settings.
#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Transform {
/// Identifier of the transform to use, e.g. `infinyon/dedup-filter@0.1.0`.
/// NOTE(review): presumably a SmartModule reference; confirm.
pub uses: String,
/// String key/value map. With `use_serde`, it defaults to an empty map
/// when missing and is left out of the serialized output when empty.
/// NOTE(review): presumably parameters handed to the transform; confirm.
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "BTreeMap::is_empty")
)]
pub with: BTreeMap<String, String>,
}

#[cfg(feature = "use_serde")]
/// Returns `true` when `val` is zero.
///
/// Used as the `skip_serializing_if` predicate for `Bounds::count`, which is
/// why it takes `&u64` instead of `u64`: serde invokes the predicate with a
/// reference to the field.
fn is_zero(val: &u64) -> bool {
    matches!(*val, 0)
}
2 changes: 2 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod spec;
mod status;
mod deduplication;
pub mod store;
pub mod config;

pub use self::spec::*;
pub use self::status::*;
pub use self::deduplication::*;

pub const PENDING_REASON: &str = "waiting for live spus";

Expand Down
Loading

0 comments on commit 86f762f

Please sign in to comment.