diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index 8318226..2c36474 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -8,14 +8,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Properties; import java.util.concurrent.ExecutionException; /** - * Example application that uses the Kafka Clients API to produce messages. + * Example application that uses the Kafka Clients API to produce messages securely over SSL (TLS). + * * Run with: * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar * @@ -28,27 +26,63 @@ * Environment variables: * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: superstream-test-superstream-3591.k.aivencloud.com:18837) * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + * + * SSL Requirements: + * To establish a secure SSL/TLS connection with Kafka brokers, you must generate and configure: + * + * - CA certificate (`ca.pem`) — the trusted certificate authority (from the Kafka provider) + * - Client certificate (`client.cert.pem`) — identifies the client application + * - Client private key (`client.pk8.pem`) — the private key paired with the client certificate + * + * Steps to prepare truststore and keystore: + * + * Step 1: Create a Truststore (truststore.jks) containing the CA certificate: + * keytool -importcert \ + * -trustcacerts \ + * -alias aiven-ca \ + * -file /path/to/ca.pem \ + * -keystore /path/to/truststore.jks \ + * -storepass changeit + * + * Step 2: Create a Keystore (keystore.p12) containing the client certificate and private key: + * openssl pkcs12 -export \ + * -in /path/to/client.cert.pem \ + * -inkey /path/to/client.pk8.pem \ + * -out /path/to/keystore.p12 \ + * -name kafka-client \ + * -passout pass:changeit + * + * Notes: + * - The Truststore (`truststore.jks`) ensures the client trusts the Kafka broker's SSL certificate. + * - The Keystore (`keystore.p12`) provides client authentication (mutual TLS) toward the broker. + * - Both Truststore and Keystore must be correctly configured for SSL handshake to succeed. + * + * Security Advice: + * - The password must be at least 6 characters long and must match the password configured in your Java Kafka client. + * - You must configure these passwords properly in your Java Kafka client (`ssl.truststore.password`, `ssl.keystore.password`). + * - Use strong passwords instead of "changeit" in production environments. + * - Protect your truststore.jks and keystore.p12 files carefully; leaking them would compromise your SSL security. */ + public class AivenKafkaExample { private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class); // === Configuration Constants === private static final String DEFAULT_BOOTSTRAP_SERVERS = "superstream-test-superstream-3591.k.aivencloud.com:18837"; - private static final String TRUSTSTORE_PATH = - "../../examples/kafka-clients-example/src/main/resources/crets/ca.pem"; - private static final String KEYSTORE_CERT_PATH = - "../../examples/kafka-clients-example/src/main/resources/crets/client.cert.pem"; - private static final String KEYSTORE_KEY_PATH = - "../../examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem"; + // Replace with full absolute path to your generated truststore.jks + private static final String TRUSTSTORE_LOCATION = "/absolute/path/to/truststore.jks"; + // The password must be at least 6 characters long and must match the password configured in your Java Kafka client. + private static final String TRUSTSTORE_PASSWORD = "changeit"; + // Replace with full absolute path to your generated keystore.p12 + private static final String KEYSTORE_KEY_PATH = "/absolute/path/to/keystore.p12"; private static final String SECURITY_PROTOCOL = "SSL"; - private static final String TRUSTSTORE_TYPE = "PEM"; - private static final String KEYSTORE_TYPE = "PEM"; - private static final String ENDPOINT_IDENTIFICATION_ALGORITHM = ""; // skip hostname verification + private static final String TRUSTSTORE_TYPE = "JKS"; + private static final String KEYSTORE_TYPE = "PKCS12"; private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; @@ -69,13 +103,12 @@ public static void main(String[] args) { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put("security.protocol", SECURITY_PROTOCOL); + props.put("ssl.truststore.location", TRUSTSTORE_LOCATION); + props.put("ssl.truststore.password", TRUSTSTORE_PASSWORD); props.put("ssl.truststore.type", TRUSTSTORE_TYPE); + props.put("ssl.keystore.location",KEYSTORE_KEY_PATH ); + props.put("ssl.keystore.password", TRUSTSTORE_PASSWORD); props.put("ssl.keystore.type", KEYSTORE_TYPE); - props.put("ssl.truststore.certificates", TRUSTSTORE_PATH); - props.put("ssl.keystore.certificate.chain", KEYSTORE_CERT_PATH); - props.put("ssl.keystore.key", KEYSTORE_KEY_PATH); - props.put("ssl.endpoint.identification.algorithm", ENDPOINT_IDENTIFICATION_ALGORITHM); - // Set some basic configuration props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index 56bc30f..6da9423 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -39,7 +39,7 @@ public class ConfluentProducerExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java index 6ab7ed2..cb84139 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java @@ -34,7 +34,7 @@ public class KafkaProducerExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index 1a0bc6f..564485b 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -45,7 +45,7 @@ public class MskKafkaExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key";