Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Implement a mechanism to skip SSL verification #422

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,31 @@ public class ClustersProperties {
public static class Cluster {
String name;
String bootstrapServers;

TruststoreConfig ssl;

String schemaRegistry;
SchemaRegistryAuth schemaRegistryAuth;
KeystoreConfig schemaRegistrySsl;

String ksqldbServer;
KsqldbServerAuth ksqldbServerAuth;
KeystoreConfig ksqldbServerSsl;

List<ConnectCluster> kafkaConnect;
MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;

List<SerdeConfig> serde;
String defaultKeySerde;
String defaultValueSerde;
List<Masking> masking;

MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;

Long pollingThrottleRate;
TruststoreConfig ssl;

List<Masking> masking;

AuditProperties audit;
}

Expand Down Expand Up @@ -99,6 +108,16 @@ public static class SchemaRegistryAuth {
public static class TruststoreConfig {
String truststoreLocation;
String truststorePassword;
boolean verifySsl = true;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}

@Data
Expand All @@ -118,15 +137,6 @@ public static class KsqldbServerAuth {
String password;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}

@Data
public static class Masking {
Type type;
Expand Down Expand Up @@ -182,6 +192,7 @@ private void flattenClusterProperties() {
}
}

@SuppressWarnings("unchecked")
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
@Nullable Map<String, Object> propertiesMap) {
Map<String, Object> flattened = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.util.SslPropertiesUtil;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
Expand Down Expand Up @@ -42,7 +42,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.fromSupplier(() -> {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.kafbat.ui.model.SortOrderDTO;
import io.kafbat.ui.service.rbac.AccessControlService;
import io.kafbat.ui.util.ApplicationMetrics;
import io.kafbat.ui.util.SslPropertiesUtil;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -254,7 +254,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/MessagesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import io.kafbat.ui.serdes.ProducerRecordCreator;
import io.kafbat.ui.util.SslPropertiesUtil;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -199,7 +199,7 @@ private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster,
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
Map<String, Object> additionalProps) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> s
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
* which will cause json parsing error and will be propagated to UI.
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
* To workaround this we need to check DecodingException err msg.
* This is a known issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
* To work around this we need to check DecodingException err msg.
*/
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
return th instanceof DecodingException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.kafbat.ui.util;

import io.kafbat.ui.config.ClustersProperties;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.kafka.common.config.SslConfigs;

public final class KafkaClientSslPropertiesUtil {

private KafkaClientSslPropertiesUtil() {
}

public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
Properties sink) {
if (truststoreConfig == null) {
return;
}

if (!truststoreConfig.isVerifySsl()) {
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}

if (truststoreConfig.getTruststoreLocation() == null) {
return;
}

sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());

if (truststoreConfig.getTruststorePassword() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static Mono<ApplicationPropertyValidationDTO> validateClusterConnection(S
@Nullable
TruststoreConfig ssl) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
properties.putAll(clusterProps);
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// editing properties to make validation faster
Expand Down
23 changes: 0 additions & 23 deletions api/src/main/java/io/kafbat/ui/util/SslPropertiesUtil.java

This file was deleted.

16 changes: 16 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kafbat.ui.exception.ValidationException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.function.Consumer;
Expand Down Expand Up @@ -45,6 +46,10 @@ private static ObjectMapper defaultOM() {

public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
return configureNoSsl();
}

return configureSsl(
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
Expand Down Expand Up @@ -97,6 +102,17 @@ private WebClientConfigurator configureSsl(
return this;
}

@SneakyThrows
public WebClientConfigurator configureNoSsl() {
var contextBuilder = SslContextBuilder.forClient();
contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);

SslContext context = contextBuilder.build();

httpClient = httpClient.secure(t -> t.sslContext(context));
return this;
}

public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
if (username != null && password != null) {
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));
Expand Down
Loading