Add support for optional Kafka SSL
Co-authored-by: Kenzyme Le <kl@kenzymele.com>
Co-authored-by: Matthias Strobl <matthias.strobl@bmw.de>
2 people authored and kokosing committed Apr 9, 2021
1 parent 1f3955a commit 92ee334
Showing 43 changed files with 1,243 additions and 38 deletions.
76 changes: 76 additions & 0 deletions docs/src/main/sphinx/connector/kafka.rst
@@ -67,6 +67,15 @@ Property Name Description
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not
``kafka.messages-per-split`` Number of messages that are processed by each Trino split, defaults to 100000
``kafka.timestamp-upper-bound-force-push-down-enabled`` Controls if upper bound timestamp push down is enabled for topics using ``CreateTime`` mode
``kafka.security-protocol`` Security protocol for connection to Kafka cluster, defaults to ``PLAINTEXT``
``kafka.ssl.keystore.location`` Location of the keystore file
``kafka.ssl.keystore.password`` Password for the keystore file
``kafka.ssl.keystore.type`` File format of the keystore file, defaults to ``JKS``
``kafka.ssl.truststore.location`` Location of the truststore file
``kafka.ssl.truststore.password`` Password for the truststore file
``kafka.ssl.truststore.type`` File format of the truststore file, defaults to ``JKS``
``kafka.ssl.key.password`` Password for the private key in the keystore file
``kafka.ssl.endpoint-identification-algorithm`` Endpoint identification algorithm used by clients to validate server host name, defaults to ``https``
========================================================== ==============================================================================

In addition, you need to configure :ref:`table schema and schema registry usage
@@ -125,6 +134,73 @@ show up in ``DESCRIBE <table-name>`` or ``SELECT *``.

This property is optional; the default is ``true``.

``kafka.security-protocol``
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Protocol used to communicate with the Kafka brokers.
Valid values are ``PLAINTEXT`` and ``SSL``.

This property is optional; the default is ``PLAINTEXT``.

``kafka.ssl.keystore.location``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Location of the keystore file used for connecting to the Kafka cluster.

This property is optional.

``kafka.ssl.keystore.password``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Password for the keystore file used for connecting to the Kafka cluster.

This property is optional, but required when ``kafka.ssl.keystore.location`` is given.

``kafka.ssl.keystore.type``
^^^^^^^^^^^^^^^^^^^^^^^^^^^

File format of the keystore file.
Valid values are ``JKS`` and ``PKCS12``.

This property is optional; the default is ``JKS``.

``kafka.ssl.truststore.location``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Location of the truststore file used for connecting to the Kafka cluster.

This property is optional.

``kafka.ssl.truststore.password``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Password for the truststore file used for connecting to the Kafka cluster.

This property is optional, but required when ``kafka.ssl.truststore.location`` is given.

``kafka.ssl.truststore.type``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File format of the truststore file.
Valid values are ``JKS`` and ``PKCS12``.

This property is optional; the default is ``JKS``.

``kafka.ssl.key.password``
^^^^^^^^^^^^^^^^^^^^^^^^^^

Password for the private key in the keystore file used for connecting to the Kafka cluster.

This property is optional; it is required only when two-way authentication is configured on the brokers, i.e. ``ssl.client.auth=required``.

``kafka.ssl.endpoint-identification-algorithm``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The endpoint identification algorithm used to validate the server host name when connecting to the Kafka cluster.
Kafka uses ``https`` by default. Set the value to ``disabled`` to turn off server host name validation.

This property is optional; the default is ``https``.
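
A catalog properties file that connects to a TLS-secured cluster could look
like the following sketch. The broker host name, file paths, and passwords are
placeholders, and the keystore properties are only needed when the brokers
require two-way authentication:

.. code-block:: text

    connector.name=kafka
    kafka.nodes=broker.example.com:9093
    kafka.table-names=example_topic
    kafka.security-protocol=SSL
    kafka.ssl.truststore.location=/etc/secrets/kafka.truststore.jks
    kafka.ssl.truststore.password=<truststore-password>
    kafka.ssl.keystore.location=/etc/secrets/kafka.keystore.jks
    kafka.ssl.keystore.password=<keystore-password>
    kafka.ssl.key.password=<key-password>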

Internal columns
----------------

5 changes: 5 additions & 0 deletions plugin/trino-kafka/pom.xml
@@ -84,6 +84,11 @@
<artifactId>kafka-schema-registry-client</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kafka;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.kafka.security.ForKafkaSsl;
import io.trino.plugin.kafka.security.SecurityProtocol;

import static io.airlift.configuration.ConditionalModule.installModuleIf;

public class KafkaClientsModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configurePlainText);
installClientModule(SecurityProtocol.SSL, KafkaClientsModule::configureSsl);
}

private void installClientModule(SecurityProtocol securityProtocol, Module module)
{
install(installModuleIf(
KafkaConfig.class,
config -> config.getSecurityProtocol().equals(securityProtocol),
module));
}

private static void configurePlainText(Binder binder)
{
binder.bind(KafkaConsumerFactory.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaProducerFactory.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaAdminFactory.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON);
}

private static void configureSsl(Binder binder)
{
binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON);

binder.bind(KafkaConsumerFactory.class).to(SslKafkaConsumerFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaProducerFactory.class).to(SslKafkaProducerFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaAdminFactory.class).to(SslKafkaAdminFactory.class).in(Scopes.SINGLETON);
}
}
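
The Ssl* factory classes bound above are part of this commit but not included
in this excerpt. A plausible sketch of SslKafkaConsumerFactory, assuming it
decorates the @ForKafkaSsl-annotated plain-text factory and overlays pre-built
SSL client properties (the injected sslProperties source is hypothetical, for
illustration only):

/*
 * Sketch only; not the committed implementation.
 */
package io.trino.plugin.kafka;

import io.trino.plugin.kafka.security.ForKafkaSsl;
import io.trino.spi.connector.ConnectorSession;

import javax.inject.Inject;

import java.util.Properties;

import static java.util.Objects.requireNonNull;

public class SslKafkaConsumerFactory
        implements KafkaConsumerFactory
{
    private final KafkaConsumerFactory delegate;
    private final Properties sslProperties;

    @Inject
    public SslKafkaConsumerFactory(@ForKafkaSsl KafkaConsumerFactory delegate, Properties sslProperties)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.sslProperties = requireNonNull(sslProperties, "sslProperties is null");
    }

    @Override
    public Properties configure(ConnectorSession session)
    {
        // Start from the PLAINTEXT configuration and overlay the kafka.ssl.* settings
        Properties properties = new Properties();
        properties.putAll(delegate.configure(session));
        properties.putAll(sslProperties);
        return properties;
    }
}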
@@ -21,6 +21,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import io.trino.plugin.kafka.schema.file.FileTableDescriptionSupplier;
import io.trino.plugin.kafka.security.SecurityProtocol;
import io.trino.spi.HostAddress;

import javax.validation.constraints.Min;
@@ -31,6 +32,7 @@
import java.util.stream.StreamSupport;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.kafka.security.SecurityProtocol.PLAINTEXT;

@DefunctConfig("kafka.connect-timeout")
public class KafkaConfig
@@ -44,6 +46,7 @@ public class KafkaConfig
private int messagesPerSplit = 100_000;
private boolean timestampUpperBoundPushDownEnabled;
private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME;
private SecurityProtocol securityProtocol = PLAINTEXT;

@Size(min = 1)
public Set<HostAddress> getNodes()
@@ -152,4 +155,17 @@ public KafkaConfig setTimestampUpperBoundPushDownEnabled(boolean timestampUpperB
this.timestampUpperBoundPushDownEnabled = timestampUpperBoundPushDownEnabled;
return this;
}

@Config("kafka.security-protocol")
@ConfigDescription("Security protocol used for Kafka connection")
public KafkaConfig setSecurityProtocol(SecurityProtocol securityProtocol)
{
this.securityProtocol = securityProtocol;
return this;
}

public SecurityProtocol getSecurityProtocol()
{
return securityProtocol;
}
}
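
SecurityProtocol itself is not shown in this excerpt. A minimal sketch that is
consistent with its usage here (the factories pass securityProtocol.name() as
the Kafka security.protocol value, and the documented values are PLAINTEXT and
SSL):

package io.trino.plugin.kafka.security;

// Sketch: constant names must match the Kafka client's security.protocol
// values, because the factories call name() on them directly.
public enum SecurityProtocol
{
    PLAINTEXT,
    SSL
}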
@@ -17,6 +17,7 @@
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.trino.plugin.kafka.security.KafkaSecurityModule;
import io.trino.spi.NodeManager;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
@@ -59,6 +60,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
Bootstrap app = new Bootstrap(
new JsonModule(),
new KafkaConnectorModule(),
new KafkaSecurityModule(),
extension,
binder -> {
binder.bind(ClassLoader.class).toInstance(KafkaConnectorFactory.class.getClassLoader());
@@ -14,7 +14,10 @@
package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.kafka.security.KafkaSecurityModule;
import io.trino.spi.Plugin;
import io.trino.spi.connector.ConnectorFactory;

@@ -23,10 +26,14 @@
public class KafkaPlugin
implements Plugin
{
public static final Module DEFAULT_EXTENSION = binder -> {
binder.install(new KafkaConsumerModule());
binder.install(new KafkaProducerModule());
binder.install(new KafkaAdminModule());
public static final Module DEFAULT_EXTENSION = new AbstractConfigurationAwareModule()
{
@Override
protected void setup(Binder binder)
{
install(new KafkaClientsModule());
install(new KafkaSecurityModule());
}
};

private final Module extension;
@@ -11,7 +11,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.plugin.kafka;

import io.trino.spi.connector.ConnectorSession;
@@ -14,6 +14,7 @@

package io.trino.plugin.kafka;

import io.trino.plugin.kafka.security.SecurityProtocol;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSession;

@@ -24,19 +25,22 @@

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;

public class PlainTextKafkaAdminFactory
implements KafkaAdminFactory
{
private final Set<HostAddress> nodes;
private final SecurityProtocol securityProtocol;

@Inject
public PlainTextKafkaAdminFactory(KafkaConfig kafkaConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");

nodes = kafkaConfig.getNodes();
securityProtocol = kafkaConfig.getSecurityProtocol();
}

@Override
@@ -46,6 +50,7 @@ public Properties configure(ConnectorSession session)
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
.map(HostAddress::toString)
.collect(joining(",")));
properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
return properties;
}
}
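
Callers can hand the configured properties straight to the Kafka client
library. A usage sketch, assuming adminFactory and session are in scope:

import org.apache.kafka.clients.admin.AdminClient;

// The properties carry the bootstrap servers and the security protocol, so
// the resulting client connects over PLAINTEXT or SSL as configured.
try (AdminClient admin = AdminClient.create(adminFactory.configure(session))) {
    admin.describeCluster();
}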
@@ -14,6 +14,7 @@
package io.trino.plugin.kafka;

import io.airlift.units.DataSize;
import io.trino.plugin.kafka.security.SecurityProtocol;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSession;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -25,6 +26,7 @@

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -36,6 +38,7 @@ public class PlainTextKafkaConsumerFactory
{
private final Set<HostAddress> nodes;
private final DataSize kafkaBufferSize;
private final SecurityProtocol securityProtocol;

@Inject
public PlainTextKafkaConsumerFactory(KafkaConfig kafkaConfig)
@@ -44,6 +47,7 @@ public PlainTextKafkaConsumerFactory(KafkaConfig kafkaConfig)

nodes = kafkaConfig.getNodes();
kafkaBufferSize = kafkaConfig.getKafkaBufferSize();
securityProtocol = kafkaConfig.getSecurityProtocol();
}

@Override
@@ -57,6 +61,7 @@ public Properties configure(ConnectorSession session)
properties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(RECEIVE_BUFFER_CONFIG, Long.toString(kafkaBufferSize.toBytes()));
properties.setProperty(ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
return properties;
}
}
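
As with the admin factory, the returned properties feed directly into the
client constructor. A sketch, assuming consumerFactory and session are in
scope:

import org.apache.kafka.clients.consumer.KafkaConsumer;

// byte[] keys and values match the ByteArrayDeserializer configured above.
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerFactory.configure(session))) {
    // subscribe and poll as usual; the security protocol is already applied
}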
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.kafka;

import io.trino.plugin.kafka.security.SecurityProtocol;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSession;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -24,6 +25,7 @@

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -34,13 +36,15 @@ public class PlainTextKafkaProducerFactory
implements KafkaProducerFactory
{
private final Set<HostAddress> nodes;
private final SecurityProtocol securityProtocol;

@Inject
public PlainTextKafkaProducerFactory(KafkaConfig kafkaConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");

nodes = kafkaConfig.getNodes();
securityProtocol = kafkaConfig.getSecurityProtocol();
}

@Override
@@ -54,6 +58,7 @@ public Properties configure(ConnectorSession session)
properties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.setProperty(ACKS_CONFIG, "all");
properties.setProperty(LINGER_MS_CONFIG, Long.toString(5));
properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
return properties;
}
}
