Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - fix: replica assignment #3422

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,8 @@ jobs:
name: CLI smoke test (${{ matrix.test }}) CLI (${{ matrix.cli_version }}) on cluster (${{ matrix.cluster_version }})
needs: build_image
runs-on: ${{ matrix.os }}
env:
SPU: 2
strategy:
# fail-fast: false
matrix:
Expand Down Expand Up @@ -1004,7 +1006,7 @@ jobs:
run: |
FLUVIO_CLUSTER_RELEASE_CHANNEL=stable
echo "FLUVIO_CLUSTER_RELEASE_CHANNEL=${FLUVIO_CLUSTER_RELEASE_CHANNEL}" >> $GITHUB_ENV
fluvio cluster start
fluvio cluster start --spu ${{ env.SPU }}

# download and start cluster for dev
- name: Deploy dev cluster on k8
Expand All @@ -1023,7 +1025,7 @@ jobs:
docker image tag infinyon/fluvio:${{ github.sha }}-x86_64-unknown-linux-musl infinyon/fluvio:${{ github.sha }}
docker image save infinyon/fluvio:${{ github.sha }} --output /tmp/infinyon-fluvio.tar
k3d image import -k /tmp/infinyon-fluvio.tar -c fluvio
fluvio-dev cluster start --develop
fluvio-dev cluster start --develop --spu ${{ env.SPU }}

- name: Build smartmodules for E2E test
run: |
Expand Down
19 changes: 5 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,20 @@ fluvio_ws_stream_wasm = "0.7.0"
trybuild = { git = "https://github.com/infinyon/trybuild", branch = "check_option" }

# internal fluvio dep
fluvio = { version = "0.19.0", path = "crates/fluvio" }
fluvio = { version = "0.20.0", path = "crates/fluvio" }
fluvio-auth = { path = "crates/fluvio-auth" }
fluvio-channel = { path = "crates/fluvio-channel" }
fluvio-cli-common = { path = "crates/fluvio-cli-common"}
fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" }
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
fluvio-controlplane-metadata = { version = "0.22.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-controlplane-metadata = { version = "0.23.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-hub-util = { path = "crates/fluvio-hub-util" }
fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false }
fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false }
fluvio-protocol = { version = "0.10.4", path = "crates/fluvio-protocol" }
fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-sc-schema = { version = "0.19.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-sc-schema = { version = "0.20.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false }
fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false }
Expand Down
17 changes: 17 additions & 0 deletions crates/fluvio-cli/src/client/topic/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,21 @@ mod load {
.map_err(|err| IoError::new(ErrorKind::InvalidData, format!("{err}")))
}
}

#[cfg(test)]
mod test {

use fluvio_controlplane_metadata::topic::PartitionMaps;

use super::PartitionLoad;

#[test]
fn test_replica_map_file() {
let p_map = PartitionMaps::file_decode("test-data/topics/replica_assignment.json")
.expect("v1 not found");
assert_eq!(p_map.maps().len(), 3);
assert_eq!(p_map.maps()[0].id, 0);
assert_eq!(p_map.maps()[0].replicas, vec![5001, 5002, 5003]);
}
}
}
26 changes: 26 additions & 0 deletions crates/fluvio-cli/test-data/topics/replica_assignment.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"id": 0,
"replicas": [
5001,
5002,
5003
]
},
{
"id": 1,
"replicas": [
5002,
5003,
5001
]
},
{
"id": 2,
"replicas": [
5003,
5001,
5002
]
}
]
28 changes: 0 additions & 28 deletions crates/fluvio-cli/test-data/topics/valid_assignment.json

This file was deleted.

25 changes: 0 additions & 25 deletions crates/fluvio-cli/test-data/topics/valid_kf_assignment.json

This file was deleted.

4 changes: 2 additions & 2 deletions crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.22.4"
version = "0.23.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down Expand Up @@ -44,6 +44,6 @@ fluvio-protocol = { workspace = true, features = [ "record",] }


[dev-dependencies]
serde_yaml = { workspace = true }
fluvio-future = { workspace = true, features = ["fixture"] }
serde_json = { workspace = true }

23 changes: 23 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,26 @@ impl Spec for TopicSpec {
&TOPIC_V2_API
}
}

#[cfg(test)]
mod test_spec {

use std::{io::BufReader, fs::File};

use fluvio_stream_model::k8_types::K8Obj;

use crate::topic::ReplicaSpec;

use super::TopicSpec;

type K8TopicSpec = K8Obj<TopicSpec>;

#[test]
fn read_k8_topic_partition_assignment_json() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/topic_assignment.json").expect("spec"));
let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic");
assert_eq!(topic.metadata.name, "test3");
assert!(matches!(topic.spec.replicas(), ReplicaSpec::Assigned(_)));
}
}
34 changes: 18 additions & 16 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl std::fmt::Display for ReplicaSpec {

impl Default for ReplicaSpec {
fn default() -> Self {
Self::Assigned(PartitionMaps::default())
Self::Computed(TopicReplicaParam::default())
}
}

Expand Down Expand Up @@ -364,14 +364,16 @@ impl std::fmt::Display for TopicReplicaParam {

/// Hack: field instead of new type to get around encode and decode limitations
#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionMaps {
maps: Vec<PartitionMap>,
}
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(transparent)
)]
pub struct PartitionMaps(Vec<PartitionMap>);

impl From<Vec<PartitionMap>> for PartitionMaps {
fn from(maps: Vec<PartitionMap>) -> Self {
Self { maps }
Self(maps)
}
}

Expand All @@ -387,26 +389,26 @@ impl From<Vec<(PartitionId, Vec<SpuId>)>> for PartitionMaps {

impl std::fmt::Display for PartitionMaps {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "partition map:{})", self.maps.len())
write!(f, "partition map:{})", self.0.len())
}
}

impl PartitionMaps {
pub fn maps(&self) -> &Vec<PartitionMap> {
&self.maps
&self.0
}

pub fn maps_owned(self) -> Vec<PartitionMap> {
self.maps
self.0
}

fn partition_count(&self) -> PartitionCount {
self.maps.len() as PartitionCount
self.0.len() as PartitionCount
}

fn replication_factor(&self) -> Option<ReplicationFactor> {
// compute replication form replica map
self.maps
self.0
.first()
.map(|partition| partition.replicas.len() as ReplicationFactor)
}
Expand All @@ -415,7 +417,7 @@ impl PartitionMaps {
use std::fmt::Write;

let mut res = String::new();
for partition in &self.maps {
for partition in &self.0 {
write!(res, "{}:{:?}, ", partition.id, partition.replicas).unwrap();
// ok to unwrap since this will not fail
}
Expand All @@ -433,7 +435,7 @@ impl PartitionMaps {
pub fn unique_spus_in_partition_map(&self) -> Vec<SpuId> {
let mut spu_ids: Vec<SpuId> = vec![];

for partition in &self.maps {
for partition in &self.0 {
for spu in &partition.replicas {
if !spu_ids.contains(spu) {
spu_ids.push(*spu);
Expand All @@ -448,7 +450,7 @@ impl PartitionMaps {
pub fn partition_map_to_replica_map(&self) -> ReplicaMap {
let mut replica_map: ReplicaMap = BTreeMap::new();

for partition in &self.maps {
for partition in &self.0 {
replica_map.insert(partition.id as PartitionId, partition.replicas.clone());
}

Expand All @@ -459,7 +461,7 @@ impl PartitionMaps {
#[allow(clippy::explicit_counter_loop)]
pub fn valid_partition_map(&self) -> Result<(), Error> {
// there must be at least one partition in the partition map
if self.maps.is_empty() {
if self.0.is_empty() {
return Err(Error::new(
ErrorKind::InvalidInput,
"no assigned partitions found",
Expand All @@ -477,7 +479,7 @@ impl PartitionMaps {
// - all elements must be positive integers
let mut id = 0;
let mut replica_len = 0;
for partition in &self.maps {
for partition in &self.0 {
if id == 0 {
// id must be 0
if partition.id != id {
Expand Down
22 changes: 22 additions & 0 deletions crates/fluvio-controlplane-metadata/tests/topic_assignment.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"apiVersion": "fluvio.infinyon.com/v2",
"kind": "Topic",
"metadata": {
"name": "test3",
"namespace": "default"
},
"spec": {
"compressionType": "Any",
"replicas": {
"assigned": [
{
"id": 0,
"replicas": [
5001,
5002
]
}
]
}
}
}
Loading
Loading