Skip to content

Commit

Permalink
GH-1565: ErrorHandlingDeserializer Extensions
Browse files Browse the repository at this point in the history
Resolves #1565

Previously extended `ErrorHandlingDeserializer` only worked if configured
as a class rather than a class name.

Spring Boot automatically converts the class names to classes but config
outside of such an environment would not work.

**cherry-pick to 2.5.x, 2.4.x, 2.3.x**
  • Loading branch information
garyrussell committed Aug 17, 2020
1 parent 7303413 commit fd41e50
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
Expand Up @@ -680,8 +680,8 @@ else if (listener instanceof MessageListener) {
this.logger.info(this.toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, false));
this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, true));
this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false));
this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true));
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property so we can use it directly from code elsewhere
Expand Down Expand Up @@ -846,14 +846,21 @@ else if (timeout instanceof String) {
}
}

private Object findDeserializerClass(Map<String, Object> props, boolean isValue) {
@Nullable
private Object findDeserializerClass(Map<String, Object> props, Properties consumerOverrides, boolean isValue) {
Object configuredDeserializer = isValue
? KafkaMessageListenerContainer.this.consumerFactory.getValueDeserializer()
: KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer();
if (configuredDeserializer == null) {
return props.get(isValue
Object deser = consumerOverrides.get(isValue
? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
: ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
if (deser == null) {
deser = props.get(isValue
? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
: ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
return deser;
}
else {
return configuredDeserializer.getClass();
Expand Down Expand Up @@ -885,10 +892,23 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
}
}

private boolean checkDeserializer(Object deser) {
return deser instanceof Class
? ErrorHandlingDeserializer2.class.isAssignableFrom((Class<?>) deser)
: deser instanceof String && deser.equals(ErrorHandlingDeserializer2.class.getName());
private boolean checkDeserializer(@Nullable Object deser) {
Class<?> deserializer = null;
if (deser instanceof Class) {
deserializer = (Class<?>) deser;
}
else if (deser instanceof String) {
try {
deserializer = ClassUtils.forName((String) deser, getApplicationContext().getClassLoader());
}
catch (ClassNotFoundException | LinkageError e) {
throw new IllegalStateException(e);
}
}
else if (deser != null) {
throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass());
}
return deserializer == null ? false : ErrorHandlingDeserializer2.class.isAssignableFrom(deserializer);
}

protected void checkConsumer() {
Expand Down
Expand Up @@ -171,7 +171,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) {
public ConsumerFactory<String, String> cf() {
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ExtendedEHD.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, FailSometimesDeserializer.class.getName());
Expand Down Expand Up @@ -233,4 +233,8 @@ public String deserialize(String topic, Headers headers, byte[] data) {

}

public static class ExtendedEHD<T> extends ErrorHandlingDeserializer2<T> {

}

}

0 comments on commit fd41e50

Please sign in to comment.