Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: consume all partitions by default #3489

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/fluvio-cli-common/src/user_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ mod file_tests {
#[test]
fn file_lines() -> Result<(), ()> {
let mut file = NamedTempFile::new().unwrap();
let data = vec!["123", "abc", "📼🍅🐊"];
let data = ["123", "abc", "📼🍅🐊"];

writeln!(file, "{}", data[0]).unwrap();
writeln!(file, "{}", data[1]).unwrap();
Expand All @@ -173,7 +173,7 @@ mod file_tests {
fn file_whole() -> Result<(), ()> {
let mut file = NamedTempFile::new().unwrap();

let data = vec!["123", "abc", "📼🍅🐊"];
let data = ["123", "abc", "📼🍅🐊"];

writeln!(file, "{}", data[0]).unwrap();
writeln!(file, "{}", data[1]).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-cli/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn print_help_hack() -> Result<()> {
std::process::exit(0);
} else if let Some(first_arg) = args.nth(1) {
// We pick help up here as a courtesy
if vec!["-h", "--help", "help"].contains(&first_arg.as_str()) {
if ["-h", "--help", "help"].contains(&first_arg.as_str()) {
HelpOpt {}.process()?;
std::process::exit(0);
}
Expand Down
18 changes: 10 additions & 8 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ mod cmd {
pub topic: String,

/// Partition id
#[arg(short = 'p', long, default_value = "0", value_name = "integer")]
pub partition: PartitionId,
#[arg(short = 'p', long, value_name = "integer")]
pub partition: Vec<PartitionId>,

/// Consume records from all partitions
#[arg(short = 'A', long = "all-partitions", conflicts_with_all = &["partition"])]
Expand Down Expand Up @@ -211,7 +211,7 @@ mod cmd {
#[instrument(
skip(self, fluvio),
name = "Consume",
fields(topic = %self.topic, partition = self.partition),
fields(topic = %self.topic, partition = ?self.partition),
)]
async fn process_client<O: Terminal + Debug + Send + Sync>(
self,
Expand Down Expand Up @@ -251,17 +251,19 @@ mod cmd {
None
};

if self.all_partitions {
if self.all_partitions || self.partition.is_empty() {
let consumer = fluvio
.consumer(PartitionSelectionStrategy::All(self.topic.clone()))
.await?;
self.consume_records(consumer, maybe_tableformat).await?;
} else {
let consumer = fluvio
.consumer(PartitionSelectionStrategy::Multiple(vec![(
self.topic.clone(),
self.partition,
)]))
.consumer(PartitionSelectionStrategy::Multiple(
self.partition
.iter()
.map(|p| (self.topic.clone(), *p))
.collect(),
))
.await?;
self.consume_records(consumer, maybe_tableformat).await?;
};
Expand Down
14 changes: 4 additions & 10 deletions crates/fluvio-connector-package/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod v1 {

impl MetaConfigV1 {
pub fn secrets(&self) -> HashSet<SecretConfig> {
HashSet::from_iter(self.secrets.clone().unwrap_or_default().into_iter())
HashSet::from_iter(self.secrets.clone().unwrap_or_default())
}

pub fn direction(&self) -> Direction {
Expand Down Expand Up @@ -184,7 +184,7 @@ mod v2 {

impl MetaConfigV2 {
pub fn secrets(&self) -> HashSet<SecretConfig> {
HashSet::from_iter(self.secrets.clone().unwrap_or_default().into_iter())
HashSet::from_iter(self.secrets.clone().unwrap_or_default())
}

pub fn direction(&self) -> Direction {
Expand Down Expand Up @@ -274,7 +274,7 @@ impl MetaConfig<'_> {

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct ConsumerParameters {
#[serde(default, skip_serializing_if = "ConsumerPartitionConfig::is_default")]
#[serde(default)]
pub partition: ConsumerPartitionConfig,
#[serde(
with = "bytesize_serde",
Expand Down Expand Up @@ -397,13 +397,7 @@ pub enum ConsumerPartitionConfig {

impl Default for ConsumerPartitionConfig {
fn default() -> Self {
Self::One(0)
}
}

impl ConsumerPartitionConfig {
pub fn is_default(&self) -> bool {
matches!(self, ConsumerPartitionConfig::One(partition) if partition.eq(&PartitionId::default()))
Self::All
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-hub-protocol/src/package_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl From<&SmartModuleVisibility> for PkgVisibility {

#[test]
fn builds_obj_key_from_package_name() {
let pkg_names = vec![
let pkg_names = [
"infinyon/example@0.0.1",
"infinyon/example-sm@0.1.0",
"infinyon/json-sql@0.0.2",
Expand All @@ -275,7 +275,7 @@ fn builds_obj_key_from_package_name() {
"infinyon/test-cli@0.1.0",
"infinyon/regex@0.0.1",
];
let obj_paths = vec![
let obj_paths = [
"infinyon/example-0.0.1.ipkg",
"infinyon/example-sm-0.1.0.ipkg",
"infinyon/json-sql-0.0.2.ipkg",
Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-hub-util/src/package_meta_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod tests {

#[test]
fn builds_obj_key_from_package_name() {
let pkg_names = vec![
let pkg_names = [
"infinyon/example@0.0.1",
"infinyon/example-sm@0.1.0",
"infinyon/json-sql@0.0.2",
Expand All @@ -196,7 +196,7 @@ mod tests {
"infinyon/test-cli@0.1.0",
"infinyon/regex@0.0.1",
];
let obj_paths = vec![
let obj_paths = [
"infinyon/example-0.0.1.ipkg",
"infinyon/example-sm-0.1.0.ipkg",
"infinyon/json-sql-0.0.2.ipkg",
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {
/// the current code should be able to load all old versions
#[test]
fn backward_compat() {
let flist = vec![
let flist = [
"tests/apackage/package-meta.yaml",
"tests/apackage/package-meta-v0.1.yaml",
"tests/apackage/package-meta-v0.2-owner.yaml",
Expand All @@ -435,7 +435,7 @@ mod tests {
const TAGGED_PM: &str = "tests/apackage/package-meta-v0.3-targets.yaml";
let msg = format!("couldn't read {TAGGED_PM}");
let pm = read_pkgmeta(TAGGED_PM).expect(&msg);
let tags = vec![
let tags = [
("arch", "aarch64-apple-darwin"),
("arch", "aarch64-unknown-linux-musl"),
]
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-protocol/src/core/bytebuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl From<Bytes> for ByteBuf {
impl From<Vec<u8>> for ByteBuf {
fn from(bytes: Vec<u8>) -> Self {
ByteBuf {
inner: Bytes::from_iter(bytes.into_iter()),
inner: Bytes::from_iter(bytes),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions crates/fluvio-spu/src/smartengine/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn lookback_last_iterator<R: ReplicaStorage>(

let Some(file_slice) = slice.file_slice else {
trace!(?slice);
return Ok(Box::new(std::iter::empty()))
return Ok(Box::new(std::iter::empty()));
};

let batch_iter = FileBatchIterator::from_raw_slice(file_slice);
Expand Down Expand Up @@ -218,7 +218,9 @@ async fn read_batches_by_age<R: ReplicaStorage>(
break;
};
let mut batch_iter = FileBatchIterator::from_raw_slice(file_slice);
let Some(batch) = batch_iter.next() else { break };
let Some(batch) = batch_iter.next() else {
break;
};
let batch = batch?;
trace!(?batch.batch, "next file batch");

Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-test/src/tests/producer_fail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn produce_batch(
println!("Got cluster manager");

let value = "a".repeat(5000);
let result: Result<_> = (|| async move {
let result: Result<_> = async move {
let mut results = Vec::new();
for _ in 0..1000 {
let result = producer.send(RecordKey::NULL, value.clone()).await?;
Expand Down Expand Up @@ -86,7 +86,7 @@ pub async fn produce_batch(
producer.flush().await?;

Ok(())
})()
}
.await;

// Ensure that one of the calls returned a failure
Expand Down
51 changes: 51 additions & 0 deletions tests/cli/fluvio_smoke_tests/e2e-basic.bats
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ setup_file() {
export TOPIC_NAME_13
debug_msg "Topic name: $TOPIC_NAME_13"

TOPIC_NAME_14=$(random_string)
export TOPIC_NAME_14
debug_msg "Topic name: $TOPIC_NAME_14"

MESSAGE="$(random_string 7)"
export MESSAGE
debug_msg "$MESSAGE"
Expand Down Expand Up @@ -107,6 +111,19 @@ setup_file() {

teardown_file() {
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME2"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME3"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME4"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME5"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME6"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME7"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME8"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME9"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME10"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME11"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME12"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME13"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME14"
}

# Create topic
Expand Down Expand Up @@ -138,6 +155,8 @@ teardown_file() {
assert_success
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_13" --compression-type zstd
assert_success
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_14" -p 3
assert_success
}

# Produce message
Expand Down Expand Up @@ -168,6 +187,12 @@ teardown_file() {
assert_success
run bash -c 'echo -e "$AT_LEAST_ONCE_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_12" --delivery-semantic AtLeastOnce'
assert_success
run bash -c 'echo -e "1:1" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_14" --key-separator ":"'
assert_success
run bash -c 'echo -e "2:2" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_14" --key-separator ":"'
assert_success
run bash -c 'echo -e "3:3" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_14" --key-separator ":"'
assert_success
}

# Consume message and compare message
Expand Down Expand Up @@ -270,3 +295,29 @@ teardown_file() {
assert_output --partial "$AT_LEAST_ONCE_MESSAGE"
assert_success
}

@test "Consume all partitions by default" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_14" -B -d

assert_output --partial "1"
assert_output --partial "2"
assert_output --partial "3"
assert_success
}

@test "Consume subset of partitions" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_14" -p 1 -p 2 -B -d

assert_output --partial "1"
assert_output --partial "2"
refute_output --partial "3"
assert_success
}


Loading