From fd41e5088399d2062d9794b29dc0d9532d72bdca Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 17 Aug 2020 12:02:06 -0400 Subject: [PATCH] GH-1565: ErrorHandlingDeserializer Extensions Resolves https://github.com/spring-projects/spring-kafka/issues/1565 Previously extended `ErrorHandlingDeserializer` only worked if configured as a class rather than a class name. Spring Boot automatically converts the class names to classes but config outside of such an environment would not work. **cherry-pick to 2.5.x, 2.4.x, 2.3.x** --- .../KafkaMessageListenerContainer.java | 36 ++++++++++++++----- .../ErrorHandlingDeserializerTests.java | 6 +++- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index f171a23ea1..e21cef75e3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -680,8 +680,8 @@ else if (listener instanceof MessageListener) { this.logger.info(this.toString()); } Map props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties(); - this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, false)); - this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, true)); + this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false)); + this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true)); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { // update the property so we can use it directly from code elsewhere @@ -846,14 +846,21 @@ else if (timeout instanceof String) { } } - private Object findDeserializerClass(Map props, boolean isValue) { + @Nullable + private Object findDeserializerClass(Map props, Properties consumerOverrides, boolean isValue) { Object configuredDeserializer = isValue ? KafkaMessageListenerContainer.this.consumerFactory.getValueDeserializer() : KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer(); if (configuredDeserializer == null) { - return props.get(isValue + Object deser = consumerOverrides.get(isValue ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + if (deser == null) { + deser = props.get(isValue + ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + return deser; } else { return configuredDeserializer.getClass(); @@ -885,10 +892,23 @@ private void subscribeOrAssignTopics(final Consumer subscr } } - private boolean checkDeserializer(Object deser) { - return deser instanceof Class - ? ErrorHandlingDeserializer2.class.isAssignableFrom((Class) deser) - : deser instanceof String && deser.equals(ErrorHandlingDeserializer2.class.getName()); + private boolean checkDeserializer(@Nullable Object deser) { + Class deserializer = null; + if (deser instanceof Class) { + deserializer = (Class) deser; + } + else if (deser instanceof String) { + try { + deserializer = ClassUtils.forName((String) deser, getApplicationContext().getClassLoader()); + } + catch (ClassNotFoundException | LinkageError e) { + throw new IllegalStateException(e); + } + } + else if (deser != null) { + throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass()); + } + return deserializer == null ? false : ErrorHandlingDeserializer2.class.isAssignableFrom(deserializer); } protected void checkConsumer() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java index ad50e02e76..3d0fdf30bf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java @@ -171,7 +171,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) { public ConsumerFactory cf() { Map props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ExtendedEHD.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class); props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, FailSometimesDeserializer.class.getName()); @@ -233,4 +233,8 @@ public String deserialize(String topic, Headers headers, byte[] data) { } + public static class ExtendedEHD extends ErrorHandlingDeserializer2 { + + } + }