Commit 75914f6

Handle quarkusio#4961 - Kafka Streams sasl, ssl config.

alesj authored and geoand committed Jun 22, 2020
1 parent 5b45010 commit 75914f6

Showing 15 changed files with 558 additions and 75 deletions.
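For context: the Kafka Streams extension forwards any configuration key prefixed with "kafka-streams." (or "quarkus.kafka-streams.") to the Streams engine with the prefix stripped, which is how SASL and SSL settings ultimately reach the underlying Kafka clients. A minimal application.properties sketch of that pass-through style (credentials and paths are placeholders, not values from this commit):

# forwarded to Kafka Streams as security.protocol, sasl.mechanism, sasl.jaas.config, ssl.truststore.*
kafka-streams.security.protocol=SASL_SSL
kafka-streams.sasl.mechanism=PLAIN
kafka-streams.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
kafka-streams.ssl.truststore.location=/path/to/truststore.jks
kafka-streams.ssl.truststore.password=changeit

The typed quarkus.kafka-streams.sasl.* and quarkus.kafka-streams.ssl.* options added by this commit presumably feed the same client properties; their exact keys are defined by the new SaslConfig and SslConfig classes, which are not part of the excerpt below.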
@@ -1,18 +1,17 @@
package io.quarkus.kafka.streams.deployment;

import static io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil.buildKafkaStreamsProperties;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status;
import org.rocksdb.util.Environment;
@@ -31,17 +30,13 @@
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem;
import io.quarkus.deployment.pkg.NativeConfig;
import io.quarkus.kafka.streams.runtime.HotReplacementInterceptor;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRecorder;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;

class KafkaStreamsProcessor {

    private static final String STREAMS_OPTION_PREFIX = "kafka-streams.";

    @BuildStep
    void build(BuildProducer<FeatureBuildItem> feature,
            BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
@@ -150,43 +145,6 @@ BeanContainerListenerBuildItem processBuildTimeConfig(KafkaStreamsRecorder recor
        return new BeanContainerListenerBuildItem(recorder.configure(kafkaStreamsProperties));
    }

    private Properties buildKafkaStreamsProperties(LaunchMode launchMode) {
        Config config = ConfigProvider.getConfig();
        Properties kafkaStreamsProperties = new Properties();
        for (String property : config.getPropertyNames()) {
            if (isKafkaStreamsProperty(property)) {
                includeKafkaStreamsProperty(config, kafkaStreamsProperties, property);
            }
        }

        if (launchMode == LaunchMode.DEVELOPMENT) {
            addHotReplacementInterceptor(kafkaStreamsProperties);
        }

        return kafkaStreamsProperties;
    }

    private void addHotReplacementInterceptor(Properties kafkaStreamsProperties) {
        String interceptorConfig = HotReplacementInterceptor.class.getName();
        Object originalInterceptorConfig = kafkaStreamsProperties
                .get(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));

        if (originalInterceptorConfig != null) {
            interceptorConfig = interceptorConfig + "," + originalInterceptorConfig;
        }

        kafkaStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), interceptorConfig);
    }

    private boolean isKafkaStreamsProperty(String property) {
        return property.startsWith(STREAMS_OPTION_PREFIX);
    }

    private void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String property) {
        kafkaStreamsProperties.setProperty(property.substring(STREAMS_OPTION_PREFIX.length()),
                config.getValue(property, String.class));
    }

    @BuildStep
    @Record(ExecutionTime.RUNTIME_INIT)
    void configureAndLoadRocksDb(KafkaStreamsRecorder recorder, KafkaStreamsRuntimeConfig runtimeConfig) {
@@ -0,0 +1,72 @@
package io.quarkus.kafka.streams.runtime;

import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

import io.quarkus.runtime.LaunchMode;

public class KafkaStreamsPropertiesUtil {

    private static final String STREAMS_OPTION_PREFIX = "kafka-streams.";
    private static final String QUARKUS_STREAMS_OPTION_PREFIX = "quarkus." + STREAMS_OPTION_PREFIX;

    private static boolean isKafkaStreamsProperty(String prefix, String property) {
        return property.startsWith(prefix);
    }

    private static void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String prefix,
            String property) {
        Optional<String> value = config.getOptionalValue(property, String.class);
        if (value.isPresent()) {
            kafkaStreamsProperties.setProperty(property.substring(prefix.length()), value.get());
        }
    }

    private static void addHotReplacementInterceptor(Properties kafkaStreamsProperties) {
        String interceptorConfig = HotReplacementInterceptor.class.getName();
        Object originalInterceptorConfig = kafkaStreamsProperties
                .get(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));

        if (originalInterceptorConfig != null) {
            interceptorConfig = interceptorConfig + "," + originalInterceptorConfig;
        }

        kafkaStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), interceptorConfig);
    }

    private static Properties kafkaStreamsProperties(String prefix) {
        Properties kafkaStreamsProperties = new Properties();
        Config config = ConfigProvider.getConfig();
        for (String property : config.getPropertyNames()) {
            if (isKafkaStreamsProperty(prefix, property)) {
                includeKafkaStreamsProperty(config, kafkaStreamsProperties, prefix, property);
            }
        }

        return kafkaStreamsProperties;
    }

    public static Properties appKafkaStreamsProperties() {
        return kafkaStreamsProperties(STREAMS_OPTION_PREFIX);
    }

    public static Properties quarkusKafkaStreamsProperties() {
        return kafkaStreamsProperties(QUARKUS_STREAMS_OPTION_PREFIX);
    }

    public static Properties buildKafkaStreamsProperties(LaunchMode launchMode) {
        Properties kafkaStreamsProperties = appKafkaStreamsProperties();

        if (launchMode == LaunchMode.DEVELOPMENT) {
            addHotReplacementInterceptor(kafkaStreamsProperties);
        }

        return kafkaStreamsProperties;
    }

}
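This helper implements both the prefix pass-through and, in dev mode, the hot-replacement consumer interceptor chaining. A minimal sketch of the prefix stripping, assuming a MicroProfile Config implementation (e.g. SmallRye Config) is on the classpath so that system properties are visible as a config source; the class name, property names, and values below are illustrative only:

import java.util.Properties;

import io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil;

public class PrefixStrippingExample {

    public static void main(String[] args) {
        // Illustrative settings; in a Quarkus application these would come from application.properties.
        System.setProperty("kafka-streams.security.protocol", "SASL_SSL");
        System.setProperty("kafka-streams.sasl.mechanism", "PLAIN");

        // The "kafka-streams." prefix is stripped, so the Streams client sees plain Kafka keys.
        Properties props = KafkaStreamsPropertiesUtil.appKafkaStreamsProperties();
        System.out.println(props.getProperty("security.protocol")); // SASL_SSL
        System.out.println(props.getProperty("sasl.mechanism")); // PLAIN
    }
}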
@@ -5,6 +5,7 @@
import org.rocksdb.RocksDB;

import io.quarkus.arc.Arc;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.arc.runtime.BeanContainerListener;
import io.quarkus.runtime.annotations.Recorder;

@@ -20,9 +21,13 @@ public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig)
    }

    public BeanContainerListener configure(Properties properties) {
        return container -> {
            KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class);
            instance.configure(properties);
        return new BeanContainerListener() {

            @Override
            public void created(BeanContainer container) {
                KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class);
                instance.configure(properties);
            }
        };
    }
}
@@ -38,10 +38,46 @@ public class KafkaStreamsRuntimeConfig {
    @ConfigItem
    public List<String> topics;

    /**
     * The schema registry key.
     *
     * Different registry implementations / instances expect the registry URL
     * under different property keys, e.g.:
     *
     * Red Hat / Apicurio - apicurio.registry.url
     * Confluent - schema.registry.url
     */
    @ConfigItem(defaultValue = "schema.registry.url")
    public String schemaRegistryKey;

    /**
     * The schema registry URL.
     */
    @ConfigItem
    public Optional<String> schemaRegistryUrl;

    /**
     * The SASL JAAS config.
     */
    public SaslConfig sasl;

    /**
     * Kafka SSL config
     */
    public SslConfig ssl;

    @Override
    public String toString() {
        return "KafkaStreamsRuntimeConfig [applicationId=" + applicationId + ", bootstrapServers=" + bootstrapServers
                + ", applicationServer=" + applicationServer + ", topics=" + topics + "]";
        return "KafkaStreamsRuntimeConfig{" +
                "applicationId='" + applicationId + '\'' +
                ", bootstrapServers=" + bootstrapServers +
                ", applicationServer=" + applicationServer +
                ", topics=" + topics +
                ", schemaRegistryKey='" + schemaRegistryKey + '\'' +
                ", schemaRegistryUrl=" + schemaRegistryUrl +
                ", sasl=" + sasl +
                ", ssl=" + ssl +
                '}';
    }

    public List<String> getTrimmedTopics() {
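The new config items above map to application.properties keys via the usual Quarkus naming convention (hyphenated field names under the quarkus.kafka-streams. root). The SaslConfig and SslConfig groups should map the same way under quarkus.kafka-streams.sasl.* and quarkus.kafka-streams.ssl.*, with their exact keys defined by those classes (not shown in this excerpt). A small illustrative sketch with placeholder values, presuming the runtime passes the registry URL to Kafka Streams under the configured key:

# property key under which the registry URL is passed, and the URL itself
quarkus.kafka-streams.schema-registry-key=apicurio.registry.url
quarkus.kafka-streams.schema-registry-url=http://localhost:8081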
