Skip to content

Commit

Permalink
GH-1827: Add ContainerGroupSequencer
Browse files Browse the repository at this point in the history
Resolves #1827

Add a mechanism to automatically control a sequence of containers,
starting a group of containers when the current group goes idle.

* Improve test; fix race.

* Fix typo and revert test logging level.
  • Loading branch information
garyrussell committed Jun 15, 2021
1 parent cf954d7 commit eef4389
Show file tree
Hide file tree
Showing 12 changed files with 639 additions and 14 deletions.
53 changes: 53 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Expand Up @@ -2397,6 +2397,59 @@ If the function returns `null`, the handler's default `BackOff` will be used.
Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

[[sequencing]]
===== Starting `@KafkaListener` s in Sequence

A common use case is to start a listener after another listener has consumed all the records in a topic.
For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics.
Starting with version 2.7.3, a new component `ContainerGroupSequencer` has been introduced.
It uses the `@KafkaListener` `containerGroup` property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.

It is best illustrated with an example.

====
[source, java]
----
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
----
====

Here, we have 4 listeners in two groups, `g1` and `g2`.

During application context initialization, the sequencer, sets the `autoStartup` property of all the containers in the provided groups to `false`.
It also sets the `idleEventInterval` for any containers (that do not already have one set) to the supplied value (5000ms in this case).
Then, when the sequencer is started by the application context, the containers in the first group are started.
As `ListenerContainerIdleEvent` s are received, each individual child container in each container is stopped.
When all child containers in a `ConcurrentMessageListenerContainer` are stopped, the parent container is stopped.
When all containers in a group have been stopped, the containers in the next group are started.
There is no limit to the number of groups or containers in a group.

By default, the containers in the final group (`g2` above) are not stopped when they go idle.
To modify that behavior, set `stopLastGroupWhenIdle` to `true` on the sequencer.

As an aside; previously, containers in each group were added to a bean of type `Collection<MessageListenerContainer>` with the bean name being the `containerGroup`.
These collections are now deprecated in favor of beans of type `ContainerGroup` with a bean name that is the group name, suffixed with `.group`; in the example above, there would be 2 beans `g1.group` and `g2.group`.
The `Collection` beans will be removed in a future release.

[[container-props]]
==== Listener Container Properties

Expand Down
5 changes: 5 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Expand Up @@ -94,3 +94,8 @@ See <<configuring-topics>> for more information.

It is now possible to add a `spring-messaging` `SmartMessageConverter` to the `MessagingMessageConverter`, allowing content negotiation based on the `contentType` header.
See <<messaging-message-conversion>> for more information.

[[x27-sequencing]]
==== Sequencing `@KafkaListener` s

See <<container-sequencing>> for more information.
Expand Up @@ -23,6 +23,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.kafka.listener.ContainerGroup;
import org.springframework.messaging.handler.annotation.MessageMapping;

/**
Expand Down Expand Up @@ -148,11 +149,14 @@
TopicPartition[] topicPartitions() default {};

/**
* If provided, the listener container for this listener will be added to a bean
* with this value as its name, of type {@code Collection<MessageListenerContainer>}.
* This allows, for example, iteration over the collection to start/stop a subset
* of containers.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* If provided, the listener container for this listener will be added to a bean with
* this value as its name, of type {@code Collection<MessageListenerContainer>}. This
* allows, for example, iteration over the collection to start/stop a subset of
* containers. The {@code Collection} beans are deprecated as of version 2.7.3 and
* will be removed in 2.8. Instead, a bean with name {@code containerGroup + ".group"}
* and type {@link ContainerGroup} should be used instead.
* <p>
* SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the bean name for the group.
*/
String containerGroup() default "";
Expand Down
Expand Up @@ -81,6 +81,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
Expand Down Expand Up @@ -305,6 +306,9 @@ public void afterSingletonsInstantiated() {

// Actually register all listeners
this.registrar.afterPropertiesSet();
Map<String, ContainerGroupSequencer> sequencers =
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
sequencers.values().forEach(seq -> seq.initialize());
}

private void buildEnhancer() {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerGroup;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -181,16 +182,22 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
ConfigurableApplicationContext appContext = this.applicationContext;
if (StringUtils.hasText(endpoint.getGroup()) && appContext != null) {
String groupName = endpoint.getGroup();
if (StringUtils.hasText(groupName) && appContext != null) {
List<MessageListenerContainer> containerGroup;
if (appContext.containsBean(endpoint.getGroup())) { // NOSONAR - hasText
containerGroup = appContext.getBean(endpoint.getGroup(), List.class); // NOSONAR - hasText
ContainerGroup group;
if (appContext.containsBean(groupName)) { // NOSONAR - hasText
containerGroup = appContext.getBean(groupName, List.class); // NOSONAR - hasText
group = appContext.getBean(groupName + ".group", ContainerGroup.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
appContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); // NOSONAR - hasText
appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
group = new ContainerGroup(groupName);
appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
}
containerGroup.add(container);
group.addContainers(container);
}
if (startImmediately) {
startIfNecessary(container);
Expand Down
Expand Up @@ -154,6 +154,19 @@ public boolean isContainerPaused() {
}
}

@Override
public boolean isChildRunning() {
if (!isRunning()) {
return false;
}
for (MessageListenerContainer container : this.containers) {
if (container.isRunning()) {
return true;
}
}
return false;
}

@Override
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
synchronized (this.lifecycleMonitor) {
Expand Down Expand Up @@ -265,7 +278,11 @@ private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProp
protected void doStop(final Runnable callback) {
final AtomicInteger count = new AtomicInteger();
if (isRunning()) {
boolean childRunning = isChildRunning();
setRunning(false);
if (!childRunning) {
callback.run();
}
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
if (container.isRunning()) {
count.incrementAndGet();
Expand Down
@@ -0,0 +1,165 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;

import org.springframework.context.Lifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;

/**
* A group of listener containers.
*
* @author Gary Russell
* @since 2.7.3
*
*/
public class ContainerGroup implements Lifecycle {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ContainerGroup.class));

private final String name;

private final Collection<MessageListenerContainer> containers = new LinkedHashSet<>();

private boolean running;

/**
* Construct an instance with the provided name.
* @param name the group name.
*/
public ContainerGroup(String name) {
this.name = name;
}

/**
* Construct an instance with the provided name and containers.
* @param name the group name.
* @param containers the containers.
*/
public ContainerGroup(String name, List<MessageListenerContainer> containers) {
this.name = name;
this.containers.addAll(containers);
}

/**
* Construct an instance with the provided name and containers.
* @param name the group name.
* @param containers the containers.
*/
public ContainerGroup(String name, MessageListenerContainer...containers) {
this.name = name;
for (MessageListenerContainer container : containers) {
this.containers.add(container);
}
}

/**
* Return the group name.
* @return the name.
*/
public String getName() {
return this.name;
}

/**
* Return the listener ids of the containers in this group.
* @return the listener ids.
*/
public Collection<String> getListenerIds() {
return this.containers.stream()
.map(container -> container.getListenerId())
.map(id -> {
Assert.state(id != null, "Containers must have listener ids to be used here");
return id;
})
.collect(Collectors.toList());
}

/**
* Return true if the provided container is in this group.
* @param container the container.
* @return true if it is in this group.
*/
public boolean contains(MessageListenerContainer container) {
return this.containers.contains(container);
}

/**
* Return true if all containers in this group are stopped.
* @return true if all are stopped.
*/
public boolean allStopped() {
return this.containers.stream()
.allMatch(container -> !container.isRunning());
}

/**
* Add one or more containers to the group.
* @param theContainers the container(s).
*/
public void addContainers(MessageListenerContainer... theContainers) {
for (MessageListenerContainer container : theContainers) {
this.containers.add(container);
}
}

/**
* Remove a container from the group.
* @param container the container.
* @return true if the container was removed.
*/
public boolean removeContainer(MessageListenerContainer container) {
return this.containers.remove(container);
}

@Override
public synchronized void start() {
if (!this.running) {
this.containers.forEach(container -> {
LOGGER.debug(() -> "Starting: " + container);
container.start();
});
this.running = true;
}
}

@Override
public synchronized void stop() {
if (this.running) {
this.containers.forEach(container -> container.stop());
this.running = false;
}
}

@Override
public synchronized boolean isRunning() {
return this.running;
}

@Override
public String toString() {
return "ContainerGroup [name=" + this.name + ", containers=" + this.containers + "]";
}

}

0 comments on commit eef4389

Please sign in to comment.