
Perform rollback processing on ProducerFencedException when it is caused by the same thread #1408

Open
bartosz-stasikowski-projectdrgn opened this issue Mar 6, 2020 · 1 comment


@bartosz-stasikowski-projectdrgn

Affects Version(s): 2.3.6.RELEASE

🎁 Enhancement

Scenario (using transactions and a non-batch listener); points 1-4 correspond to the log attached at the bottom:

  1. A producer (clientId=producer-12) is created because partitions are assigned and a transactional operation needs to be performed (ListenerConsumerRebalanceListener).
  2. ListenerConsumerRebalanceListener fails with: Timeout expired after 30000milliseconds while awaiting InitProducerId
  3. A new producer (clientId=producer-13) is created, and it also initializes transactions, which calls InitProducerId.
  4. InitProducerId finishes for both producers, but there appears to have been a race condition, because the second producer got an older epoch than the first one: producer-12 got epoch 32 and producer-13 got epoch 31 (see logs).
  5. The producer currently in use is producer-13, but it has the older epoch, so the first transactional operation when processing a record is fenced:
  • I am currently still using 2.3.1.RELEASE, where producers are not closed when a transaction fails, so processing fails for every record and no offsets are committed until there is a restart or rebalance. This is not so bad.
  • If I switch to 2.3.6 (which has the producer-closing fix GH-1369: Fix Fenced Consumer-based Producers #1378), then (I assume) the producer will be closed when the first record is processed; but since rollback processing is not done for ProducerFencedException, the record that was being processed won't be processed again, because the next poll won't return it.
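A toy model may help show why the record in the second bullet is lost. The sketch below assumes a single partition whose poll() returns the record at the current position and advances it, and it simulates only the first processing attempt being fenced (as if the replacement producer then works). ToyPartition and FencedRecordDemo are hypothetical names, not Spring Kafka or Kafka client classes; seek() roughly stands in for the re-seek that rollback processing would perform.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-partition model: poll() returns the record at the position and advances it. */
final class ToyPartition {
    private final List<String> records;
    private int position = 0;

    ToyPartition(List<String> records) {
        this.records = records;
    }

    /** Returns the next record, or null when the partition is exhausted. */
    String poll() {
        return position < records.size() ? records.get(position++) : null;
    }

    /** Rewinds the position, standing in for the seek done after a rollback. */
    void seek(int offset) {
        position = offset;
    }

    int position() {
        return position;
    }
}

final class FencedRecordDemo {
    /**
     * The first attempt fails as if the producer were fenced. With
     * rollbackOnFenced the position is rewound so the record is redelivered;
     * without it the next poll simply moves on to the following record.
     */
    static List<String> run(boolean rollbackOnFenced) {
        ToyPartition partition = new ToyPartition(List.of("r0", "r1", "r2"));
        List<String> processed = new ArrayList<>();
        boolean fencedOnce = false;
        String value;
        while ((value = partition.poll()) != null) {
            int offset = partition.position() - 1;
            if (!fencedOnce) {
                fencedOnce = true;           // simulated ProducerFencedException
                if (rollbackOnFenced) {
                    partition.seek(offset);  // redeliver the failed record
                }
                continue;                    // without a seek, this record is skipped
            }
            processed.add(value);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("no rollback:   " + run(false));
        System.out.println("with rollback: " + run(true));
    }
}
```

In this model run(false) returns [r1, r2] while run(true) returns [r0, r1, r2]: without the rewind, nothing ever brings r0 back.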

Is it possible to detect that a producer was fenced by another producer created on the same thread?
In that case it would be good to perform rollback processing so the same records are processed again (or sent to the DLT), since otherwise the next poll will fetch the next record on the partition.
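One conceivable heuristic, sketched here purely as a hypothetical (it is not Spring Kafka code and may not be reliable in practice, since a leftover producer does not prove which instance did the fencing): record, per thread, how many producers the thread has created and not yet closed for a given transactional.id. If the thread still holds another live producer for that id when the current one is fenced, as with producer-12 and producer-13 in the log, the fencing was probably caused by the same thread, and rolling back is safer than skipping. The class and method names are assumptions, and wiring the hooks into a real producer factory is left open.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * HYPOTHETICAL helper, not part of Spring Kafka: tracks, per thread, how many
 * transactional producers the thread has created and not yet closed for each
 * transactional.id.
 */
final class SameThreadFencingDetector {
    private static final ThreadLocal<Map<String, Integer>> LIVE =
            ThreadLocal.withInitial(HashMap::new);

    private SameThreadFencingDetector() {
    }

    /** Hypothetical hook: call when this thread creates a transactional producer. */
    static void producerCreated(String transactionalId) {
        LIVE.get().merge(transactionalId, 1, Integer::sum);
    }

    /** Hypothetical hook: call when this thread closes one of those producers. */
    static void producerClosed(String transactionalId) {
        // Returning null from the remapping function removes the entry.
        LIVE.get().computeIfPresent(transactionalId,
                (id, n) -> n > 1 ? Integer.valueOf(n - 1) : null);
    }

    /**
     * Heuristic: another live producer for the same id on this thread (for
     * example one whose InitProducerId timed out) suggests the fencing was
     * self-inflicted, so the failed record should be rolled back, not skipped.
     */
    static boolean likelySelfFenced(String transactionalId) {
        return LIVE.get().getOrDefault(transactionalId, 0) > 1;
    }

    public static void main(String[] args) {
        String txId = "some-processor.some.topic.event.0";
        producerCreated(txId); // like producer-12: InitProducerId timed out, never closed
        producerCreated(txId); // like producer-13: the replacement now in use
        System.out.println(likelySelfFenced(txId));
    }
}
```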

Mar 3, 2020 @ 15:25:56.701	[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] ProducerId set to 18085 with epoch 32
Mar 3, 2020 @ 15:25:56.596	[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] Cluster ID: K3pyMpjRRHqd0goAdvtJ2Q
Mar 3, 2020 @ 15:25:56.202	[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] ProducerId set to 18085 with epoch 31
Mar 3, 2020 @ 15:25:56.097	[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] Cluster ID: K3pyMpjRRHqd0goAdvtJ2Q
Mar 3, 2020 @ 15:25:55.996	[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] ProducerId set to -1 with epoch -1
Mar 3, 2020 @ 15:25:55.994	[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
Mar 3, 2020 @ 15:25:55.993	ProducerConfig values: 
Mar 3, 2020 @ 15:25:55.993	[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] Instantiated a transactional producer.
Mar 3, 2020 @ 15:25:55.974	[Consumer clientId=some-processor-0, groupId=some-processor] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment (org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired after 30000milliseconds while awaiting InitProducerId)
Mar 3, 2020 @ 15:25:25.973	[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] ProducerId set to -1 with epoch -1
Mar 3, 2020 @ 15:25:25.972	[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
Mar 3, 2020 @ 15:25:25.971	ProducerConfig values: 
Mar 3, 2020 @ 15:25:25.971	[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] Instantiated a transactional producer.
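To confirm the epoch inversion from logs like these, a small parser can pick out the "ProducerId set to ... with epoch ..." lines and keep each client's most recent epoch. The sketch assumes the lines are ordered newest-first, as above, and that the log contains every InitProducerId result for the transactional.id; the producer holding the highest epoch is the one the transaction coordinator treats as current, so producers with lower epochs are fenced. EpochLogAnalyzer is a hypothetical helper, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EpochLogAnalyzer {
    // Matches "[Producer clientId=X, transactionalId=Y] ProducerId set to N with epoch E".
    private static final Pattern EPOCH_LINE = Pattern.compile(
            "clientId=([^,]+), transactionalId=[^\\]]+\\] ProducerId set to -?\\d+ with epoch (-?\\d+)");

    /** Latest epoch per clientId, assuming the lines are ordered newest-first. */
    static Map<String, Integer> lastEpochs(List<String> newestFirst) {
        Map<String, Integer> epochs = new LinkedHashMap<>();
        for (String line : newestFirst) {
            Matcher m = EPOCH_LINE.matcher(line);
            if (m.find()) {
                // The first match per client is its most recent epoch assignment.
                epochs.putIfAbsent(m.group(1), Integer.parseInt(m.group(2)));
            }
        }
        return epochs;
    }

    /** The client holding the highest epoch; lower epochs would be fenced. */
    static String currentOwner(List<String> newestFirst) {
        String owner = null;
        int best = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> e : lastEpochs(newestFirst).entrySet()) {
            if (e.getValue() > best) {
                best = e.getValue();
                owner = e.getKey();
            }
        }
        return owner;
    }

    public static void main(String[] args) {
        List<String> log = List.of(
            "Mar 3, 2020 @ 15:25:56.701\t[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] ProducerId set to 18085 with epoch 32",
            "Mar 3, 2020 @ 15:25:56.202\t[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] ProducerId set to 18085 with epoch 31",
            "Mar 3, 2020 @ 15:25:55.996\t[Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] ProducerId set to -1 with epoch -1",
            "Mar 3, 2020 @ 15:25:25.973\t[Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] ProducerId set to -1 with epoch -1");
        System.out.println(lastEpochs(log));   // {producer-12=32, producer-13=31}
        System.out.println(currentOwner(log)); // producer-12
    }
}
```

Against the four epoch lines from this log it names producer-12 as the current owner, which matches producer-13 being fenced.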

@garyrussell
Contributor

garyrussell commented Mar 6, 2020

@bartosz-stasikowski-projectdrgn Interesting; as a workaround, 2.3.6 also added the AssignmentCommitOption container property (#1371).

If you set it to NEVER or LATEST_ONLY_NO_TX, you can avoid creating a transactional producer during the rebalance.

	/**
	 * Set the assignment commit option. Default {@link AssignmentCommitOption#ALWAYS}.
	 * In a future release it will default to {@link AssignmentCommitOption#LATEST_ONLY}.
	 * @param assignmentCommitOption the option.
	 * @since 2.3.6
	 */
	public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {
		Assert.notNull(assignmentCommitOption, "'assignmentCommitOption' cannot be null");
		this.assignmentCommitOption = assignmentCommitOption;
	}

	public enum AssignmentCommitOption {

		/**
		 * Always commit the current offset during partition assignment.
		 */
		ALWAYS,

		/**
		 * Never commit the current offset during partition assignment.
		 */
		NEVER,

		/**
		 * Commit the current offset during partition assignment when auto.offset.reset is
		 * 'latest'; transactional if so configured.
		 */
		LATEST_ONLY,

		/**
		 * Commit the current offset during partition assignment when auto.offset.reset is
		 * 'latest'; use consumer commit even when transactions are being used.
		 */
		LATEST_ONLY_NO_TX

	}
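The javadoc above can be read as a small decision table. The sketch below encodes that reading in plain Java so the effect of each option is easy to check. The local enums only mirror the documented constant names and are not the Spring Kafka types, and treating ALWAYS as transactional when transactions are configured is an assumption drawn from the reply (only NEVER and LATEST_ONLY_NO_TX are said to avoid the transactional producer). In an application, the real option would be set with setAssignmentCommitOption(...) on the container's ContainerProperties, for example via the listener container factory's getContainerProperties().

```java
/** Local stand-in mirroring the documented constants; NOT the Spring Kafka enum. */
enum AssignmentCommitOption { ALWAYS, NEVER, LATEST_ONLY, LATEST_ONLY_NO_TX }

/** Kind of offset commit this toy model predicts during partition assignment. */
enum AssignmentCommit { NONE, CONSUMER_COMMIT, TRANSACTIONAL_COMMIT }

final class AssignmentCommitModel {
    private AssignmentCommitModel() {
    }

    static AssignmentCommit onAssignment(AssignmentCommitOption option,
                                         String autoOffsetReset,
                                         boolean transactional) {
        boolean latest = "latest".equals(autoOffsetReset);
        switch (option) {
            case NEVER:
                return AssignmentCommit.NONE;
            case LATEST_ONLY_NO_TX:
                // Consumer commit even when transactions are configured.
                return latest ? AssignmentCommit.CONSUMER_COMMIT : AssignmentCommit.NONE;
            case LATEST_ONLY:
                return latest ? commitKind(transactional) : AssignmentCommit.NONE;
            default: // ALWAYS
                return commitKind(transactional);
        }
    }

    /** True when the model needs a transactional producer during the rebalance. */
    static boolean needsTransactionalProducer(AssignmentCommitOption option,
                                              String autoOffsetReset,
                                              boolean transactional) {
        return onAssignment(option, autoOffsetReset, transactional)
                == AssignmentCommit.TRANSACTIONAL_COMMIT;
    }

    private static AssignmentCommit commitKind(boolean transactional) {
        return transactional ? AssignmentCommit.TRANSACTIONAL_COMMIT : AssignmentCommit.CONSUMER_COMMIT;
    }

    public static void main(String[] args) {
        for (AssignmentCommitOption option : AssignmentCommitOption.values()) {
            System.out.println(option + " -> " + onAssignment(option, "latest", true));
        }
    }
}
```

With auto.offset.reset=latest and transactions enabled, main() prints TRANSACTIONAL_COMMIT for ALWAYS and LATEST_ONLY, NONE for NEVER, and CONSUMER_COMMIT for LATEST_ONLY_NO_TX, which is why the last two options sidestep the InitProducerId call during the rebalance.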

I am not sure whether we will be able to detect that we fenced ourselves.

@garyrussell garyrussell added this to the 2.5.M1 milestone Mar 6, 2020
@garyrussell garyrussell removed this from the 2.5.M1 milestone Mar 6, 2020
@garyrussell garyrussell added this to the Waiting For Triage milestone Aug 4, 2020