Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ConsumerRebalanceListener from ConsumerVerticleContext #3207

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.core.Vertx;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +41,7 @@ public interface Initializer extends BiFunction<Vertx, ConsumerVerticle, Future<
private final ConsumerVerticleContext consumerVerticleContext;

ReactiveKafkaConsumer<Object, CloudEvent> consumer;
ConsumerRebalanceListener consumerRebalanceListener;
RecordDispatcher recordDispatcher;
private AsyncCloseable closeable;

Expand Down Expand Up @@ -85,6 +87,10 @@ public void setCloser(AsyncCloseable closeable) {
this.closeable = closeable;
}

public void setRebalanceListener(ConsumerRebalanceListener consumerRebalanceListener) {
this.consumerRebalanceListener = consumerRebalanceListener;
}

void exceptionHandler(Throwable cause) {
logger.error("Consumer exception {}", consumerVerticleContext.getLoggingKeyValue(), cause);

Expand All @@ -98,5 +104,9 @@ protected ConsumerVerticleContext getConsumerVerticleContext() {
return consumerVerticleContext;
}

protected ConsumerRebalanceListener getConsumerRebalanceListener() {
return consumerRebalanceListener;
}

public abstract PartitionRevokedHandler getPartitionRevokedHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public OrderedConsumerVerticle(final ConsumerVerticleContext context, final Init

@Override
void startConsumer(Promise<Void> startPromise) {
Objects.requireNonNull(getConsumerVerticleContext().getConsumerRebalanceListener());
Objects.requireNonNull(getConsumerRebalanceListener());
// We need to sub first, then we can start the polling loop
this.consumer
.subscribe(
Set.copyOf(getConsumerVerticleContext().getResource().getTopicsList()),
getConsumerVerticleContext().getConsumerRebalanceListener())
getConsumerRebalanceListener())
.onFailure(startPromise::fail)
.onSuccess(v -> {
if (this.pollTimer.compareAndSet(-1, 0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ public UnorderedConsumerVerticle(final ConsumerVerticleContext context, final In

@Override
void startConsumer(final Promise<Void> startPromise) {
Objects.requireNonNull(getConsumerVerticleContext().getConsumerRebalanceListener());
Objects.requireNonNull(getConsumerRebalanceListener());

this.consumer
.subscribe(
Set.copyOf(getConsumerVerticleContext().getResource().getTopicsList()),
getConsumerVerticleContext().getConsumerRebalanceListener())
getConsumerRebalanceListener())
.onFailure(startPromise::tryFail)
.onSuccess(startPromise::tryComplete)
.onSuccess(v -> poll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f

final var partitionRevokedHandlers =
List.of(consumerVerticle.getPartitionRevokedHandler(), offsetManager.getPartitionRevokedHandler());
this.consumerVerticleContext.withConsumerRebalanceListener(createRebalanceListener(partitionRevokedHandlers));
consumerVerticle.setRebalanceListener(createRebalanceListener(partitionRevokedHandlers));
}

private ConsumerVerticle createConsumerVerticle(final ConsumerVerticle.Initializer initializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,8 +63,6 @@ public class ConsumerVerticleContext {

private Tags tags;

private ConsumerRebalanceListener consumerRebalanceListener;

public ConsumerVerticleContext withConsumerConfigs(final Map<String, Object> consumerConfigs) {
this.consumerConfigs = new HashMap<>(consumerConfigs);
return this;
Expand Down Expand Up @@ -157,16 +154,6 @@ public ConsumerVerticleContext withProducerFactory(final ProducerFactory<String,
return this;
}

public ConsumerVerticleContext withConsumerRebalanceListener(
final ConsumerRebalanceListener consumerRebalanceListener) {
this.consumerRebalanceListener = consumerRebalanceListener;
return this;
}

public ConsumerRebalanceListener getConsumerRebalanceListener() {
return consumerRebalanceListener;
}

public DataPlaneContract.Resource getResource() {
return resource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.vertx.micrometer.backends.BackendRegistries;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -89,7 +90,13 @@ public void subscribedToTopic(final Vertx vertx, final VertxTestContext context)
consumerVerticle.setConsumer(new MockReactiveKafkaConsumer<>(consumer));
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});
return Future.succeededFuture();
});

Expand Down Expand Up @@ -130,6 +137,13 @@ public void stop(final Vertx vertx, final VertxTestContext context) {
consumerVerticle.setConsumer(new MockReactiveKafkaConsumer<>(consumer));
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});

return Future.succeededFuture();
});
Expand Down Expand Up @@ -207,7 +221,13 @@ public void shouldCloseEverything(final Vertx vertx, final VertxTestContext cont
consumerVerticle.setConsumer(consumer);
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});
return Future.succeededFuture();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +44,7 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
Expand Down Expand Up @@ -123,7 +125,13 @@ public void consumeOneByOne(
consumerVerticle.setConsumer(new MockReactiveKafkaConsumer<>(consumer));
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});
return Future.succeededFuture();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import dev.knative.eventing.kafka.broker.core.testing.CoreObjects;
import io.vertx.ext.web.client.WebClientOptions;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

public class FakeConsumerVerticleContext {

Expand All @@ -30,16 +29,7 @@ public static ConsumerVerticleContext get() {
.withProducerConfigs(new HashMap<>())
.withConsumerConfigs(new HashMap<>())
.withMeterRegistry(Metrics.getRegistry())
.withResource(CoreObjects.resource1(), CoreObjects.egress1())
.withConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}
});
.withResource(CoreObjects.resource1(), CoreObjects.egress1());
}

public static ConsumerVerticleContext get(
Expand All @@ -50,15 +40,6 @@ public static ConsumerVerticleContext get(
.withAuthProvider(AuthProvider.noAuth())
.withWebClientOptions(new WebClientOptions())
.withMeterRegistry(Metrics.getRegistry())
.withResource(resource, egress)
.withConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}
});
.withResource(resource, egress);
}
}
Loading