diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c4b9aeb5f3..3426cb0a15 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -930,6 +930,8 @@ jobs: name: CLI smoke test (${{ matrix.test }}) CLI (${{ matrix.cli_version }}) on cluster (${{ matrix.cluster_version }}) needs: build_image runs-on: ${{ matrix.os }} + env: + SPU: 2 strategy: # fail-fast: false matrix: @@ -1004,7 +1006,7 @@ jobs: run: | FLUVIO_CLUSTER_RELEASE_CHANNEL=stable echo "FLUVIO_CLUSTER_RELEASE_CHANNEL=${FLUVIO_CLUSTER_RELEASE_CHANNEL}" >> $GITHUB_ENV - fluvio cluster start + fluvio cluster start --spu ${{ env.SPU }} # download and start cluster for dev - name: Deploy dev cluster on k8 @@ -1023,7 +1025,7 @@ jobs: docker image tag infinyon/fluvio:${{ github.sha }}-x86_64-unknown-linux-musl infinyon/fluvio:${{ github.sha }} docker image save infinyon/fluvio:${{ github.sha }} --output /tmp/infinyon-fluvio.tar k3d image import -k /tmp/infinyon-fluvio.tar -c fluvio - fluvio-dev cluster start --develop + fluvio-dev cluster start --develop --spu ${{ env.SPU }} - name: Build smartmodules for E2E test run: | diff --git a/Cargo.lock b/Cargo.lock index bcd46405e4..ffd7b02d90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2288,7 +2288,7 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.19.4" +version = "0.20.0" dependencies = [ "anyhow", "async-channel", @@ -2639,7 +2639,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.22.4" +version = "0.23.0" dependencies = [ "anyhow", "async-trait", @@ -2657,6 +2657,7 @@ dependencies = [ "lenient_semver", "semver 1.0.18", "serde", + "serde_json", "serde_yaml 0.9.25", "thiserror", "toml 0.7.6", @@ -2775,7 +2776,7 @@ dependencies = [ "toml 0.7.6", "tracing", "tracing-subscriber", - "wasmparser 0.108.0", + "wasmparser 0.110.0", ] [[package]] @@ -2884,7 +2885,7 @@ dependencies = [ [[package]] name = "fluvio-sc-schema" -version = "0.19.2" +version = "0.20.0" dependencies = [ "anyhow", "fluvio-controlplane-metadata", @@ -7788,16 +7789,6 @@ dependencies = [ "semver 1.0.18", ] -[[package]] -name = "wasmparser" -version = "0.108.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c956109dcb41436a39391139d9b6e2d0a5e0b158e1293ef352ec977e5e36c5" -dependencies = [ - "indexmap 2.0.0", - "semver 1.0.18", -] - [[package]] name = "wasmparser" version = "0.110.0" diff --git a/Cargo.toml b/Cargo.toml index eea991d5a7..8b0695f883 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,20 +137,20 @@ fluvio_ws_stream_wasm = "0.7.0" trybuild = { git = "https://github.com/infinyon/trybuild", branch = "check_option" } # internal fluvio dep -fluvio = { version = "0.19.0", path = "crates/fluvio" } +fluvio = { version = "0.20.0", path = "crates/fluvio" } fluvio-auth = { path = "crates/fluvio-auth" } fluvio-channel = { path = "crates/fluvio-channel" } fluvio-cli-common = { path = "crates/fluvio-cli-common"} fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" } fluvio-connector-package = { path = "crates/fluvio-connector-package/" } fluvio-controlplane = { path = "crates/fluvio-controlplane" } -fluvio-controlplane-metadata = { version = "0.22.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } +fluvio-controlplane-metadata = { version = "0.23.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } fluvio-hub-util = { path = "crates/fluvio-hub-util" } fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false } fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false } fluvio-protocol = { version = "0.10.4", path = "crates/fluvio-protocol" } fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false } -fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false } +fluvio-sc-schema = { version = "0.20.0", path = "crates/fluvio-sc-schema", default-features = false } fluvio-service = { path = "crates/fluvio-service" } fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false } fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false } diff --git a/crates/fluvio-cli/src/client/topic/create.rs b/crates/fluvio-cli/src/client/topic/create.rs index b5d334cef3..228c21e2d1 100644 --- a/crates/fluvio-cli/src/client/topic/create.rs +++ b/crates/fluvio-cli/src/client/topic/create.rs @@ -235,4 +235,21 @@ mod load { .map_err(|err| IoError::new(ErrorKind::InvalidData, format!("{err}"))) } } + + #[cfg(test)] + mod test { + + use fluvio_controlplane_metadata::topic::PartitionMaps; + + use super::PartitionLoad; + + #[test] + fn test_replica_map_file() { + let p_map = PartitionMaps::file_decode("test-data/topics/replica_assignment.json") + .expect("v1 not found"); + assert_eq!(p_map.maps().len(), 3); + assert_eq!(p_map.maps()[0].id, 0); + assert_eq!(p_map.maps()[0].replicas, vec![5001, 5002, 5003]); + } + } } diff --git a/crates/fluvio-cli/test-data/topics/replica_assignment.json b/crates/fluvio-cli/test-data/topics/replica_assignment.json new file mode 100644 index 0000000000..d191a4ba85 --- /dev/null +++ b/crates/fluvio-cli/test-data/topics/replica_assignment.json @@ -0,0 +1,26 @@ +[ + { + "id": 0, + "replicas": [ + 5001, + 5002, + 5003 + ] + }, + { + "id": 1, + "replicas": [ + 5002, + 5003, + 5001 + ] + }, + { + "id": 2, + "replicas": [ + 5003, + 5001, + 5002 + ] + } +] \ No newline at end of file diff --git a/crates/fluvio-cli/test-data/topics/valid_assignment.json b/crates/fluvio-cli/test-data/topics/valid_assignment.json deleted file mode 100644 index e2a7460615..0000000000 --- a/crates/fluvio-cli/test-data/topics/valid_assignment.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "partitions": [ - { - "id": 0, - "replicas": [ - 5001, - 5002, - 5003 - ] - }, - { - "id": 1, - "replicas": [ - 5002, - 5003, - 5001 - ] - }, - { - "id": 2, - "replicas": [ - 5003, - 5001, - 5002 - ] - } - ] -} \ No newline at end of file diff --git a/crates/fluvio-cli/test-data/topics/valid_kf_assignment.json b/crates/fluvio-cli/test-data/topics/valid_kf_assignment.json deleted file mode 100644 index 13fe61754f..0000000000 --- a/crates/fluvio-cli/test-data/topics/valid_kf_assignment.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "partitions": [ - { - "id": 0, - "replicas": [ - 1, - 2 - ] - }, - { - "id": 1, - "replicas": [ - 2, - 3 - ] - }, - { - "id": 2, - "replicas": [ - 3, - 1 - ] - } - ] -} \ No newline at end of file diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index 1d01b26332..3b03d5776a 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.22.4" +version = "0.23.0" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" @@ -44,6 +44,6 @@ fluvio-protocol = { workspace = true, features = [ "record",] } [dev-dependencies] -serde_yaml = { workspace = true } fluvio-future = { workspace = true, features = ["fixture"] } +serde_json = { workspace = true } diff --git a/crates/fluvio-controlplane-metadata/src/topic/k8.rs b/crates/fluvio-controlplane-metadata/src/topic/k8.rs index 3b0f43e088..7dcd257deb 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/k8.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/k8.rs @@ -23,3 +23,26 @@ impl Spec for TopicSpec { &TOPIC_V2_API } } + +#[cfg(test)] +mod test_spec { + + use std::{io::BufReader, fs::File}; + + use fluvio_stream_model::k8_types::K8Obj; + + use crate::topic::ReplicaSpec; + + use super::TopicSpec; + + type K8TopicSpec = K8Obj; + + #[test] + fn read_k8_topic_partition_assignment_json() { + let reader: BufReader = + BufReader::new(File::open("tests/topic_assignment.json").expect("spec")); + let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic"); + assert_eq!(topic.metadata.name, "test3"); + assert!(matches!(topic.spec.replicas(), ReplicaSpec::Assigned(_))); + } +} diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 8904c88ecb..c72e9f06ff 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -189,7 +189,7 @@ impl std::fmt::Display for ReplicaSpec { impl Default for ReplicaSpec { fn default() -> Self { - Self::Assigned(PartitionMaps::default()) + Self::Computed(TopicReplicaParam::default()) } } @@ -364,14 +364,16 @@ impl std::fmt::Display for TopicReplicaParam { /// Hack: field instead of new type to get around encode and decode limitations #[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)] -#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))] -pub struct PartitionMaps { - maps: Vec, -} +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(transparent) +)] +pub struct PartitionMaps(Vec); impl From> for PartitionMaps { fn from(maps: Vec) -> Self { - Self { maps } + Self(maps) } } @@ -387,26 +389,26 @@ impl From)>> for PartitionMaps { impl std::fmt::Display for PartitionMaps { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "partition map:{})", self.maps.len()) + write!(f, "partition map:{})", self.0.len()) } } impl PartitionMaps { pub fn maps(&self) -> &Vec { - &self.maps + &self.0 } pub fn maps_owned(self) -> Vec { - self.maps + self.0 } fn partition_count(&self) -> PartitionCount { - self.maps.len() as PartitionCount + self.0.len() as PartitionCount } fn replication_factor(&self) -> Option { // compute replication form replica map - self.maps + self.0 .first() .map(|partition| partition.replicas.len() as ReplicationFactor) } @@ -415,7 +417,7 @@ impl PartitionMaps { use std::fmt::Write; let mut res = String::new(); - for partition in &self.maps { + for partition in &self.0 { write!(res, "{}:{:?}, ", partition.id, partition.replicas).unwrap(); // ok to unwrap since this will not fail } @@ -433,7 +435,7 @@ impl PartitionMaps { pub fn unique_spus_in_partition_map(&self) -> Vec { let mut spu_ids: Vec = vec![]; - for partition in &self.maps { + for partition in &self.0 { for spu in &partition.replicas { if !spu_ids.contains(spu) { spu_ids.push(*spu); @@ -448,7 +450,7 @@ impl PartitionMaps { pub fn partition_map_to_replica_map(&self) -> ReplicaMap { let mut replica_map: ReplicaMap = BTreeMap::new(); - for partition in &self.maps { + for partition in &self.0 { replica_map.insert(partition.id as PartitionId, partition.replicas.clone()); } @@ -459,7 +461,7 @@ impl PartitionMaps { #[allow(clippy::explicit_counter_loop)] pub fn valid_partition_map(&self) -> Result<(), Error> { // there must be at least one partition in the partition map - if self.maps.is_empty() { + if self.0.is_empty() { return Err(Error::new( ErrorKind::InvalidInput, "no assigned partitions found", @@ -477,7 +479,7 @@ impl PartitionMaps { // - all elements must be positive integers let mut id = 0; let mut replica_len = 0; - for partition in &self.maps { + for partition in &self.0 { if id == 0 { // id must be 0 if partition.id != id { diff --git a/crates/fluvio-controlplane-metadata/tests/topic_assignment.json b/crates/fluvio-controlplane-metadata/tests/topic_assignment.json new file mode 100644 index 0000000000..4322f958b5 --- /dev/null +++ b/crates/fluvio-controlplane-metadata/tests/topic_assignment.json @@ -0,0 +1,22 @@ +{ + "apiVersion": "fluvio.infinyon.com/v2", + "kind": "Topic", + "metadata": { + "name": "test3", + "namespace": "default" + }, + "spec": { + "compressionType": "Any", + "replicas": { + "assigned": [ + { + "id": 0, + "replicas": [ + 5001, + 5002 + ] + } + ] + } + } +} \ No newline at end of file diff --git a/crates/fluvio-controlplane-metadata/tests/topic_assignment.yaml b/crates/fluvio-controlplane-metadata/tests/topic_assignment.yaml new file mode 100644 index 0000000000..b52b880973 --- /dev/null +++ b/crates/fluvio-controlplane-metadata/tests/topic_assignment.yaml @@ -0,0 +1,14 @@ +# This is to create CRD but not for testing +apiVersion: fluvio.infinyon.com/v2 +kind: Topic +metadata: + name: test3 + namespace: default +spec: + replicas: + assigned: + - id: 0 + replicas: + - 5001 + - 5002 + compressionType: Any diff --git a/crates/fluvio-controlplane-metadata/tests/topic_computed.yaml b/crates/fluvio-controlplane-metadata/tests/topic_computed.yaml new file mode 100644 index 0000000000..eba71f9f3b --- /dev/null +++ b/crates/fluvio-controlplane-metadata/tests/topic_computed.yaml @@ -0,0 +1,12 @@ +apiVersion: fluvio.infinyon.com/v2 +kind: Topic +metadata: + name: test3 + namespace: default +spec: + compressionType: Any + replicas: + computed: + ignoreRackAssignment: false + partitions: 1 + replicationFactor: 1 diff --git a/crates/fluvio-sc-schema/Cargo.toml b/crates/fluvio-sc-schema/Cargo.toml index dd55d698c7..a5bbadd733 100644 --- a/crates/fluvio-sc-schema/Cargo.toml +++ b/crates/fluvio-sc-schema/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-sc-schema" -version = "0.19.2" +version = "0.20.0" edition = "2021" authors = ["Fluvio Contributors "] description = "Fluvio API for SC" diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml index 86e3ee6762..f571291d8a 100644 --- a/crates/fluvio/Cargo.toml +++ b/crates/fluvio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio" -version = "0.19.4" +version = "0.20.0" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] diff --git a/k8-util/helm/fluvio-sys/Chart.yaml b/k8-util/helm/fluvio-sys/Chart.yaml index b8e7b2c5a7..fef43b02cf 100644 --- a/k8-util/helm/fluvio-sys/Chart.yaml +++ b/k8-util/helm/fluvio-sys/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v2 name: fluvio-sys description: A Helm chart for Fluvio type: application -version: 0.9.12 +version: 0.9.13 diff --git a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml index efc4985852..0605b65c1a 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml @@ -45,22 +45,17 @@ spec: items: type: object required: - - partition + - id + - replicas properties: - partition: - type: object - required: - - id - - replicas - properties: - id: - type: integer - minimum: 0 - replicas: - type: array - items: - type: integer - minimum: 0 + id: + type: integer + minimum: 0 + replicas: + type: array + items: + type: integer + minimum: 0 oneOf: - required: ["computed"] - required: ["assigned"] diff --git a/tests/cli/fluvio_smoke_tests/topic-basic.bats b/tests/cli/fluvio_smoke_tests/topic-basic.bats index a6bef28d03..2b19996228 100644 --- a/tests/cli/fluvio_smoke_tests/topic-basic.bats +++ b/tests/cli/fluvio_smoke_tests/topic-basic.bats @@ -17,6 +17,12 @@ setup_file() { TOPIC_CONFIG_PATH="$TEST_DIR/$TOPIC_NAME.yaml" export TOPIC_CONFIG_PATH + REPLICA_CONFIG_PATH="$TEST_HELPER_DIR/replica.json" + export REPLICA_CONFIG_PATH + + TOPIC_NAME_REPLICA=$(random_string) + export TOPIC_NAME_REPLICA + DEDUP_FILTER_NAME="dedup-filter" export DEDUP_FILTER_NAME @@ -57,6 +63,23 @@ EOF assert_success } +# Create topic with replic assigmment +@test "Create a topic with replica assignment" { + # skip stable since it format changes + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + debug_msg "Topic name: $TOPIC_NAME_REPLICA" + run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_REPLICA" --replica-assignment "$REPLICA_CONFIG_PATH" + #debug_msg "command $BATS_RUN_COMMAND" # This doesn't do anything. + debug_msg "status: $status" + debug_msg "output: ${lines[0]}" + assert_success +} + # Create topic - Negative test @test "Attempt to create a topic with same name" { debug_msg "Topic name: $TOPIC_NAME" diff --git a/tests/cli/test_helper/replica.json b/tests/cli/test_helper/replica.json new file mode 100644 index 0000000000..ee2755451f --- /dev/null +++ b/tests/cli/test_helper/replica.json @@ -0,0 +1,9 @@ +[ + { + "id": 0, + "replicas": [ + 0, + 1 + ] + } +] \ No newline at end of file