diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
index 4d7504fd25591..5823671ef3abb 100644
--- a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
+++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
@@ -1,9 +1,10 @@
 package io.quarkus.kafka.streams.deployment;
 
+import static io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil.buildKafkaStreamsProperties;
+
 import java.io.IOException;
 import java.util.Properties;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -11,8 +12,6 @@
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.ConfigProvider;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Status;
 import org.rocksdb.util.Environment;
@@ -31,17 +30,13 @@
 import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem;
 import io.quarkus.deployment.pkg.NativeConfig;
-import io.quarkus.kafka.streams.runtime.HotReplacementInterceptor;
 import io.quarkus.kafka.streams.runtime.KafkaStreamsRecorder;
 import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
 import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;
-import io.quarkus.runtime.LaunchMode;
 import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
 
 class KafkaStreamsProcessor {
 
-    private static final String STREAMS_OPTION_PREFIX = "kafka-streams.";
-
     @BuildStep
     void build(BuildProducer<FeatureBuildItem> feature,
             BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
@@ -150,43 +145,6 @@ BeanContainerListenerBuildItem processBuildTimeConfig(KafkaStreamsRecorder recorder,
         return new BeanContainerListenerBuildItem(recorder.configure(kafkaStreamsProperties));
     }
 
-    private Properties buildKafkaStreamsProperties(LaunchMode launchMode) {
-        Config config = ConfigProvider.getConfig();
-        Properties kafkaStreamsProperties = new Properties();
-        for (String property : config.getPropertyNames()) {
-            if (isKafkaStreamsProperty(property)) {
-                includeKafkaStreamsProperty(config, kafkaStreamsProperties, property);
-            }
-        }
-
-        if (launchMode == LaunchMode.DEVELOPMENT) {
-            addHotReplacementInterceptor(kafkaStreamsProperties);
-        }
-
-        return kafkaStreamsProperties;
-    }
-
-    private void addHotReplacementInterceptor(Properties kafkaStreamsProperties) {
-        String interceptorConfig = HotReplacementInterceptor.class.getName();
-        Object originalInterceptorConfig = kafkaStreamsProperties
-                .get(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
-
-        if (originalInterceptorConfig != null) {
-            interceptorConfig = interceptorConfig + "," + originalInterceptorConfig;
-        }
-
-        kafkaStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), interceptorConfig);
-    }
-
-    private boolean isKafkaStreamsProperty(String property) {
-        return property.startsWith(STREAMS_OPTION_PREFIX);
-    }
-
-    private void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String property) {
-        kafkaStreamsProperties.setProperty(property.substring(STREAMS_OPTION_PREFIX.length()),
-                config.getValue(property, String.class));
-    }
-
     @BuildStep
     @Record(ExecutionTime.RUNTIME_INIT)
     void configureAndLoadRocksDb(KafkaStreamsRecorder recorder, KafkaStreamsRuntimeConfig runtimeConfig) {
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java
new file mode 100644
index 0000000000000..fe70ba2eab0da
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsPropertiesUtil.java
@@ -0,0 +1,72 @@
+package io.quarkus.kafka.streams.runtime;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+import io.quarkus.runtime.LaunchMode;
+
+public class KafkaStreamsPropertiesUtil {
+
+    private static final String STREAMS_OPTION_PREFIX = "kafka-streams.";
+    private static final String QUARKUS_STREAMS_OPTION_PREFIX = "quarkus." + STREAMS_OPTION_PREFIX;
+
+    private static boolean isKafkaStreamsProperty(String prefix, String property) {
+        return property.startsWith(prefix);
+    }
+
+    private static void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String prefix,
+            String property) {
+        Optional<String> value = config.getOptionalValue(property, String.class);
+        if (value.isPresent()) {
+            kafkaStreamsProperties.setProperty(property.substring(prefix.length()), value.get());
+        }
+    }
+
+    private static void addHotReplacementInterceptor(Properties kafkaStreamsProperties) {
+        String interceptorConfig = HotReplacementInterceptor.class.getName();
+        Object originalInterceptorConfig = kafkaStreamsProperties
+                .get(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
+
+        if (originalInterceptorConfig != null) {
+            interceptorConfig = interceptorConfig + "," + originalInterceptorConfig;
+        }
+
+        kafkaStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), interceptorConfig);
+    }
+
+    private static Properties kafkaStreamsProperties(String prefix) {
+        Properties kafkaStreamsProperties = new Properties();
+        Config config = ConfigProvider.getConfig();
+        for (String property : config.getPropertyNames()) {
+            if (isKafkaStreamsProperty(prefix, property)) {
+                includeKafkaStreamsProperty(config, kafkaStreamsProperties, prefix, property);
+            }
+        }
+
+        return kafkaStreamsProperties;
+    }
+
+    public static Properties appKafkaStreamsProperties() {
+        return kafkaStreamsProperties(STREAMS_OPTION_PREFIX);
+    }
+
+    public static Properties quarkusKafkaStreamsProperties() {
+        return kafkaStreamsProperties(QUARKUS_STREAMS_OPTION_PREFIX);
+    }
+
+    public static Properties buildKafkaStreamsProperties(LaunchMode launchMode) {
+        Properties kafkaStreamsProperties = appKafkaStreamsProperties();
+
+        if (launchMode == LaunchMode.DEVELOPMENT) {
+            addHotReplacementInterceptor(kafkaStreamsProperties);
+        }
+
+        return kafkaStreamsProperties;
+    }
+
+}
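Note: `appKafkaStreamsProperties()` and `quarkusKafkaStreamsProperties()` differ only in the prefix they strip before handing the property to Kafka Streams. As a minimal illustration (hypothetical values, not part of this change), both of the following lines surface in the Streams configuration under the same stripped key:

```properties
# stripped by appKafkaStreamsProperties() -> cache.max.bytes.buffering=10240
kafka-streams.cache.max.bytes.buffering=10240
# stripped by quarkusKafkaStreamsProperties() -> cache.max.bytes.buffering=10240
quarkus.kafka-streams.cache.max.bytes.buffering=10240
```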
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java
index 23fb68c10b9ed..8e381f38a9906 100644
--- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRecorder.java
@@ -5,6 +5,7 @@
 import org.rocksdb.RocksDB;
 
 import io.quarkus.arc.Arc;
+import io.quarkus.arc.runtime.BeanContainer;
 import io.quarkus.arc.runtime.BeanContainerListener;
 import io.quarkus.runtime.annotations.Recorder;
@@ -20,9 +21,13 @@ public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig)
     }
 
     public BeanContainerListener configure(Properties properties) {
-        return container -> {
-            KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class);
-            instance.configure(properties);
+        return new BeanContainerListener() {
+
+            @Override
+            public void created(BeanContainer container) {
+                KafkaStreamsTopologyManager instance = container.instance(KafkaStreamsTopologyManager.class);
+                instance.configure(properties);
+            }
         };
     }
 }
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
index 2f423f3037dcd..178c06f66ba1e 100644
--- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
@@ -38,10 +38,46 @@ public class KafkaStreamsRuntimeConfig {
     @ConfigItem
     public List<String> topics;
 
+    /**
+     * The schema registry key.
+     *
+     * Used to differentiate between registry implementations / instances,
+     * as they expose the registry URL under different property keys:
+     *
+     * Red Hat / Apicurio - apicurio.registry.url
+     * Confluent - schema.registry.url
+     */
+    @ConfigItem(defaultValue = "schema.registry.url")
+    public String schemaRegistryKey;
+
+    /**
+     * The schema registry URL.
+     */
+    @ConfigItem
+    public Optional<String> schemaRegistryUrl;
+
+    /**
+     * Kafka SASL config
+     */
+    public SaslConfig sasl;
+
+    /**
+     * Kafka SSL config
+     */
+    public SslConfig ssl;
+
     @Override
     public String toString() {
-        return "KafkaStreamsRuntimeConfig [applicationId=" + applicationId + ", bootstrapServers=" + bootstrapServers
-                + ", applicationServer=" + applicationServer + ", topics=" + topics + "]";
+        return "KafkaStreamsRuntimeConfig{" +
+                "applicationId='" + applicationId + '\'' +
+                ", bootstrapServers=" + bootstrapServers +
+                ", applicationServer=" + applicationServer +
+                ", topics=" + topics +
+                ", schemaRegistryKey='" + schemaRegistryKey + '\'' +
+                ", schemaRegistryUrl=" + schemaRegistryUrl +
+                ", sasl=" + sasl +
+                ", ssl=" + ssl +
+                '}';
     }
 
     public List<String> getTrimmedTopics() {
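Note: `schema-registry-key` decides which property name the registry URL is published under, since the Apicurio and Confluent serdes look it up under different keys. A sketch of the intended usage, mirroring the integration-test configuration further down in this change:

```properties
# results in apicurio.registry.url=http://localhost:8080 in the Streams properties
quarkus.kafka-streams.schema-registry-key=apicurio.registry.url
quarkus.kafka-streams.schema-registry-url=http://localhost:8080
```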
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java
index c083ea657ecec..3c1e31e67253b 100644
--- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java
@@ -1,12 +1,13 @@
 package io.quarkus.kafka.streams.runtime;
 
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -14,6 +15,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.enterprise.context.ApplicationScoped;
@@ -26,6 +28,8 @@
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.StateListener;
@@ -54,7 +58,7 @@ public class KafkaStreamsTopologyManager {
     private volatile KafkaStreamsRuntimeConfig runtimeConfig;
     private volatile Instance<Topology> topology;
     private volatile Properties properties;
-    private volatile Map<String, Object> adminClientConfig;
+    private volatile Properties adminClientConfig;
     private volatile Instance<KafkaClientSupplier> kafkaClientSupplier;
     private volatile Instance<StateListener> stateListener;
@@ -91,17 +95,90 @@ private static Properties getStreamsProperties(Properties properties, String bootstrapServersConfig,
         // build-time options
         streamsProperties.putAll(properties);
 
+        // dynamically added options -- for backwards compatibility
+        streamsProperties.putAll(KafkaStreamsPropertiesUtil.quarkusKafkaStreamsProperties());
+        streamsProperties.putAll(KafkaStreamsPropertiesUtil.appKafkaStreamsProperties());
+
         // add runtime options
         streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId);
 
+        // application server
         if (runtimeConfig.applicationServer.isPresent()) {
             streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get());
         }
 
+        // schema registry
+        if (runtimeConfig.schemaRegistryUrl.isPresent()) {
+            streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get());
+        }
+
+        // sasl
+        SaslConfig sc = runtimeConfig.sasl;
+        if (sc != null) {
+            setProperty(sc.jaasConfig, streamsProperties, SaslConfigs.SASL_JAAS_CONFIG);
+
+            setProperty(sc.clientCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);
+
+            setProperty(sc.loginCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
+            setProperty(sc.loginClass, streamsProperties, SaslConfigs.SASL_LOGIN_CLASS);
+
+            setProperty(sc.kerberosServiceName, streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+            setProperty(sc.kerberosKinitCmd, streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD);
+            setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties,
+                    SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+            setProperty(sc.kerberosTicketRenewJitter, streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
+            setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+
+            setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
+            setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);
+
+            setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
+                    DurationToSecondsFunction.INSTANCE);
+            setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
+                    DurationToSecondsFunction.INSTANCE);
+        }
+
+        // ssl
+        SslConfig ssl = runtimeConfig.ssl;
+        if (ssl != null) {
+            setProperty(ssl.protocol, streamsProperties, SslConfigs.SSL_PROTOCOL_CONFIG);
+            setProperty(ssl.provider, streamsProperties, SslConfigs.SSL_PROVIDER_CONFIG);
+            setProperty(ssl.cipherSuites, streamsProperties, SslConfigs.SSL_CIPHER_SUITES_CONFIG);
+            setProperty(ssl.enabledProtocols, streamsProperties, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+
+            setStoreConfig(ssl.truststore, streamsProperties, "ssl.truststore");
+            setStoreConfig(ssl.keystore, streamsProperties, "ssl.keystore");
+            setStoreConfig(ssl.key, streamsProperties, "ssl.key");
+
+            setProperty(ssl.keymanagerAlgorithm, streamsProperties, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
+            setProperty(ssl.trustmanagerAlgorithm, streamsProperties, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
+            Optional<String> eia = Optional.of(ssl.endpointIdentificationAlgorithm.orElse(""));
+            setProperty(eia, streamsProperties, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+            setProperty(ssl.secureRandomImplementation, streamsProperties, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
+        }
+
         return streamsProperties;
     }
 
+    private static void setStoreConfig(StoreConfig sc, Properties properties, String key) {
+        if (sc != null) {
+            setProperty(sc.type, properties, key + ".type");
+            setProperty(sc.location, properties, key + ".location");
+            setProperty(sc.password, properties, key + ".password");
+        }
+    }
+
+    private static <T> void setProperty(Optional<T> property, Properties properties, String key) {
+        setProperty(property, properties, key, Objects::toString);
+    }
+
+    private static <T> void setProperty(Optional<T> property, Properties properties, String key, Function<T, String> fn) {
+        if (property.isPresent()) {
+            properties.put(key, fn.apply(property.get()));
+        }
+    }
+
     private static String asString(List<InetSocketAddress> addresses) {
         return addresses.stream()
                 .map(InetSocketAddress::toString)
@@ -130,17 +207,21 @@ void onStart(@Observes StartupEvent ev) {
             streams.setGlobalStateRestoreListener(globalStateRestoreListener.get());
         }
 
-        adminClientConfig = getAdminClientConfig(bootstrapServersConfig);
+        adminClientConfig = getAdminClientConfig(streamsProperties);
+
+        executor.execute(new Runnable() {
 
-        executor.execute(() -> {
-            try {
-                waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics());
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return;
+            @Override
+            public void run() {
+                try {
+                    waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                LOGGER.debug("Starting Kafka Streams pipeline");
+                streams.start();
             }
-            LOGGER.debug("Starting Kafka Streams pipeline");
-            streams.start();
         });
     }
@@ -199,7 +280,7 @@ public Set<String> getMissingTopics(Collection<String> topicsToCheck)
             Set<String> topicNames = topics.names().get(10, TimeUnit.SECONDS);
 
             if (topicNames.containsAll(topicsToCheck)) {
-                return Collections.EMPTY_SET;
+                return Collections.emptySet();
             } else {
                 missing.removeAll(topicNames);
             }
@@ -210,9 +291,8 @@ public Set<String> getMissingTopics(Collection<String> topicsToCheck)
         return missing;
     }
 
-    private Map<String, Object> getAdminClientConfig(String bootstrapServersConfig) {
-        Map<String, Object> adminClientConfig = new HashMap<>();
-        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+    private Properties getAdminClientConfig(Properties properties) {
+        Properties adminClientConfig = new Properties(properties);
         // include other AdminClientConfig(s) that have been configured
         for (final String knownAdminClientConfig : AdminClientConfig.configNames()) {
             // give preference to admin. first
@@ -233,4 +313,14 @@ public void setRuntimeConfig(KafkaStreamsRuntimeConfig runtimeConfig) {
     public void configure(Properties properties) {
         this.properties = properties;
     }
+
+    private static final class DurationToSecondsFunction implements Function<Duration, String> {
+
+        private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction();
+
+        @Override
+        public String apply(Duration d) {
+            return String.valueOf(d.getSeconds());
+        }
+    }
 }
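Note: the two `Duration`-typed SASL options are converted to whole seconds via `DurationToSecondsFunction` before Kafka sees them. A minimal stand-alone sketch of that conversion (the class name `DurationMappingSketch` is hypothetical, not part of this change):

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.common.config.SaslConfigs;

public class DurationMappingSketch {
    public static void main(String[] args) {
        Properties streamsProperties = new Properties();
        // PT20S is the ISO-8601 form used in the integration-test application.properties below
        Optional<Duration> loginRefreshBuffer = Optional.of(Duration.parse("PT20S"));
        // same conversion as DurationToSecondsFunction above
        loginRefreshBuffer.ifPresent(d -> streamsProperties.put(
                SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, String.valueOf(d.getSeconds())));
        // prints: sasl.login.refresh.buffer.seconds=20
        System.out.println(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS + "="
                + streamsProperties.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS));
    }
}
```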
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java
new file mode 100644
index 0000000000000..66666957dff84
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java
@@ -0,0 +1,91 @@
+package io.quarkus.kafka.streams.runtime;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.quarkus.runtime.annotations.ConfigItem;
+
+@ConfigGroup
+public class SaslConfig {
+
+    /**
+     * JAAS login context parameters for SASL connections in the format used by JAAS configuration files
+     */
+    @ConfigItem
+    public Optional<String> jaasConfig;
+
+    /**
+     * The fully qualified name of a SASL client callback handler class
+     */
+    @ConfigItem
+    public Optional<String> clientCallbackHandlerClass;
+
+    /**
+     * The fully qualified name of a SASL login callback handler class
+     */
+    @ConfigItem
+    public Optional<String> loginCallbackHandlerClass;
+
+    /**
+     * The fully qualified name of a class that implements the Login interface
+     */
+    @ConfigItem
+    public Optional<String> loginClass;
+
+    /**
+     * The Kerberos principal name that Kafka runs as
+     */
+    @ConfigItem
+    public Optional<String> kerberosServiceName;
+
+    /**
+     * Kerberos kinit command path
+     */
+    @ConfigItem
+    public Optional<String> kerberosKinitCmd;
+
+    /**
+     * Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached
+     */
+    @ConfigItem
+    public Optional<Double> kerberosTicketRenewWindowFactor;
+
+    /**
+     * Percentage of random jitter added to the renewal time
+     */
+    @ConfigItem
+    public Optional<Double> kerberosTicketRenewJitter;
+
+    /**
+     * Login thread sleep time between refresh attempts
+     */
+    @ConfigItem
+    public Optional<Long> kerberosMinTimeBeforeRelogin;
+
+    /**
+     * Login refresh thread will sleep until the specified window factor relative to the
+     * credential's lifetime has been reached
+     */
+    @ConfigItem
+    public Optional<Double> loginRefreshWindowFactor;
+
+    /**
+     * The maximum amount of random jitter relative to the credential's lifetime
+     */
+    @ConfigItem
+    public Optional<Double> loginRefreshWindowJitter;
+
+    /**
+     * The desired minimum duration for the login refresh thread to wait before refreshing a credential
+     */
+    @ConfigItem
+    public Optional<Duration> loginRefreshMinPeriod;
+
+    /**
+     * The amount of buffer duration before credential expiration to maintain when refreshing a credential
+     */
+    @ConfigItem
+    public Optional<Duration> loginRefreshBuffer;
+
+}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java
new file mode 100644
index 0000000000000..e2703c337fefe
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SslConfig.java
@@ -0,0 +1,73 @@
+package io.quarkus.kafka.streams.runtime;
+
+import java.util.Optional;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.quarkus.runtime.annotations.ConfigItem;
+
+@ConfigGroup
+public class SslConfig {
+
+    /**
+     * The SSL protocol used to generate the SSLContext
+     */
+    @ConfigItem
+    public Optional<String> protocol;
+
+    /**
+     * The name of the security provider used for SSL connections
+     */
+    @ConfigItem
+    public Optional<String> provider;
+
+    /**
+     * A list of cipher suites
+     */
+    @ConfigItem
+    public Optional<String> cipherSuites;
+
+    /**
+     * The list of protocols enabled for SSL connections
+     */
+    @ConfigItem
+    public Optional<String> enabledProtocols;
+
+    /**
+     * Truststore config
+     */
+    public StoreConfig truststore;
+
+    /**
+     * Keystore config
+     */
+    public StoreConfig keystore;
+
+    /**
+     * Key config
+     */
+    public StoreConfig key;
+
+    /**
+     * The algorithm used by key manager factory for SSL connections
+     */
+    @ConfigItem
+    public Optional<String> keymanagerAlgorithm;
+
+    /**
+     * The algorithm used by trust manager factory for SSL connections
+     */
+    @ConfigItem
+    public Optional<String> trustmanagerAlgorithm;
+
+    /**
+     * The endpoint identification algorithm to validate server hostname using server certificate
+     */
+    @ConfigItem(defaultValue = "https")
+    public Optional<String> endpointIdentificationAlgorithm;
+
+    /**
+     * The SecureRandom PRNG implementation to use for SSL cryptography operations
+     */
+    @ConfigItem
+    public Optional<String> secureRandomImplementation;
+}
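Note: each `@ConfigItem` field maps to a hyphenated key under `quarkus.kafka-streams.ssl.`, and the truststore/keystore/key sub-groups are defined by `StoreConfig` below. For illustration (hypothetical values):

```properties
quarkus.kafka-streams.ssl.protocol=TLSv1.2
quarkus.kafka-streams.ssl.truststore.location=/path/to/truststore.p12
quarkus.kafka-streams.ssl.truststore.password=changeit
quarkus.kafka-streams.ssl.truststore.type=PKCS12
# maps onto the Kafka client keys ssl.protocol, ssl.truststore.location,
# ssl.truststore.password and ssl.truststore.type
```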
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java
new file mode 100644
index 0000000000000..149c69705ec95
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/StoreConfig.java
@@ -0,0 +1,27 @@
+package io.quarkus.kafka.streams.runtime;
+
+import java.util.Optional;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.quarkus.runtime.annotations.ConfigItem;
+
+@ConfigGroup
+public class StoreConfig {
+    /**
+     * Store type
+     */
+    @ConfigItem
+    public Optional<String> type;
+
+    /**
+     * Store location
+     */
+    @ConfigItem
+    public Optional<String> location;
+
+    /**
+     * Store password
+     */
+    @ConfigItem
+    public Optional<String> password;
+}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java
index dc2374faea2a2..d5bbcac879bd1 100644
--- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devmode/KafkaStreamsHotReplacementSetup.java
@@ -28,13 +28,16 @@ public void run() {
             if (nextUpdate < System.currentTimeMillis()) {
                 synchronized (this) {
                     if (nextUpdate < System.currentTimeMillis()) {
-                        executor.execute(() -> {
-                            try {
-                                context.doScan(true);
-                            } catch (RuntimeException e) {
-                                throw e;
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
+                        executor.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    context.doScan(true);
+                                } catch (RuntimeException e) {
+                                    throw e;
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
                             }
                         });
                         // we update at most once every 2s
diff --git a/integration-tests/kafka-streams/src/main/resources/application.properties b/integration-tests/kafka-streams/src/main/resources/application.properties
index e6abe663ddcf6..33366525eab69 100644
--- a/integration-tests/kafka-streams/src/main/resources/application.properties
+++ b/integration-tests/kafka-streams/src/main/resources/application.properties
@@ -5,6 +5,18 @@ quarkus.log.category."org.apache.zookeeper".level=WARN
 
 quarkus.kafka-streams.bootstrap-servers=localhost:19092
 quarkus.kafka-streams.topics=streams-test-categories,streams-test-customers
+quarkus.kafka-streams.schema-registry-key=apicurio.registry.url
+quarkus.kafka-streams.schema-registry-url=http://localhost:8080
+
+quarkus.kafka-streams.security.protocol=SSL
+quarkus.kafka-streams.ssl.truststore.location=./target/classes/ks-truststore.p12
+quarkus.kafka-streams.ssl.truststore.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L
+quarkus.kafka-streams.ssl.truststore.type=PKCS12
+quarkus.kafka-streams.ssl.endpoint-identification-algorithm=
+
+quarkus.kafka-streams.sasl.kerberos-ticket-renew-jitter=0.06
+quarkus.kafka-streams.sasl.login-refresh-buffer=PT20S
+
 # streams options
 kafka-streams.cache.max.bytes.buffering=10240
 kafka-streams.commit.interval.ms=1000
@@ -12,4 +24,5 @@ kafka-streams.metadata.max.age.ms=500
 kafka-streams.auto.offset.reset=earliest
 
 # Set explicitly, as for tests quarkus.application.name does not default to the name of the project
-%test.quarkus.application.name=streams-test-pipeline
\ No newline at end of file
+%test.quarkus.application.name=streams-test-pipeline
+kafka-streams.some-property=dummy
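Note: `kafka-streams.some-property=dummy` exercises the pass-through path added in `KafkaStreamsPropertiesUtil`; the prefix is stripped before the property reaches `StreamsConfig`. A hypothetical sketch of that behavior, runnable only with the Quarkus runtime and the above application.properties on the classpath:

```java
import java.util.Properties;

import io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil;

public class PrefixStrippingSketch {
    public static void main(String[] args) {
        Properties app = KafkaStreamsPropertiesUtil.appKafkaStreamsProperties();
        // "kafka-streams.some-property=dummy" surfaces with its prefix stripped,
        // matching the "some-property" assertion in KafkaStreamsPropertiesTest below
        System.out.println(app.getProperty("some-property")); // expected: dummy
    }
}
```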
diff --git a/integration-tests/kafka-streams/src/main/resources/ks-keystore.p12 b/integration-tests/kafka-streams/src/main/resources/ks-keystore.p12
new file mode 100644
index 0000000000000..abfdbcb2053a3
Binary files /dev/null and b/integration-tests/kafka-streams/src/main/resources/ks-keystore.p12 differ
diff --git a/integration-tests/kafka-streams/src/main/resources/ks-truststore.p12 b/integration-tests/kafka-streams/src/main/resources/ks-truststore.p12
new file mode 100644
index 0000000000000..58ff7c596254c
Binary files /dev/null and b/integration-tests/kafka-streams/src/main/resources/ks-truststore.p12 differ
diff --git a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsPropertiesTest.java b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsPropertiesTest.java
new file mode 100644
index 0000000000000..9d12ba5c3532c
--- /dev/null
+++ b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsPropertiesTest.java
@@ -0,0 +1,37 @@
+package io.quarkus.it.kafka.streams;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTestResource(KafkaTestResource.class)
+@QuarkusTest
+public class KafkaStreamsPropertiesTest {
+
+    @Inject
+    KafkaStreams streams;
+
+    @Test
+    public void testProperties() throws Exception {
+        // reflection hack ... no other way to get raw props ...
+        Field configField = KafkaStreams.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        StreamsConfig config = (StreamsConfig) configField.get(streams);
+
+        Map<String, Object> originals = config.originals();
+
+        Assertions.assertEquals("20", originals.get(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS));
+        Assertions.assertEquals("http://localhost:8080", originals.get("apicurio.registry.url"));
+        Assertions.assertEquals("dummy", originals.get("some-property"));
+    }
+}
diff --git a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
index e4203d401632e..7c68a740cc959 100644
--- a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
+++ b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
@@ -1,5 +1,8 @@
 package io.quarkus.it.kafka.streams;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -7,6 +10,7 @@
 import java.util.Properties;
 
 import org.apache.http.HttpStatus;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -16,6 +20,7 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.hamcrest.CoreMatchers;
@@ -32,12 +37,27 @@
 @QuarkusTest
 public class KafkaStreamsTest {
 
+    private static void addSSL(Properties props) {
+        try {
+            File sslDir = KafkaTestResource.sslDir(null, false);
+            File tsFile = new File(sslDir, "ks-truststore.p12");
+            props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+            props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsFile.getPath());
+            props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
+            props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
+            props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     private static Producer<Integer, Customer> createCustomerProducer() {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectMapperSerializer.class.getName());
+        addSSL(props);
 
         return new KafkaProducer<>(props);
     }
@@ -48,6 +68,7 @@ private static Producer<Integer, Category> createCategoryProducer() {
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-category-producer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectMapperSerializer.class.getName());
+        addSSL(props);
 
         return new KafkaProducer<>(props);
     }
@@ -60,6 +81,7 @@ private static KafkaConsumer<Integer, EnrichedCustomer> createConsumer() {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedCustomerDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        addSSL(props);
 
         KafkaConsumer<Integer, EnrichedCustomer> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Collections.singletonList("streams-test-customers-processed"));
diff --git a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java
index bdafe00a01d04..75c1702a71394 100644
--- a/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java
+++ b/integration-tests/kafka-streams/src/test/java/io/quarkus/it/kafka/streams/KafkaTestResource.java
@@ -1,10 +1,17 @@
 package io.quarkus.it.kafka.streams;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.kafka.common.config.SslConfigs;
+
 import io.debezium.kafka.KafkaCluster;
 import io.debezium.util.Testing;
 import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
@@ -13,13 +20,62 @@ public class KafkaTestResource implements QuarkusTestResourceLifecycleManager {
 
     private KafkaCluster kafka;
 
+    public static File sslDir(File directory, boolean removeExistingContent) throws IOException {
+        if (directory == null) {
+            directory = Testing.Files.createTestingDirectory("kafka-data", removeExistingContent);
+        }
+
+        File targetDir = directory.getParentFile().getParentFile();
+        File sslDir = new File(targetDir, "ssl_test");
+        if (!sslDir.exists()) {
+            //noinspection ResultOfMethodCallIgnored
+            sslDir.mkdir();
+        }
+        return sslDir;
+    }
+
     @Override
     public Map<String, String> start() {
         try {
+            File directory = Testing.Files.createTestingDirectory("kafka-data", true);
+            File sslDir = sslDir(directory, true);
+
+            Path ksPath = new File(sslDir, "ks-keystore.p12").toPath();
+            try (InputStream ksStream = getClass().getResourceAsStream("/ks-keystore.p12")) {
+                Files.copy(
+                        ksStream,
+                        ksPath,
+                        StandardCopyOption.REPLACE_EXISTING);
+            }
+
+            Path tsPath = new File(sslDir, "ks-truststore.p12").toPath();
+            try (InputStream tsStream = getClass().getResourceAsStream("/ks-truststore.p12")) {
+                Files.copy(
+                        tsStream,
+                        tsPath,
+                        StandardCopyOption.REPLACE_EXISTING);
+            }
+
+            String password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L";
+            String type = "PKCS12";
+
             Properties props = new Properties();
             props.setProperty("zookeeper.connection.timeout.ms", "45000");
-            File directory = Testing.Files.createTestingDirectory("kafka-data", true);
-            kafka = new KafkaCluster().withPorts(2182, 19092)
+
+            // http://kafka.apache.org/documentation.html#security_ssl
+            props.setProperty("listener.security.protocol.map", "CLIENT:SSL");
+            props.setProperty("listeners", "CLIENT://:19092");
+            props.setProperty("inter.broker.listener.name", "CLIENT");
+            props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ksPath.toString());
+            props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
+            props.setProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, type);
+            props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
+            props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsPath.toString());
+            props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
+            props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, type);
+            props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
+            kafka = new KafkaCluster()
+                    .withPorts(2182, 19092)
                     .addBrokers(1)
                     .usingDirectory(directory)
                     .deleteDataUponShutdown(true)