Skip to content

Commit

Permalink
GH-1681: Add "after" methods to RecordInterceptor
Browse files Browse the repository at this point in the history
Resolves #1681

Also add `BatchInterceptor`.

* Fix LogAccessor Usage.
  • Loading branch information
garyrussell committed Jan 27, 2021
1 parent 8879427 commit f2f54ca
Show file tree
Hide file tree
Showing 12 changed files with 409 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
Expand Down Expand Up @@ -106,6 +107,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private RecordInterceptor<K, V> recordInterceptor;

private BatchInterceptor<K, V> batchInterceptor;

private BatchToRecordAdapter<K, V> batchToRecordAdapter;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -309,6 +312,16 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

/**
* Set a batch interceptor to be called before and after calling the listener.
* Does not apply to batch listeners.
* @param batchInterceptor the interceptor.
* @since 2.7
*/
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
this.batchInterceptor = batchInterceptor;
}

/**
* Set a {@link BatchToRecordAdapter}.
* @param batchToRecordAdapter the adapter.
Expand Down Expand Up @@ -407,6 +420,7 @@ else if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
instance.setRecordInterceptor(this.recordInterceptor);
instance.setBatchInterceptor(this.batchInterceptor);
JavaUtils.INSTANCE
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,8 @@ public abstract class AbstractMessageListenerContainer<K, V>

private RecordInterceptor<K, V> recordInterceptor;

private BatchInterceptor<K, V> batchInterceptor;

private boolean interceptBeforeTx;

private volatile boolean running = false;
Expand Down Expand Up @@ -299,7 +301,7 @@ protected RecordInterceptor<K, V> getRecordInterceptor() {
}

/**
* Set an interceptor to be called before calling the listener.
* Set an interceptor to be called before calling the record listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
Expand All @@ -309,6 +311,21 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

protected BatchInterceptor<K, V> getBatchInterceptor() {
return this.batchInterceptor;
}

/**
* Set an interceptor to be called before calling the record listener.
* Does not apply to batch listeners.
* @param batchInterceptor the interceptor.
* @since 2.6.6
* @see #setInterceptBeforeTx(boolean)
*/
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
this.batchInterceptor = batchInterceptor;
}

protected boolean isInterceptBeforeTx() {
return this.interceptBeforeTx;
}
Expand All @@ -318,6 +335,7 @@ protected boolean isInterceptBeforeTx() {
* @param interceptBeforeTx true to intercept before the transaction.
* @since 2.3.4
* @see #setRecordInterceptor(RecordInterceptor)
* @see #setBatchInterceptor(BatchInterceptor)
*/
public void setInterceptBeforeTx(boolean interceptBeforeTx) {
this.interceptBeforeTx = interceptBeforeTx;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.lang.Nullable;

/**
* An interceptor for batches of records.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.7
*
*/
@FunctionalInterface
public interface BatchInterceptor<K, V> {

/**
* Perform some action on the records or return a different one. If null is returned
* the records will be skipped. Invoked before the listener.
* @param records the records.
* @return the records or null.
*/
@Nullable
ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records);

/**
* Called after the listener exits normally.
* @param records the records.
*/
default void success(ConsumerRecords<K, V> records) {
}

/**
* Called after the listener throws an exception.
* @param records the records.
* @param exception the exception.
*/
default void failure(ConsumerRecords<K, V> records, Exception exception) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.util.Assert;

/**
* A {@link BatchInterceptor} that delegates to one or more {@link BatchInterceptor}s in
* order.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.7
*
*/
public class CompositeBatchInterceptor<K, V> implements BatchInterceptor<K, V> {

private final Collection<BatchInterceptor<K, V>> delegates = new ArrayList<>();

/**
* Construct an instance with the provided delegates.
* @param delegates the delegates.
*/
@SafeVarargs
@SuppressWarnings("varargs")
public CompositeBatchInterceptor(BatchInterceptor<K, V>... delegates) {
Assert.notNull(delegates, "'delegates' cannot be null");
Assert.noNullElements(delegates, "'delegates' cannot have null entries");
this.delegates.addAll(Arrays.asList(delegates));
}

@Override
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records) {
ConsumerRecords<K, V> recordsToIntercept = records;
for (BatchInterceptor<K, V> delegate : this.delegates) {
recordsToIntercept = delegate.intercept(recordsToIntercept);
}
return recordsToIntercept;
}

@Override
public void success(ConsumerRecords<K, V> records) {
this.delegates.forEach(del -> del.success(records));
}

@Override
public void failure(ConsumerRecords<K, V> records, Exception exception) {
this.delegates.forEach(del -> del.failure(records, exception));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
import org.springframework.util.Assert;

/**
* A {@link RecordInterceptor} that delegates to one or more {@link RecordInterceptor} in
* A {@link RecordInterceptor} that delegates to one or more {@link RecordInterceptor}s in
* order.
*
* @param <K> the key type.
Expand All @@ -40,6 +40,10 @@ public class CompositeRecordInterceptor<K, V> implements RecordInterceptor<K, V>

private final Collection<RecordInterceptor<K, V>> delegates = new ArrayList<>();

/**
* Construct an instance with the provided delegates.
* @param delegates the delegates.
*/
@SafeVarargs
@SuppressWarnings("varargs")
public CompositeRecordInterceptor(RecordInterceptor<K, V>... delegates) {
Expand All @@ -57,4 +61,14 @@ public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
return recordToIntercept;
}

@Override
public void success(ConsumerRecord<K, V> record) {
this.delegates.forEach(del -> del.success(record));
}

@Override
public void failure(ConsumerRecord<K, V> record, Exception exception) {
this.delegates.forEach(del -> del.failure(record, exception));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -191,6 +191,7 @@ protected void doStart() {
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setBatchInterceptor(getBatchInterceptor());
container.setInterceptBeforeTx(isInterceptBeforeTx());
container.setEmergencyStop(() -> {
stop(() -> {
Expand Down

0 comments on commit f2f54ca

Please sign in to comment.