Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
GH-296: Add Kafka-backed MessageChannels
Browse files Browse the repository at this point in the history
Resolves #296

* * Add DSL support

* * Polishing and XML support
  • Loading branch information
garyrussell committed Feb 24, 2020
1 parent a89257f commit 879e7a7
Show file tree
Hide file tree
Showing 16 changed files with 1,222 additions and 4 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ compileTestKotlin {

ext {
assertkVersion = '0.20'
googleJsr305Version = '3.0.2'
jacksonVersion = '2.10.1'
junitJupiterVersion = '5.6.0'
log4jVersion = '2.13.0'
Expand Down Expand Up @@ -98,6 +99,7 @@ jacoco {
}

dependencies {
compileOnly "com.google.code.findbugs:jsr305:$googleJsr305Version"
compile 'org.springframework.integration:spring-integration-core'
compile "org.springframework.kafka:spring-kafka:$springKafkaVersion"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.kafka.channel;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.core.log.LogAccessor;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/**
* Abstract MessageChannel backed by a Kafka topic.
*
* @author Gary Russell
* @since 3.3
*
*/
public abstract class AbstractKafkaChannel extends AbstractMessageChannel {

protected final LogAccessor logger = new LogAccessor(super.logger); // NOSONAR final

private final KafkaOperations<?, ?> template;

protected final String topic; // NOSONAR final

private String groupId;

/**
* Construct an instance with the provided paramters.
* @param template the template.
* @param topic the topic.
*/
public AbstractKafkaChannel(KafkaOperations<?, ?> template, String topic) {
Assert.notNull(template, "'template' cannot be null");
Assert.notNull(topic, "'topic' cannot be null");
this.template = template;
this.topic = topic;
}

/**
* Set the group id for the consumer; if not set, the bean name will be used.
* @param groupId the group id.
*/
public void setGroupId(String groupId) {
this.groupId = groupId;
}

protected String getGroupId() {
return this.groupId;
}

@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
this.template.send(MessageBuilder.fromMessage(message)
.setHeader(KafkaHeaders.TOPIC, this.topic)
.build())
.get(timeout, TimeUnit.MILLISECONDS);
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
this.logger.debug(() -> "Interrupted while waiting for send result for: " + message);
return false;
}
catch (ExecutionException e) {
this.logger.error(e.getCause(), () -> "Interrupted while waiting for send result for: " + message);
return false;
}
catch (TimeoutException e) {
this.logger.debug(e, () -> "Timed out while waiting for send result for: " + message);
return false;
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.kafka.channel;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.support.management.PollableChannelManagement;
import org.springframework.integration.support.management.metrics.CounterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.util.Assert;

/**
* Pollable channel backed by a Kafka topic.
*
* @author Gary Russell
* @since 3.3
*
*/
public class PollableKafkaChannel extends AbstractKafkaChannel
implements PollableChannel, PollableChannelManagement, ExecutorChannelInterceptorAware {

private final KafkaMessageSource<?, ?> source;

private CounterFacade receiveCounter;

private volatile int executorInterceptorsSize;

/**
* Construct an instance with the provided parameters.
* @param template the template for sending.
* @param source the source for receiving.
*/
public PollableKafkaChannel(KafkaOperations<?, ?> template, KafkaMessageSource<?, ?> source) {
super(template, topic(source));
this.source = source;
if (source.getConsumerProperties().getGroupId() == null) {
String groupId = getGroupId();
source.getConsumerProperties().setGroupId(groupId != null ? groupId : getBeanName());
}
}

private static String topic(KafkaMessageSource<?, ?> source) {
Assert.notNull(source, "'source' cannot be null");
Assert.isTrue(source.getConsumerProperties().getTopics().length == 1, "Only one topic is allowed");
return source.getConsumerProperties().getTopics()[0];
}

@Override
public int getReceiveCount() {
return getMetrics().getReceiveCount();
}

@Override
public long getReceiveCountLong() {
return getMetrics().getReceiveCountLong();
}

@Override
public int getReceiveErrorCount() {
return getMetrics().getReceiveErrorCount();
}

@Override
public long getReceiveErrorCountLong() {
return getMetrics().getReceiveErrorCountLong();
}

@Override
@Nullable
public Message<?> receive() {
return doReceive();
}

@Override
@Nullable
public Message<?> receive(long timeout) {
return doReceive();
}

@Nullable
protected Message<?> doReceive() {
ChannelInterceptorList interceptorList = getIChannelInterceptorList();
Deque<ChannelInterceptor> interceptorStack = null;
AtomicBoolean counted = new AtomicBoolean();
boolean countsEnabled = isCountsEnabled();
boolean traceEnabled = isLoggingEnabled() && logger.isTraceEnabled();
try {
if (traceEnabled) {
logger.trace("preReceive on channel '" + this + "'");
}
if (interceptorList.getInterceptors().size() > 0) {
interceptorStack = new ArrayDeque<>();
if (!interceptorList.preReceive(this, interceptorStack)) {
return null;
}
}
Message<?> message = this.source.receive();
if (message != null) {
incrementReceiveCounter();
message = interceptorList.postReceive(message, this);
}
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
return message;
}
catch (RuntimeException ex) {
if (countsEnabled && !counted.get()) {
incrementReceiveErrorCounter(ex);
}
interceptorList.afterReceiveCompletion(null, this, ex, interceptorStack);
throw ex;
}
}

private void incrementReceiveCounter() {
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
if (this.receiveCounter == null) {
this.receiveCounter = buildReceiveCounter(metricsCaptor, null);
}
this.receiveCounter.increment();
}
}

private void incrementReceiveErrorCounter(Exception ex) {
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
buildReceiveCounter(metricsCaptor, ex).increment();
}
getMetrics().afterError();
}

private CounterFacade buildReceiveCounter(MetricsCaptor metricsCaptor, @Nullable Exception ex) {
CounterFacade counterFacade = metricsCaptor
.counterBuilder(RECEIVE_COUNTER_NAME)
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
.tag("type", "channel")
.tag("result", ex == null ? "success" : "failure")
.tag("exception", ex == null ? "none" : ex.getClass().getSimpleName())
.description("Messages received")
.build();
this.meters.add(counterFacade);
return counterFacade;
}

@Override
public void setInterceptors(List<ChannelInterceptor> interceptors) {
super.setInterceptors(interceptors);
for (ChannelInterceptor interceptor : interceptors) {
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptorsSize++;
}
}
}

@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptorsSize++;
}
}

@Override
public void addInterceptor(int index, ChannelInterceptor interceptor) {
super.addInterceptor(index, interceptor);
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptorsSize++;
}
}

@Override
public boolean removeInterceptor(ChannelInterceptor interceptor) {
boolean removed = super.removeInterceptor(interceptor);
if (removed && interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptorsSize--;
}
return removed;
}

@Override
@Nullable
public ChannelInterceptor removeInterceptor(int index) {
ChannelInterceptor interceptor = super.removeInterceptor(index);
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptorsSize--;
}
return interceptor;
}

@Override
public boolean hasExecutorInterceptors() {
return this.executorInterceptorsSize > 0;
}

}
Loading

0 comments on commit 879e7a7

Please sign in to comment.