Improvement proposal - RecordInterceptor for transactional consumer #1321

@ltorzynski

Hi,
Currently, RecordInterceptor is invoked inside the doInvokeOnMessage method of KafkaMessageListenerContainer:

			ConsumerRecord<K, V> record = recordArg;
			if (this.recordInterceptor != null) {
				record = this.recordInterceptor.intercept(record);
			}
			if (record == null) {
				this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
			}

This is fine for non-transactional use cases.

However, we need to intercept the ConsumerRecord just before any transaction begins, so that our transaction manager (a HibernateTransactionManager that synchronizes with the KafkaTransactionManager):

new ChainedKafkaTransactionManager<>(kafkaTransactionManager, hibernateTransactionManager)

can begin its transaction with the tenant information already available. We pass that information in a Kafka message header.

This would be possible with some kind of interceptor invoked inside the invokeRecordListenerInTx method, just before the transactionTemplate.execute invocation:

		private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
			Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
			while (iterator.hasNext()) {
				final ConsumerRecord<K, V> record = iterator.next();
				this.logger.trace(() -> "Processing " + record);
				try {
					TransactionSupport
							.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
					this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

What do you think about it?
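
To make the idea concrete, here is a minimal, self-contained sketch of the call order being proposed. It deliberately does not use spring-kafka or Spring transactions: `Message`, `BeforeTxInterceptor`, `Listener`, `TENANT` and `executeInTx` are all hypothetical stand-ins (for ConsumerRecord, the proposed hook, the record listener, a tenant context, and transactionTemplate.execute respectively), and the header name `tenant` is an assumption. The only point is that the hook runs before the transaction is started, so the tenant is already bound when the transaction begins.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BeforeTxInterceptorSketch {

    /** Hypothetical stand-in for ConsumerRecord: a value plus named headers. */
    static final class Message {
        final String value;
        final Map<String, byte[]> headers = new HashMap<>();

        Message(String value, String tenant) {
            this.value = value;
            // Assumed header name; the real header key is application-specific.
            this.headers.put("tenant", tenant.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Hypothetical hook, invoked before the transaction is started. */
    interface BeforeTxInterceptor {
        void beforeTransaction(Message message);
    }

    /** Hypothetical stand-in for the record listener. */
    interface Listener {
        void onMessage(Message message);
    }

    /** Thread-bound tenant, as a tenant-aware transaction manager might read it. */
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    /** Stand-in for transactionTemplate.execute: logs begin, runs the callback, logs commit. */
    static void executeInTx(List<String> log, Runnable callback) {
        log.add("begin tx, tenant=" + TENANT.get());
        callback.run();
        log.add("commit");
    }

    /** Mirrors the shape of invokeRecordListenerInTx, with the hook called before the transaction. */
    static void invokeRecordListenerInTx(List<Message> messages, BeforeTxInterceptor interceptor,
            Listener listener, List<String> log) {
        for (Message message : messages) {
            try {
                interceptor.beforeTransaction(message); // runs outside the transaction
                executeInTx(log, () -> listener.onMessage(message));
            } finally {
                TENANT.remove(); // do not leak the tenant to the next record
            }
        }
    }

    /** Runs two messages for different tenants and returns the recorded call order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Message> messages = Arrays.asList(
                new Message("order-1", "acme"),
                new Message("order-2", "globex"));
        invokeRecordListenerInTx(
                messages,
                m -> TENANT.set(new String(m.headers.get("tenant"), StandardCharsets.UTF_8)),
                m -> log.add("handled " + m.value),
                log);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the tenant is cleared in a `finally` block after each record, so a failure in the listener would not leave a stale tenant bound to the consumer thread. Whether the real hook should also be able to skip a record, as RecordInterceptor does by returning null, is left open here.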
