Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* Example application that uses the Kafka Clients API to produce messages.
* Example application that uses the Kafka Clients API to produce messages securely over SSL (TLS).
*
* Run with:
* java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar
*
Expand All @@ -28,27 +26,63 @@
* Environment variables:
* - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: superstream-test-superstream-3591.k.aivencloud.com:18837)
* - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic)
*
* SSL Requirements:
* To establish a secure SSL/TLS connection with Kafka brokers, you must generate and configure:
*
* - CA certificate (`ca.pem`) — the trusted certificate authority (from the Kafka provider)
* - Client certificate (`client.cert.pem`) — identifies the client application
* - Client private key (`client.pk8.pem`) — the private key paired with the client certificate
*
* Steps to prepare truststore and keystore:
*
* Step 1: Create a Truststore (truststore.jks) containing the CA certificate:
* keytool -importcert \
* -trustcacerts \
* -alias aiven-ca \
* -file /path/to/ca.pem \
* -keystore /path/to/truststore.jks \
* -storepass changeit
*
* Step 2: Create a Keystore (keystore.p12) containing the client certificate and private key:
* openssl pkcs12 -export \
* -in /path/to/client.cert.pem \
* -inkey /path/to/client.pk8.pem \
* -out /path/to/keystore.p12 \
* -name kafka-client \
* -passout pass:changeit
*
* Notes:
* - The Truststore (`truststore.jks`) ensures the client trusts the Kafka broker's SSL certificate.
* - The Keystore (`keystore.p12`) provides client authentication (mutual TLS) toward the broker.
* - Both Truststore and Keystore must be correctly configured for SSL handshake to succeed.
*
* Security Advice:
* - The password must be at least 6 characters long and must match the password configured in your Java Kafka client.
* - You must configure these passwords properly in your Java Kafka client (`ssl.truststore.password`, `ssl.keystore.password`).
* - Use strong passwords instead of "changeit" in production environments.
* - Protect your truststore.jks and keystore.p12 files carefully; leaking them would compromise your SSL security.
*/

public class AivenKafkaExample {
private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class);

// === Configuration Constants ===
private static final String DEFAULT_BOOTSTRAP_SERVERS =
"superstream-test-superstream-3591.k.aivencloud.com:18837";
private static final String TRUSTSTORE_PATH =
"../../examples/kafka-clients-example/src/main/resources/crets/ca.pem";
private static final String KEYSTORE_CERT_PATH =
"../../examples/kafka-clients-example/src/main/resources/crets/client.cert.pem";
private static final String KEYSTORE_KEY_PATH =
"../../examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem";
// Replace with full absolute path to your generated truststore.jks
private static final String TRUSTSTORE_LOCATION = "/absolute/path/to/truststore.jks";
// The password must be at least 6 characters long and must match the password configured in your Java Kafka client.
private static final String TRUSTSTORE_PASSWORD = "changeit";
// Replace with full absolute path to your generated keystore.p12
private static final String KEYSTORE_KEY_PATH = "/absolute/path/to/keystore.p12";
private static final String SECURITY_PROTOCOL = "SSL";
private static final String TRUSTSTORE_TYPE = "PEM";
private static final String KEYSTORE_TYPE = "PEM";
private static final String ENDPOINT_IDENTIFICATION_ALGORITHM = ""; // skip hostname verification
private static final String TRUSTSTORE_TYPE = "JKS";
private static final String KEYSTORE_TYPE = "PKCS12";

private static final String CLIENT_ID = "superstream-example-producer";
private static final String COMPRESSION_TYPE = "gzip";
private static final int BATCH_SIZE = 16384;
private static final Integer BATCH_SIZE = 16384;

private static final String TOPIC_NAME = "example-topic";
private static final String MESSAGE_KEY = "test-key";
Expand All @@ -69,13 +103,12 @@ public static void main(String[] args) {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

props.put("security.protocol", SECURITY_PROTOCOL);
props.put("ssl.truststore.location", TRUSTSTORE_LOCATION);
props.put("ssl.truststore.password", TRUSTSTORE_PASSWORD);
props.put("ssl.truststore.type", TRUSTSTORE_TYPE);
props.put("ssl.keystore.location",KEYSTORE_KEY_PATH );
props.put("ssl.keystore.password", TRUSTSTORE_PASSWORD);
props.put("ssl.keystore.type", KEYSTORE_TYPE);
props.put("ssl.truststore.certificates", TRUSTSTORE_PATH);
props.put("ssl.keystore.certificate.chain", KEYSTORE_CERT_PATH);
props.put("ssl.keystore.key", KEYSTORE_KEY_PATH);
props.put("ssl.endpoint.identification.algorithm", ENDPOINT_IDENTIFICATION_ALGORITHM);

// Set some basic configuration
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ConfluentProducerExample {

private static final String CLIENT_ID = "superstream-example-producer";
private static final String COMPRESSION_TYPE = "gzip";
private static final int BATCH_SIZE = 16384;
private static final Integer BATCH_SIZE = 16384;

private static final String TOPIC_NAME = "example-topic";
private static final String MESSAGE_KEY = "test-key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class KafkaProducerExample {

private static final String CLIENT_ID = "superstream-example-producer";
private static final String COMPRESSION_TYPE = "gzip";
private static final int BATCH_SIZE = 16384;
private static final Integer BATCH_SIZE = 16384;

private static final String TOPIC_NAME = "example-topic";
private static final String MESSAGE_KEY = "test-key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class MskKafkaExample {

private static final String CLIENT_ID = "superstream-example-producer";
private static final String COMPRESSION_TYPE = "gzip";
private static final int BATCH_SIZE = 16384;
private static final Integer BATCH_SIZE = 16384;

private static final String TOPIC_NAME = "example-topic";
private static final String MESSAGE_KEY = "test-key";
Expand Down