Skip to content

Commit

Permalink
Add support for SSL, SASL_PLAIN, and SASL_SSL listeners in kafka brok…
Browse files Browse the repository at this point in the history
…ers. (#28)

* WIP for sasl support

* WIP - add customized listeners?

* WIP get SSL listener working in test cases, needs cleanup

* Continued work in progress

* Fix checkstyle violations, get working test cases, additional clean up required

* Initial cleanup

* Update JAAS variable for tests

* Update travis file with path to jaas

* Bump release version to 3.1.0

* Fix changed behavior in test

* Cleanup

* increase heap in travis

* revert

* increase heap

* cleanup

* bump heap

* temp disable tests

* temp disable tests

* Switch client auth to requested

* Fix compilation error

* Disable tests

* Disable hostname verification

* Use localhost

* be  more selective on logs

* Move to generate certificates

* add gitignore

* Update default hostname

* Move to being client auth required in SSL and SASL_SSL tests

* Update test cases

* fix violations

* Add test cases for junit5

* Add additional test cases

* Code cleanup

* General code cleanup

* Fix method scope

* Update README

* Update README

* Update README

* Bump heapsize

* Give the test additional time

* Back down replica factor to avoid unclean shutdown

* Back down replica factor to avoid unclean shutdown

* fix test

* speed up tests by allowing unclean shutdown

* Enable clean shutdown

* Enable clean shutdown

* condense test cases

* add debug log

* debug log

* debug log

* more debug

* more debug

* remove debug + test cases'

* remove travis options
  • Loading branch information
Crim committed Dec 13, 2018
1 parent a91f945 commit 4c527af
Show file tree
Hide file tree
Showing 47 changed files with 2,501 additions and 356 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -7,3 +7,6 @@
target/
README.md.bak
*.versionsBackup
script/generated/
kafka-junit-core/src/test/resources/kafka.keystore.jks
kafka-junit-core/src/test/resources/kafka.truststore.jks
9 changes: 6 additions & 3 deletions .travis.yml
Expand Up @@ -12,16 +12,19 @@ env:
-
KAFKA_VERSION=0.11.0.3
EXCLUDE_KAFKA_TESTS=1_0_x

-
KAFKA_VERSION=1.0.2
EXCLUDE_KAFKA_TESTS=NOT-USED
-
KAFKA_VERSION=1.1.1
EXCLUDE_KAFKA_TESTS=NOT-USED
-
KAFKA_VERSION=2.0.0
KAFKA_VERSION=2.0.1
EXCLUDE_KAFKA_TESTS=0_11_0_x
script:
## Generate dummy SSL Certificates used in tests
- script/generateCertificatesForTests.sh
## Run CheckStyle and License Header checks, compile, and install locally
- mvn clean install -DskipTests=true -DskipCheckStyle=false -Dmaven.javadoc.skip=true -B -V -DkafkaVersion=$KAFKA_VERSION -Dtests.excluded=$EXCLUDE_KAFKA_TESTS
- mvn test -B -DkafkaVersion=$KAFKA_VERSION -Dtests.excluded=$EXCLUDE_KAFKA_TESTS -DskipCheckStyle=true
## Run test suite
- mvn test -B -DkafkaVersion=$KAFKA_VERSION -Dtests.excluded=$EXCLUDE_KAFKA_TESTS -DskipCheckStyle=true -Djava.security.auth.login.config=${PWD}/kafka-junit-core/src/test/resources/jaas.conf
9 changes: 6 additions & 3 deletions CHANGELOG.md
Expand Up @@ -2,10 +2,13 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## UNRELEASED
- Officially support Kafka 2.0.0
## 3.1.0 (UNRELEASED)
- Officially support Kafka 2.0.x
- KafkaTestUtils.produceRecords() and its variants now set producer configuration "acks" to "all"
- Update Kafka-JUnit5 junit dependency from 5.1.1 to JUnit 5.2.0
- Update Kafka-JUnit5 junit dependency from 5.1.1 to JUnit 5.3.2
- Add support for registering PLAINTEXT, SSL, SASL_PLAIN, and SASL_SSL listeners on internal kafka brokers.
- Kafka Connection Strings now include the protocol prefix.
- Default configured host changed from '127.0.0.1' to 'localhost'

## 3.0.1 (06/20/2018)
- [Issue-16](https://github.com/salesforce/kafka-junit/issues/16) Bugfix for re-using the same SharedKafkaTestResource instance multiple times, as might happen if you run a test multiple times.
Expand Down
6 changes: 5 additions & 1 deletion README.md
Expand Up @@ -5,7 +5,11 @@
This library wraps Apache Kafka's [KafkaServerStartable](https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/KafkaServerStartable.scala#L32) class and allows you to easily create and run tests against
one or more "real" kafka brokers. No longer do you need to setup and coordinate with an external kafka cluster for your tests! The library transparently supports running a single or multi-broker cluster. Running a multi-broker cluster allows you to validate how your software reacts under various error scenarios, such as when one or more brokers become unavailable.

Currently the library supports Kafka versions 2.0.x, 1.1.x, 1.0.x, and 0.11.0.x.
## Features
- Support for JUnit 4 and JUnit 5.
- Support for Kafka versions 2.0.x, 1.1.x, 1.0.x, and 0.11.0.x.
- Support for running either single broker cluster, or multi-broker clusters.
- Support for PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, and SSL listeners.

## Using Kafka-JUnit with JUnit 4.

Expand Down
117 changes: 116 additions & 1 deletion kafka-junit-core/README.md
Expand Up @@ -102,7 +102,7 @@ public final class KafkaTestClusterRunner implements ApplicationRunner {
}

/**
* Creates default admin user if none exists.
* Starts multi-broker kafka cluster.
*/
private void startKafkaService(final int clusterSize) {
kafkaTestCluster = new KafkaTestCluster(clusterSize);
Expand Down Expand Up @@ -144,3 +144,118 @@ public final class KafkaTestClusterRunner implements ApplicationRunner {
}
}
```

### Example starting a cluster with SSL support

```java
/**
* Starts multi-broker kafka cluster with SSL support.
*/
private void startKafkaService(final int clusterSize) {
// Create SSL listener
final BrokerListener listener = new SslListener()
.withClientAuthRequired()
.withKeyStoreLocation("/path/to/your/kafka.keystore.jks")
.withKeyStorePassword("YourKeyStorePassword")
.withTrustStoreLocation("/path/to/your/kafka.truststore.jks")
.withTrustStorePassword("YourTrustStorePassword")
.withKeyPassword("YourKeyPassword");

// Define any other broker properties you may need.
final Properties brokerProperties = new Properties();

// Create cluster
kafkaTestCluster = new KafkaTestCluster(
clusterSize,
brokerProperties,
Collections.singletonList(listener)
);

// Start the cluster.
kafkaTestCluster.start();

// Log details about the cluster
logger.info("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());
}
```

### Example starting a cluster with SASL_PLAIN support

**NOTE:** Kafka reads in the JAAS file as defined by an Environment variable at JVM start up. This property
can not be set at run time.

In order to make use of this Listener, you **must** start the JVM with the following argument:

`-Djava.security.auth.login.config=/path/to/your/jaas.conf`

```java
/**
* Starts multi-broker kafka cluster with SASL_PLAIN support.
*/
private void startKafkaService(final int clusterSize) {
// Create SSL_PLAIN listener
final BrokerListener listener = new SaslPlainListener()
// Define your username and password
.withUsername("kafkaclient")
.withPassword("client-secret");

// Define any other broker properties you may need.
final Properties brokerProperties = new Properties();

// Create cluster
kafkaTestCluster = new KafkaTestCluster(
clusterSize,
brokerProperties,
Collections.singletonList(listener)
);

// Start the cluster.
kafkaTestCluster.start();

// Log details about the cluster
logger.info("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());
}
```

### Example starting a cluster with SASL_SSL support

**NOTE:** Kafka reads in the JAAS file as defined by an Environment variable at JVM start up. This property
can not be set at run time.

In order to make use of this Listener, you **must** start the JVM with the following argument:

`-Djava.security.auth.login.config=/path/to/your/jaas.conf`

```java
/**
* Starts multi-broker kafka cluster with SASL_SSL support.
*/
private void startKafkaService(final int clusterSize) {
// Create SASL_SSL listener
final BrokerListener listener = new SaslSslListener()
.withKeyStoreLocation("/path/to/your/kafka.keystore.jks")
.withKeyStorePassword("YourKeyStorePassword")
.withTrustStoreLocation("/path/to/your/kafka.truststore.jks")
.withTrustStorePassword("YourTrustStorePassword")
.withKeyPassword("YourKeyPassword")
// Define your username and password
.withUsername("kafkaclient")
.withPassword("client-secret");

// Define any other broker properties you may need.
final Properties brokerProperties = new Properties();

// Create cluster
kafkaTestCluster = new KafkaTestCluster(
clusterSize,
brokerProperties,
Collections.singletonList(listener)
);

// Start the cluster.
kafkaTestCluster.start();

// Log details about the cluster
logger.info("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());
}
```
12 changes: 9 additions & 3 deletions kafka-junit-core/pom.xml
Expand Up @@ -5,12 +5,12 @@
<parent>
<artifactId>kafka-junit</artifactId>
<groupId>com.salesforce.kafka.test</groupId>
<version>3.0.1</version>
<version>3.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-junit-core</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>

<!-- defined properties -->
<properties>
Expand All @@ -26,6 +26,12 @@
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<!-- Mockito for mocks in tests -->
<dependency>
Expand Down Expand Up @@ -99,7 +105,7 @@
</dependency>
</dependencies>
<configuration>
<argLine>-Xmx2048M</argLine>
<argLine>-Xmx${tests.heapSize}</argLine>
<skipTests>${skipTests}</skipTests>
<reportFormat>plain</reportFormat>
</configuration>
Expand Down
Expand Up @@ -25,6 +25,9 @@

package com.salesforce.kafka.test;

import com.salesforce.kafka.test.listeners.BrokerListener;
import com.salesforce.kafka.test.listeners.PlainListener;

import java.util.Properties;

/**
Expand All @@ -47,6 +50,11 @@ public abstract class AbstractKafkaTestResource<T extends AbstractKafkaTestResou
*/
private int numberOfBrokers = 1;

/**
* Defines which listener has been set to be configured on the brokers.
*/
private BrokerListener registeredListener = new PlainListener();

/**
* Default constructor.
*/
Expand Down Expand Up @@ -107,6 +115,20 @@ public T withBrokers(final int brokerCount) {
return (T) this;
}

/**
* Register additional listeners on the kafka brokers.
* @param listener listener instance to register.
* @return SharedKafkaTestResource for method chaining.
*/
@SuppressWarnings("unchecked")
public T registerListener(final BrokerListener listener) {
if (listener == null) {
throw new IllegalArgumentException("Listener argument may not be null.");
}
registeredListener = listener;
return (T) this;
}

/**
* KafkaTestUtils is a collection of re-usable/common access patterns for interacting with the Kafka cluster.
* @return Instance of KafkaTestUtils configured to operate on the Kafka cluster.
Expand Down Expand Up @@ -144,6 +166,14 @@ public KafkaBrokers getKafkaBrokers() {
return kafkaCluster.getKafkaBrokers();
}

/**
* Returns all registered listener.
* @return The configured listener.
*/
protected BrokerListener getRegisteredListener() {
return registeredListener;
}

protected KafkaCluster getKafkaCluster() {
return kafkaCluster;
}
Expand Down
Expand Up @@ -25,6 +25,8 @@

package com.salesforce.kafka.test;

import java.util.List;

/**
* Provides a slimmed down view onto KafkaCluster to avoid circular references in code.
*/
Expand All @@ -41,6 +43,12 @@ public interface KafkaProvider {
*/
String getKafkaConnectString();

/**
* Connection details about each of the registered listeners on the kafka broker.
* @return details about each of the registered listeners on the kafka broker.
*/
List<ListenerProperties> getListenerProperties();

/**
* Returns connection string for zookeeper clients.
* @return Connection string to connect to the Zookeeper instance.
Expand Down

0 comments on commit 4c527af

Please sign in to comment.