Add SSL connection parameter for Schema registry #237

Open · wants to merge 10 commits into base: master

3 changes: 3 additions & 0 deletions README.md
@@ -274,6 +274,9 @@ docker run -d --rm -p 9000:9000 \
|`KAFKA_PROPERTIES_FILE`|Internal location where the Kafka properties file will be written to (if `KAFKA_PROPERTIES` is set). Defaults to `kafka.properties`.
|`KAFKA_TRUSTSTORE_FILE`|Internal location where the truststore file will be written to (if `KAFKA_TRUSTSTORE` is set). Defaults to `kafka.truststore.jks`.
|`KAFKA_KEYSTORE_FILE` |Internal location where the keystore file will be written to (if `KAFKA_KEYSTORE` is set). Defaults to `kafka.keystore.jks`.
|`SCHEMAREGISTRY_PROPERTIES`|Additional properties for configuring the Schema Registry connection (base-64 encoded). Supplies the keystore/truststore locations and passwords.
|`SCHEMAREGISTRY_TRUSTSTORE`|Truststore used to verify the Schema Registry certificate (base-64 encoded). Required for TLS/SSL.
|`SCHEMAREGISTRY_KEYSTORE` |Keystore holding the private key for mutual TLS authentication (base-64 encoded).

Collaborator: It could also be useful to add brief documentation on how to configure these properties and when they are necessary.

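For example, to point Kafdrop at a TLS-secured registry, the variables could be supplied like this (a sketch; the registry URL, port, and file names are illustrative placeholders — the script's `base64 --decode --ignore-garbage` tolerates the wrapped output of plain `base64`):

```sh
docker run -d --rm -p 9000:9000 \
    -e KAFKA_BROKERCONNECT=kafka:9092 \
    -e SCHEMAREGISTRY_CONNECT=https://schemaregistry:8085 \
    -e SCHEMAREGISTRY_PROPERTIES="$(base64 < schemaregistry.properties)" \
    -e SCHEMAREGISTRY_TRUSTSTORE="$(base64 < schemaregistry.truststore.jks)" \
    -e SCHEMAREGISTRY_KEYSTORE="$(base64 < schemaregistry.keystore.jks)" \
    obsidiandynamics/kafdrop
```
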
### Using Helm
Like in the Docker example, supply the files in base-64 form:
10 changes: 10 additions & 0 deletions pom.xml
@@ -76,6 +76,16 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Collaborator: From my understanding, Kafdrop uses spring-kafka, so updating the Kafka clients means updating spring-kafka. Is updating kafka-clients required for this PR? If not, I suggest removing this change.

<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
Collaborator: Why did you add this dependency? If it is not strictly required for this PR, I suggest removing it (and opening a separate PR for it if needed).

<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
24 changes: 24 additions & 0 deletions src/main/docker/kafdrop.sh
@@ -43,6 +43,14 @@ else
rm $KAFKA_PROPERTIES_FILE |& > /dev/null | true
fi

SCHEMAREGISTRY_PROPERTIES_FILE=${SCHEMAREGISTRY_PROPERTIES_FILE:-schemaregistry.properties}
if [ "$SCHEMAREGISTRY_PROPERTIES" != "" ]; then
echo Writing Schema Registry properties into $SCHEMAREGISTRY_PROPERTIES_FILE
echo "$SCHEMAREGISTRY_PROPERTIES" | base64 --decode --ignore-garbage > $SCHEMAREGISTRY_PROPERTIES_FILE
else
rm $SCHEMAREGISTRY_PROPERTIES_FILE |& > /dev/null | true
fi

KAFKA_TRUSTSTORE_FILE=${KAFKA_TRUSTSTORE_FILE:-kafka.truststore.jks}
if [ "$KAFKA_TRUSTSTORE" != "" ]; then
echo Writing Kafka truststore into $KAFKA_TRUSTSTORE_FILE
@@ -51,6 +59,14 @@ else
rm $KAFKA_TRUSTSTORE_FILE |& > /dev/null | true
fi

SCHEMAREGISTRY_TRUSTSTORE_FILE=${SCHEMAREGISTRY_TRUSTSTORE_FILE:-schemaregistry.truststore.jks}
if [ "$SCHEMAREGISTRY_TRUSTSTORE" != "" ]; then
echo Writing Schema Registry truststore into $SCHEMAREGISTRY_TRUSTSTORE_FILE
echo "$SCHEMAREGISTRY_TRUSTSTORE" | base64 --decode --ignore-garbage > $SCHEMAREGISTRY_TRUSTSTORE_FILE
else
rm $SCHEMAREGISTRY_TRUSTSTORE_FILE |& > /dev/null | true
fi

KAFKA_KEYSTORE_FILE=${KAFKA_KEYSTORE_FILE:-kafka.keystore.jks}
if [ "$KAFKA_KEYSTORE" != "" ]; then
echo Writing Kafka keystore into $KAFKA_KEYSTORE_FILE
@@ -59,6 +75,14 @@ else
rm $KAFKA_KEYSTORE_FILE |& > /dev/null | true
fi

SCHEMAREGISTRY_KEYSTORE_FILE=${SCHEMAREGISTRY_KEYSTORE_FILE:-schemaregistry.keystore.jks}
if [ "$SCHEMAREGISTRY_KEYSTORE" != "" ]; then
echo Writing Schema Registry keystore into $SCHEMAREGISTRY_KEYSTORE_FILE
echo "$SCHEMAREGISTRY_KEYSTORE" | base64 --decode --ignore-garbage > $SCHEMAREGISTRY_KEYSTORE_FILE
else
rm $SCHEMAREGISTRY_KEYSTORE_FILE |& > /dev/null | true
fi

ARGS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED -Xss256K \
$JMX_ARGS \
$HEAP_ARGS \
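
The new blocks mirror the existing Kafka ones: each variable carries base-64 content that the script decodes into a file at start-up. A minimal sketch of preparing the values on the host (file names are placeholders):

```sh
# Base-64 encode the files; kafdrop.sh decodes them back into files on start-up.
export SCHEMAREGISTRY_PROPERTIES="$(base64 < schemaregistry.properties)"
export SCHEMAREGISTRY_TRUSTSTORE="$(base64 < schemaregistry.truststore.jks)"
export SCHEMAREGISTRY_KEYSTORE="$(base64 < schemaregistry.keystore.jks)"
```
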
2 changes: 1 addition & 1 deletion src/main/java/kafdrop/config/KafkaConfigurationException.java
@@ -1,7 +1,7 @@
package kafdrop.config;

public final class KafkaConfigurationException extends RuntimeException {
KafkaConfigurationException(Throwable cause) {
public KafkaConfigurationException(Throwable cause) {
super(cause);
}
}
12 changes: 12 additions & 0 deletions src/main/java/kafdrop/config/SchemaRegistryConfiguration.java
@@ -1,5 +1,7 @@
package kafdrop.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.*;
import org.springframework.context.annotation.*;
import org.springframework.stereotype.*;
@@ -14,10 +16,12 @@ public class SchemaRegistryConfiguration {
@Component
@ConfigurationProperties(prefix = "schemaregistry")
public static final class SchemaRegistryProperties {
private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryConfiguration.class);
static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*");

private String connect;
private String auth;
private String propertiesFile;

public String getConnect() {
return connect;
@@ -37,5 +41,13 @@ public List<String> getConnectList() {
.filter(s -> s.length() > 0)
.collect(Collectors.toList());
}

public String getPropertiesFile() {
LOG.debug("Returning property path: {}", propertiesFile);
return propertiesFile;
}

public void setPropertiesFile(String propertiesFile) {
this.propertiesFile = propertiesFile;
}

}
}
7 changes: 6 additions & 1 deletion src/main/java/kafdrop/controller/MessageController.java
@@ -19,6 +19,8 @@
package kafdrop.controller;

import java.io.File;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -253,12 +255,15 @@ List<Object> getPartitionOrMessages(
private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile, String msgTypeName) {
final MessageDeserializer deserializer;

if (format == MessageFormat.AVRO) {
final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
final var schemaRegistryAuth = schemaRegistryProperties.getAuth();
final var schemaPropertiesFile = schemaRegistryProperties.getPropertiesFile();

deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth, schemaPropertiesFile);
} else if (format == MessageFormat.PROTOBUF && null != descFile) {

// filter the input file name
final var descFileName = descFile.replace(".desc", "")
62 changes: 57 additions & 5 deletions src/main/java/kafdrop/util/AvroMessageDeserializer.java
@@ -1,7 +1,18 @@
package kafdrop.util;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.serializers.*;
import kafdrop.config.KafkaConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.*;
import java.util.*;

@@ -10,9 +21,15 @@ public final class AvroMessageDeserializer implements MessageDeserializer {
private final String topicName;
private final KafkaAvroDeserializer deserializer;

public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
private static final Logger LOG = LoggerFactory.getLogger(AvroMessageDeserializer.class);

public AvroMessageDeserializer(
String topicName,
String schemaRegistryUrl,
String schemaRegistryAuth,
String schemaProperty) {
this.topicName = topicName;
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth);
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth, schemaProperty);
}

@Override
@@ -22,15 +39,50 @@ public String deserializeMessage(ByteBuffer buffer) {
return deserializer.deserialize(topicName, bytes).toString();
}

private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) {
private static KafkaAvroDeserializer getDeserializer(
String schemaRegistryUrl,
String schemaRegistryAuth,
String schemaPropertyFile) {
final var config = new HashMap<String, Object>();

final var sslConfig = new HashMap<String, Object>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

// Add TLS properties if the connection is secured
if (schemaRegistryUrl.toLowerCase().contains("https")) {
final var propertiesFile = new File(schemaPropertyFile);
if (propertiesFile.isFile()) {
final var propertyOverrides = new Properties();
try (var propsReader = new BufferedReader(new FileReader(propertiesFile))) {
propertyOverrides.load(propsReader);
} catch (IOException e) {
throw new KafkaConfigurationException(e);
}
for (final String name : propertyOverrides.stringPropertyNames()) {
// Log only the property name; the values may contain keystore/truststore passwords.
LOG.debug("Applying schema registry SSL property: {}", name);
sslConfig.put(name, propertyOverrides.getProperty(name));
}
}
}

if (schemaRegistryAuth != null) {
config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
}
final var kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(config, false);

RestService restService = new RestService(schemaRegistryUrl);

SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(
restService,
AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
sslConfig,
null
);

final var kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, config);

return kafkaAvroDeserializer;
}
}
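
For reference, the properties file read above is what populates `sslConfig`. A minimal sketch of its contents, assuming the Confluent client's `schema.registry.ssl.*` key names and placeholder paths/passwords:

```sh
# Hypothetical contents; key names assume the Confluent schema registry client config.
cat > schemaregistry.properties <<'EOF'
schema.registry.ssl.truststore.location=schemaregistry.truststore.jks
schema.registry.ssl.truststore.password=changeit
schema.registry.ssl.keystore.location=schemaregistry.keystore.jks
schema.registry.ssl.keystore.password=changeit
schema.registry.ssl.key.password=changeit
EOF
```
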
5 changes: 4 additions & 1 deletion src/main/resources/application.yml
@@ -37,4 +37,7 @@ kafka:
securityProtocol: "SASL_PLAINTEXT"
truststoreFile: "${KAFKA_TRUSTSTORE_FILE:kafka.truststore.jks}"
propertiesFile : "${KAFKA_PROPERTIES_FILE:kafka.properties}"
keystoreFile: "${KAFKA_KEYSTORE_FILE:kafka.keystore.jks}"
keystoreFile: "${KAFKA_KEYSTORE_FILE:kafka.keystore.jks}"

schemaregistry:
propertiesFile: "${SCHEMAREGISTRY_PROPERTIES_FILE:schemaregistry.properties}"
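
With this mapping in place, the file location can also be overridden through the environment when running the jar directly. A sketch (the jar name, properties path, and registry URL are placeholders):

```sh
SCHEMAREGISTRY_PROPERTIES_FILE=/etc/kafdrop/schemaregistry.properties \
  java -jar target/kafdrop.jar \
  --schemaregistry.connect=https://schemaregistry:8085
```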