From 05c4c2d2f18058f47075619cd479160a120cc11e Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 21 Apr 2025 23:48:03 +0300 Subject: [PATCH 01/12] add msk, aiven and confluent producer examples --- examples/kafka-clients-example/pom.xml | 6 +- .../examples/AivenKafkaExample.java | 129 ++++++++++++++++++ .../examples/ConfluentProducerExample.java | 95 +++++++++++++ .../superstream/examples/MskKafkaExample.java | 95 +++++++++++++ 4 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java create mode 100644 examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java create mode 100644 examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java diff --git a/examples/kafka-clients-example/pom.xml b/examples/kafka-clients-example/pom.xml index 30f82e9..21d14c5 100644 --- a/examples/kafka-clients-example/pom.xml +++ b/examples/kafka-clients-example/pom.xml @@ -1,4 +1,3 @@ - @@ -36,6 +35,11 @@ ch.qos.logback logback-classic + + software.amazon.msk + aws-msk-iam-auth + 1.1.8 + diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java new file mode 100644 index 0000000..b470fa5 --- /dev/null +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -0,0 +1,129 @@ +package ai.superstream.examples; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.nio.file.Paths; +import 
java.util.Properties; +import java.util.concurrent.ExecutionException; + +/** + * Example application that uses the Kafka Clients API to produce messages. + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + */ +public class AivenKafkaExample { + private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class); + + public static void main(String[] args) { + // Get bootstrap servers from environment variable or use default + String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + bootstrapServers = "superstream-test-superstream-3591.k.aivencloud.com:18837"; + } + + // Configure the producer + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put("client.id", "superstream-example-producer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put("security.protocol", "SSL"); + props.put("ssl.truststore.type", "PEM"); + props.put("ssl.keystore.type", "PEM"); + + Path caPath = Paths.get("/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/ca.pem"); + Path clientCertPath = 
Paths.get("/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.cert.pem"); + Path clientKeyPath = Paths.get("/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem"); + + + props.put("ssl.truststore.certificates", caPath.toAbsolutePath().toString()); + props.put("ssl.keystore.certificate.chain", clientCertPath.toAbsolutePath().toString()); + props.put("ssl.keystore.key", clientKeyPath.toAbsolutePath().toString()); + + + logger.info("client cert path={}", clientCertPath.toAbsolutePath().toString()); + logger.info("ca path={}", caPath.toAbsolutePath().toString()); + logger.info("client key path={}", clientKeyPath.toAbsolutePath().toString()); + props.put("ssl.endpoint.identification.algorithm", ""); + + + +// props.put("ssl.truststore.certificates", "src/main/resources/crets/ca.pem"); +// props.put("ssl.keystore.certificate.chain", "src/main/resources/crets/client.cert.pem"); +// props.put("ssl.keystore.key", "/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.key.pem"); +// props.put("ssl.keystore.key", "/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.pk81.pem"); +// props.put("ssl.keystore.key", "src/main/resources/crets/client.pk81.pem"); + + // Set some basic configuration + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); + logger.info("Original producer configuration:"); + props.forEach((k, v) -> logger.info(" {} = {}", k, v)); + + try (Producer producer = new KafkaProducer<>(props)) { + // The Superstream Agent should have intercepted the producer creation + // and potentially optimized the configuration + + // Log the actual 
configuration used by the producer + logger.info("Actual producer configuration (after potential Superstream optimization):"); + + // Get the actual configuration from the producer via reflection + java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); + configField.setAccessible(true); + org.apache.kafka.clients.producer.ProducerConfig actualConfig = + (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); + + // Get the values for key configuration parameters + logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); + + // Send a test message + String topic = "example-topic"; + String key = "test-key"; + String value = "Hello, Superstream!"; + + + logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); + producer.send(new ProducerRecord<>(topic, key, value)).get(); + logger.info("Message sent successfully!"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while sending message", e); + } catch (ExecutionException e) { + logger.error("Error sending message", e); + } catch (Exception e) { + logger.error("Unexpected error", e); + } + } + +// private static String writeToTempFile(String prefix, String content) throws Exception { +// logger.info("prefix: {}", prefix); +// File tempFile = File.createTempFile(prefix, ".pem"); +// content = content.replace("\\n", "\n"); +// try (FileWriter writer = new FileWriter(tempFile)) { +// writer.write(content); +// } +// logger.info("Wrote temp PEM file: {}", tempFile.getAbsolutePath()); +// +// tempFile.deleteOnExit(); +// return tempFile.getAbsolutePath(); +// } +} diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java 
b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java new file mode 100644 index 0000000..9f6b00b --- /dev/null +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -0,0 +1,95 @@ +package ai.superstream.examples; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/** + * Example application that uses the Kafka Clients API to produce messages. + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. 
A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + */ +public class ConfluentProducerExample { + private static final Logger logger = LoggerFactory.getLogger(ConfluentProducerExample.class); + + public static void main(String[] args) { + // Get bootstrap servers from environment variable or use default + String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + bootstrapServers = "pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092"; + } + + // Configure the producer + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put("client.id", "superstream-example-producer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + + props.put("security.protocol", "SASL_SSL"); + props.put("sasl.mechanism", "PLAIN"); + props.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username='<REDACTED-CONFLUENT-API-KEY>' " + + "password='<REDACTED-CONFLUENT-API-SECRET>';"); + + // Set some basic configuration + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); + logger.info("Original producer configuration:"); + props.forEach((k, v) -> logger.info(" {} = {}", k, v)); + + try (Producer producer = new KafkaProducer<>(props)) { + // The Superstream Agent
should have intercepted the producer creation + // and potentially optimized the configuration + + // Log the actual configuration used by the producer + logger.info("Actual producer configuration (after potential Superstream optimization):"); + + // Get the actual configuration from the producer via reflection + java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); + configField.setAccessible(true); + org.apache.kafka.clients.producer.ProducerConfig actualConfig = + (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); + + // Get the values for key configuration parameters + logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); + + // Send a test message + String topic = "example-topic"; + String key = "test-key"; + String value = "Hello, Superstream!"; + + logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); + producer.send(new ProducerRecord<>(topic, key, value)).get(); + logger.info("Message sent successfully!"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while sending message", e); + } catch (ExecutionException e) { + logger.error("Error sending message", e); + } catch (Exception e) { + logger.error("Unexpected error", e); + } + } +} diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java new file mode 100644 index 0000000..f7ebd88 --- /dev/null +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -0,0 +1,95 @@ +package ai.superstream.examples; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import 
org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/** + * Example application that uses the Kafka Clients API to produce messages. + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + */ +public class MskKafkaExample { + private static final Logger logger = LoggerFactory.getLogger(MskKafkaExample.class); + + public static void main(String[] args) { + // Get bootstrap servers from environment variable or use default + String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + bootstrapServers = "b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198"; + } + + // Configure the producer + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put("client.id", "superstream-example-producer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()); + + + System.setProperty("aws.accessKeyId", ""); + System.setProperty("aws.secretKey", ""); + + props.put("security.protocol", "SASL_SSL"); + props.put("sasl.mechanism", "AWS_MSK_IAM"); + props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); + props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + + // Set some basic configuration + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); + logger.info("Original producer configuration:"); + props.forEach((k, v) -> logger.info(" {} = {}", k, v)); + + try (Producer producer = new KafkaProducer<>(props)) { + // The Superstream Agent should have intercepted the producer creation + // and potentially optimized the configuration + + // Log the actual configuration used by the producer + logger.info("Actual producer configuration (after potential Superstream optimization):"); + + // Get the actual configuration from the producer via reflection + java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); + configField.setAccessible(true); + org.apache.kafka.clients.producer.ProducerConfig actualConfig = + (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); + // Get the values for key configuration parameters + logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); + + // Send a test message + String topic = "example-topic"; + String key = "test-key"; + String value = "Hello, Superstream!"; + + logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); + producer.send(new ProducerRecord<>(topic, key, value)).get(); + logger.info("Message sent 
successfully!"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while sending message", e); + } catch (ExecutionException e) { + logger.error("Error sending message", e); + } catch (Exception e) { + logger.error("Unexpected error", e); + } + } +} From c5628e80e2fbb355a99a14d26ac7c306c4bd1716 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 22 Apr 2025 00:04:41 +0300 Subject: [PATCH 02/12] add msk, aiven and confluent producer examples --- .../java/ai/superstream/examples/AivenKafkaExample.java | 3 ++- .../ai/superstream/examples/ConfluentProducerExample.java | 6 +++--- .../main/java/ai/superstream/examples/MskKafkaExample.java | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index b470fa5..07a6f01 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -25,7 +25,7 @@ * - example-topic - for test messages * * Environment variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: superstream-test-superstream-3591.k.aivencloud.com:18837) * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) */ public class AivenKafkaExample { @@ -44,6 +44,7 @@ public static void main(String[] args) { props.put("client.id", "superstream-example-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put("security.protocol", "SSL"); props.put("ssl.truststore.type", "PEM"); 
props.put("ssl.keystore.type", "PEM"); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index 9f6b00b..d2b2fbc 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -23,7 +23,7 @@ * - example-topic - for test messages * * Environment variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092) * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) */ public class ConfluentProducerExample { @@ -48,8 +48,8 @@ public static void main(String[] args) { props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + - "username='<REDACTED-CONFLUENT-API-KEY>' " + - "password='<REDACTED-CONFLUENT-API-SECRET>';"); + "username='' " + + "password='';"); // Set some basic configuration props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index f7ebd88..3d889f7 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -23,7 +23,7 @@ * - example-topic - for test messages * * Environment variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka
bootstrap servers (default: b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198) * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) */ public class MskKafkaExample { From 06ac2a0ce94dcb1362582d777c3dbed6a5f034f7 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 22 Apr 2025 12:07:14 +0300 Subject: [PATCH 03/12] update examples --- .../examples/AivenKafkaExample.java | 82 +++++++--------- .../examples/ConfluentProducerExample.java | 86 +++++++--------- .../superstream/examples/MskKafkaExample.java | 97 ++++++++----------- 3 files changed, 112 insertions(+), 153 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index 07a6f01..aae4906 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Properties; @@ -31,50 +32,52 @@ public class AivenKafkaExample { private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class); + // === Configuration Constants === + private static final String DEFAULT_BOOTSTRAP_SERVERS = + "superstream-test-superstream-3591.k.aivencloud.com:18837"; + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + private static final String SECURITY_PROTOCOL = "SSL"; + private static final String 
TRUSTSTORE_TYPE = "PEM"; + private static final String KEYSTORE_TYPE = "PEM"; + private static final String ENDPOINT_IDENTIFICATION_ALGORITHM = ""; // skip hostname verification + private static final String TOPIC = "example-topic"; + private static final String KEY = "test-key"; + private static final String VALUE = "Hello, Superstream!"; + + private static final String TRUSTSTORE_PATH = + "../../examples/kafka-clients-example/src/main/resources/crets/ca.pem"; + private static final String KEYSTORE_CERT_PATH = + "../../examples/kafka-clients-example/src/main/resources/crets/client.cert.pem"; + private static final String KEYSTORE_KEY_PATH = + "../../examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem"; + public static void main(String[] args) { // Get bootstrap servers from environment variable or use default String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); if (bootstrapServers == null || bootstrapServers.isEmpty()) { - bootstrapServers = "superstream-test-superstream-3591.k.aivencloud.com:18837"; + bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; } // Configure the producer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put("client.id", "superstream-example-producer"); + props.put("client.id", CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put("security.protocol", "SSL"); - props.put("ssl.truststore.type", "PEM"); - props.put("ssl.keystore.type", "PEM"); - - Path caPath = Paths.get("/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/ca.pem"); - Path clientCertPath = Paths.get("/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.cert.pem"); - Path clientKeyPath = 
Paths.get("/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem"); - - - props.put("ssl.truststore.certificates", caPath.toAbsolutePath().toString()); - props.put("ssl.keystore.certificate.chain", clientCertPath.toAbsolutePath().toString()); - props.put("ssl.keystore.key", clientKeyPath.toAbsolutePath().toString()); - - - logger.info("client cert path={}", clientCertPath.toAbsolutePath().toString()); - logger.info("ca path={}", caPath.toAbsolutePath().toString()); - logger.info("client key path={}", clientKeyPath.toAbsolutePath().toString()); - props.put("ssl.endpoint.identification.algorithm", ""); - - - -// props.put("ssl.truststore.certificates", "src/main/resources/crets/ca.pem"); -// props.put("ssl.keystore.certificate.chain", "src/main/resources/crets/client.cert.pem"); -// props.put("ssl.keystore.key", "/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.key.pem"); -// props.put("ssl.keystore.key", "/Users/shohamroditi/superstream/superstream-clients-parent/examples/kafka-clients-example/src/main/resources/crets/client.pk81.pem"); -// props.put("ssl.keystore.key", "src/main/resources/crets/client.pk81.pem"); + props.put("security.protocol", SECURITY_PROTOCOL); + props.put("ssl.truststore.type", TRUSTSTORE_TYPE); + props.put("ssl.keystore.type", KEYSTORE_TYPE); + props.put("ssl.truststore.certificates", TRUSTSTORE_PATH); + props.put("ssl.keystore.certificate.chain", KEYSTORE_CERT_PATH); + props.put("ssl.keystore.key", KEYSTORE_KEY_PATH); + props.put("ssl.endpoint.identification.algorithm", ENDPOINT_IDENTIFICATION_ALGORITHM); // Set some basic configuration - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); 
logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); logger.info("Original producer configuration:"); props.forEach((k, v) -> logger.info(" {} = {}", k, v)); @@ -97,9 +100,9 @@ public static void main(String[] args) { logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); // Send a test message - String topic = "example-topic"; - String key = "test-key"; - String value = "Hello, Superstream!"; + String topic = TOPIC; + String key = KEY; + String value = VALUE; logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); @@ -114,17 +117,4 @@ public static void main(String[] args) { logger.error("Unexpected error", e); } } - -// private static String writeToTempFile(String prefix, String content) throws Exception { -// logger.info("prefix: {}", prefix); -// File tempFile = File.createTempFile(prefix, ".pem"); -// content = content.replace("\\n", "\n"); -// try (FileWriter writer = new FileWriter(tempFile)) { -// writer.write(content); -// } -// logger.info("Wrote temp PEM file: {}", tempFile.getAbsolutePath()); -// -// tempFile.deleteOnExit(); -// return tempFile.getAbsolutePath(); -// } } diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index d2b2fbc..4cab90a 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -1,9 +1,6 @@ package ai.superstream.examples; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.*; import 
org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,74 +8,57 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; -/** - * Example application that uses the Kafka Clients API to produce messages. - * Run with: - * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar - * - * Prerequisites: - * 1. A Kafka server with the following topics: - * - superstream.metadata_v1 - with a configuration message - * - superstream.clients - for client reports - * - example-topic - for test messages - * - * Environment variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092) - * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) - */ public class ConfluentProducerExample { private static final Logger logger = LoggerFactory.getLogger(ConfluentProducerExample.class); + // === Configuration Constants === + private static final String DEFAULT_BOOTSTRAP_SERVERS = "pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092"; + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + + // Confluent Cloud authentication (replace with your real values) + private static final String CONFLUENT_USERNAME = ""; + private static final String CONFLUENT_PASSWORD = ""; + + private static final String SECURITY_PROTOCOL = "SASL_SSL"; + private static final String SASL_MECHANISM = "PLAIN"; + + private static final String TOPIC = "example-topic"; + private static final String KEY = "test-key"; + private static final String VALUE = "Hello, Superstream!"; + public static void main(String[] args) { - // Get bootstrap servers from environment variable or use default String bootstrapServers = 
System.getenv("KAFKA_BOOTSTRAP_SERVERS"); if (bootstrapServers == null || bootstrapServers.isEmpty()) { - bootstrapServers = "pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092"; + bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; } - // Configure the producer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put("client.id", "superstream-example-producer"); + props.put("client.id", CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // Confluent security configuration + props.put("security.protocol", SECURITY_PROTOCOL); + props.put("sasl.mechanism", SASL_MECHANISM); + props.put("sasl.jaas.config", String.format( + "org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';", + CONFLUENT_USERNAME, CONFLUENT_PASSWORD + )); - props.put("security.protocol", "SASL_SSL"); - props.put("sasl.mechanism", "PLAIN"); - props.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required " + - "username='' " + - "password='';"); + // Optional producer tuning + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); - // Set some basic configuration - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); - logger.info("Original producer configuration:"); props.forEach((k, v) -> logger.info(" {} = {}", k, v)); try (Producer producer = new KafkaProducer<>(props)) { - // The Superstream Agent should have intercepted the producer creation - // and potentially optimized the configuration - - // Log the actual configuration used by the producer - logger.info("Actual producer configuration (after potential Superstream 
optimization):"); - - // Get the actual configuration from the producer via reflection - java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); - configField.setAccessible(true); - org.apache.kafka.clients.producer.ProducerConfig actualConfig = - (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); - - // Get the values for key configuration parameters - logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); - - // Send a test message - String topic = "example-topic"; - String key = "test-key"; - String value = "Hello, Superstream!"; + String topic = TOPIC; + String key = KEY; + String value = VALUE; logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); producer.send(new ProducerRecord<>(topic, key, value)).get(); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index 3d889f7..463231a 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -1,9 +1,6 @@ package ai.superstream.examples; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,74 +8,66 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; -/** - * Example application that uses the Kafka Clients API to produce messages. 
- * Run with: - * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar - * - * Prerequisites: - * 1. A Kafka server with the following topics: - * - superstream.metadata_v1 - with a configuration message - * - superstream.clients - for client reports - * - example-topic - for test messages - * - * Environment variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198) - * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) - */ public class MskKafkaExample { private static final Logger logger = LoggerFactory.getLogger(MskKafkaExample.class); + // === Configuration Constants === + private static final String DEFAULT_BOOTSTRAP_SERVERS = + "b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198," + + "b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198," + + "b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198"; + + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + + // AWS IAM Credentials + private static final String AWS_ACCESS_KEY_ID = ""; + private static final String AWS_SECRET_ACCESS_KEY = ""; + + private static final String SECURITY_PROTOCOL = "SASL_SSL"; + private static final String SASL_MECHANISM = "AWS_MSK_IAM"; + private static final String SASL_JAAS_CONFIG = "software.amazon.msk.auth.iam.IAMLoginModule required;"; + private static final String SASL_CALLBACK_HANDLER = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"; + + private static final String TOPIC = 
"example-topic"; + private static final String KEY = "test-key"; + private static final String VALUE = "Hello, Superstream!"; + + public static void main(String[] args) { - // Get bootstrap servers from environment variable or use default String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); if (bootstrapServers == null || bootstrapServers.isEmpty()) { - bootstrapServers = "b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198"; + bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; } - // Configure the producer + // Set AWS credentials (not recommended in code — use ~/.aws/credentials or env vars in real apps) + System.setProperty("aws.accessKeyId", AWS_ACCESS_KEY_ID); + System.setProperty("aws.secretKey", AWS_SECRET_ACCESS_KEY); + + // Kafka properties Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put("client.id", "superstream-example-producer"); + props.put("client.id", CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // Security + props.put("security.protocol", SECURITY_PROTOCOL); + props.put("sasl.mechanism", SASL_MECHANISM); + props.put("sasl.jaas.config", SASL_JAAS_CONFIG); + props.put("sasl.client.callback.handler.class", SASL_CALLBACK_HANDLER); - System.setProperty("aws.accessKeyId", ""); - System.setProperty("aws.secretKey", ""); - - props.put("security.protocol", "SASL_SSL"); - props.put("sasl.mechanism", "AWS_MSK_IAM"); - props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); - props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + 
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); - // Set some basic configuration - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); - logger.info("Original producer configuration:"); - props.forEach((k, v) -> logger.info(" {} = {}", k, v)); + props.forEach((k, v) -> logger.info(" {} = {}", k, v.toString().contains("secret") ? "****" : v)); try (Producer producer = new KafkaProducer<>(props)) { - // The Superstream Agent should have intercepted the producer creation - // and potentially optimized the configuration - - // Log the actual configuration used by the producer - logger.info("Actual producer configuration (after potential Superstream optimization):"); - - // Get the actual configuration from the producer via reflection - java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); - configField.setAccessible(true); - org.apache.kafka.clients.producer.ProducerConfig actualConfig = - (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); - // Get the values for key configuration parameters - logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); - - // Send a test message - String topic = "example-topic"; - String key = "test-key"; - String value = "Hello, Superstream!"; + String topic = TOPIC; + String key = KEY; + String value = VALUE; logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); producer.send(new ProducerRecord<>(topic, key, value)).get(); From 07c29ac07ddab85d0f8a008d01769187fd65802f Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 22 Apr 2025 12:56:25 +0300 Subject: [PATCH 04/12] update examples 
--- .../examples/AivenKafkaExample.java | 33 ++++++-------- .../examples/ConfluentProducerExample.java | 39 +++++++++++----- .../examples/KafkaProducerExample.java | 45 ++++++++----------- .../superstream/examples/MskKafkaExample.java | 41 ++++++++++------- 4 files changed, 85 insertions(+), 73 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index aae4906..8318226 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -35,23 +35,24 @@ public class AivenKafkaExample { // === Configuration Constants === private static final String DEFAULT_BOOTSTRAP_SERVERS = "superstream-test-superstream-3591.k.aivencloud.com:18837"; - private static final String CLIENT_ID = "superstream-example-producer"; - private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; - private static final String SECURITY_PROTOCOL = "SSL"; - private static final String TRUSTSTORE_TYPE = "PEM"; - private static final String KEYSTORE_TYPE = "PEM"; - private static final String ENDPOINT_IDENTIFICATION_ALGORITHM = ""; // skip hostname verification - private static final String TOPIC = "example-topic"; - private static final String KEY = "test-key"; - private static final String VALUE = "Hello, Superstream!"; - private static final String TRUSTSTORE_PATH = "../../examples/kafka-clients-example/src/main/resources/crets/ca.pem"; private static final String KEYSTORE_CERT_PATH = "../../examples/kafka-clients-example/src/main/resources/crets/client.cert.pem"; private static final String KEYSTORE_KEY_PATH = "../../examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem"; + private static final String SECURITY_PROTOCOL = "SSL"; + private static 
final String TRUSTSTORE_TYPE = "PEM"; + private static final String KEYSTORE_TYPE = "PEM"; + private static final String ENDPOINT_IDENTIFICATION_ALGORITHM = ""; // skip hostname verification + + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + + private static final String TOPIC_NAME = "example-topic"; + private static final String MESSAGE_KEY = "test-key"; + private static final String MESSAGE_VALUE = "Hello, Superstream!"; public static void main(String[] args) { // Get bootstrap servers from environment variable or use default @@ -95,18 +96,12 @@ public static void main(String[] args) { org.apache.kafka.clients.producer.ProducerConfig actualConfig = (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); - // Get the values for key configuration parameters logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); // Send a test message - String topic = TOPIC; - String key = KEY; - String value = VALUE; - - - logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); - producer.send(new ProducerRecord<>(topic, key, value)).get(); + logger.info("Sending message to topic {}: key={}, value={}", TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE); + producer.send(new ProducerRecord<>(TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE)).get(); logger.info("Message sent successfully!"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index 4cab90a..aeb86e2 100644 --- 
a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -13,9 +13,7 @@ public class ConfluentProducerExample { // === Configuration Constants === private static final String DEFAULT_BOOTSTRAP_SERVERS = "pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092"; - private static final String CLIENT_ID = "superstream-example-producer"; - private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + // Confluent Cloud authentication (replace with your real values) private static final String CONFLUENT_USERNAME = ""; @@ -24,9 +22,13 @@ public class ConfluentProducerExample { private static final String SECURITY_PROTOCOL = "SASL_SSL"; private static final String SASL_MECHANISM = "PLAIN"; - private static final String TOPIC = "example-topic"; - private static final String KEY = "test-key"; - private static final String VALUE = "Hello, Superstream!"; + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + + private static final String TOPIC_NAME = "example-topic"; + private static final String MESSAGE_KEY = "test-key"; + private static final String MESSAGE_VALUE = "Hello, Superstream!"; public static void main(String[] args) { String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); @@ -34,6 +36,7 @@ public static void main(String[] args) { bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; } + // Configure the producer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put("client.id", CLIENT_ID); @@ -48,7 +51,7 @@ public static void main(String[] args) { CONFLUENT_USERNAME, CONFLUENT_PASSWORD )); - // Optional producer tuning + // Set some basic configuration 
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); @@ -56,12 +59,24 @@ public static void main(String[] args) { props.forEach((k, v) -> logger.info(" {} = {}", k, v)); try (Producer producer = new KafkaProducer<>(props)) { - String topic = TOPIC; - String key = KEY; - String value = VALUE; + // The Superstream Agent should have intercepted the producer creation + // and potentially optimized the configuration + + // Log the actual configuration used by the producer + logger.info("Actual producer configuration (after potential Superstream optimization):"); + + // Get the actual configuration from the producer via reflection + java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); + configField.setAccessible(true); + org.apache.kafka.clients.producer.ProducerConfig actualConfig = + (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); + + logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); - logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); - producer.send(new ProducerRecord<>(topic, key, value)).get(); + // Send a test message + logger.info("Sending message to topic {}: key={}, value={}", TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE); + producer.send(new ProducerRecord<>(TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE)).get(); logger.info("Message sent successfully!"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java index 78539e7..eb64a57 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java +++ 
b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java @@ -11,41 +11,37 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; -/** - * Example application that uses the Kafka Clients API to produce messages. - * Run with: - * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar - * - * Prerequisites: - * 1. A Kafka server with the following topics: - * - superstream.metadata_v1 - with a configuration message - * - superstream.clients - for client reports - * - example-topic - for test messages - * - * Environment variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) - * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) - */ public class KafkaProducerExample { private static final Logger logger = LoggerFactory.getLogger(KafkaProducerExample.class); + // === Configuration Constants === + private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; + + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + + private static final String TOPIC_NAME = "example-topic"; + private static final String MESSAGE_KEY = "test-key"; + private static final String MESSAGE_VALUE = "Hello, Superstream!"; + public static void main(String[] args) { - // Get bootstrap servers from environment variable or use default String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); if (bootstrapServers == null || bootstrapServers.isEmpty()) { - bootstrapServers = "localhost:9092"; + bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; } // Configure the producer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put("client.id", 
"superstream-example-producer"); + props.put("client.id", CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Set some basic configuration - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); + logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); logger.info("Original producer configuration:"); props.forEach((k, v) -> logger.info(" {} = {}", k, v)); @@ -63,17 +59,12 @@ public static void main(String[] args) { org.apache.kafka.clients.producer.ProducerConfig actualConfig = (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); - // Get the values for key configuration parameters logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); // Send a test message - String topic = "example-topic"; - String key = "test-key"; - String value = "Hello, Superstream!"; - - logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); - producer.send(new ProducerRecord<>(topic, key, value)).get(); + logger.info("Sending message to topic {}: key={}, value={}", TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE); + producer.send(new ProducerRecord<>(TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE)).get(); logger.info("Message sent successfully!"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index 463231a..e6d8024 100644 --- 
a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -17,10 +17,6 @@ public class MskKafkaExample { "b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198," + "b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198"; - private static final String CLIENT_ID = "superstream-example-producer"; - private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; - // AWS IAM Credentials private static final String AWS_ACCESS_KEY_ID = ""; private static final String AWS_SECRET_ACCESS_KEY = ""; @@ -30,9 +26,13 @@ public class MskKafkaExample { private static final String SASL_JAAS_CONFIG = "software.amazon.msk.auth.iam.IAMLoginModule required;"; private static final String SASL_CALLBACK_HANDLER = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"; - private static final String TOPIC = "example-topic"; - private static final String KEY = "test-key"; - private static final String VALUE = "Hello, Superstream!"; + private static final String CLIENT_ID = "superstream-example-producer"; + private static final String COMPRESSION_TYPE = "gzip"; + private static final int BATCH_SIZE = 16384; + + private static final String TOPIC_NAME = "example-topic"; + private static final String MESSAGE_KEY = "test-key"; + private static final String MESSAGE_VALUE = "Hello, Superstream!"; public static void main(String[] args) { @@ -41,11 +41,10 @@ public static void main(String[] args) { bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; } - // Set AWS credentials (not recommended in code — use ~/.aws/credentials or env vars in real apps) System.setProperty("aws.accessKeyId", AWS_ACCESS_KEY_ID); System.setProperty("aws.secretKey", AWS_SECRET_ACCESS_KEY); - // Kafka properties + // Configure the producer Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put("client.id", CLIENT_ID); @@ -62,15 +61,27 @@ public static void main(String[] args) { props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); logger.info("Creating producer with bootstrap servers: {}", bootstrapServers); - props.forEach((k, v) -> logger.info(" {} = {}", k, v.toString().contains("secret") ? "****" : v)); + props.forEach((k, v) -> logger.info(" {} = {}", k, v)); try (Producer producer = new KafkaProducer<>(props)) { - String topic = TOPIC; - String key = KEY; - String value = VALUE; + // The Superstream Agent should have intercepted the producer creation + // and potentially optimized the configuration + + // Log the actual configuration used by the producer + logger.info("Actual producer configuration (after potential Superstream optimization):"); + + // Get the actual configuration from the producer via reflection + java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig"); + configField.setAccessible(true); + org.apache.kafka.clients.producer.ProducerConfig actualConfig = + (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer); + + logger.info(" compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + logger.info(" batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); - logger.info("Sending message to topic {}: key={}, value={}", topic, key, value); - producer.send(new ProducerRecord<>(topic, key, value)).get(); + // Send a test message + logger.info("Sending message to topic {}: key={}, value={}", TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE); + producer.send(new ProducerRecord<>(TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE)).get(); logger.info("Message sent successfully!"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); From 055702df841b03843a181842bf113e4b8dc2afb0 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 22 Apr 2025 
12:58:17 +0300 Subject: [PATCH 05/12] update examples --- .../examples/KafkaProducerExample.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java index eb64a57..6ab7ed2 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java @@ -11,6 +11,21 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; +/** + * Example application that uses the Kafka Clients API to produce messages. + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + */ public class KafkaProducerExample { private static final Logger logger = LoggerFactory.getLogger(KafkaProducerExample.class); From f4e7e05b40d67e9d5369e3201e327fc9f8f334e9 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 22 Apr 2025 13:00:39 +0300 Subject: [PATCH 06/12] update examples --- .../examples/ConfluentProducerExample.java | 15 +++++++++++++++ .../superstream/examples/MskKafkaExample.java | 17 +++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java 
b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index aeb86e2..56bc30f 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -8,6 +8,21 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; +/** + * Example application that uses the Kafka Clients API to produce messages. + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + */ public class ConfluentProducerExample { private static final Logger logger = LoggerFactory.getLogger(ConfluentProducerExample.class); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index e6d8024..1a0bc6f 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -8,6 +8,23 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; +/** + * Example application that uses the Kafka Clients API to produce messages. 
+ * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198, + * b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198, + * b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + */ public class MskKafkaExample { private static final Logger logger = LoggerFactory.getLogger(MskKafkaExample.class); From ee18c63f09ff392059adfd2c0a7384d181e0877e Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 28 Apr 2025 08:47:34 +0300 Subject: [PATCH 07/12] fix aiven example --- .../examples/AivenKafkaExample.java | 32 +++++++++---------- .../examples/ConfluentProducerExample.java | 2 +- .../superstream/examples/MskKafkaExample.java | 2 +- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index 8318226..2b9c691 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -8,9 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Properties; import 
java.util.concurrent.ExecutionException; @@ -28,6 +25,10 @@ * Environment variables: * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: superstream-test-superstream-3591.k.aivencloud.com:18837) * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + * + * SSL Requirements: + * - Truststore file (truststore.jks) containing the broker's CA certificate. + * - Keystore file (keystore.p12) containing the client's certificate and private key. */ public class AivenKafkaExample { private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class); @@ -35,20 +36,16 @@ public class AivenKafkaExample { // === Configuration Constants === private static final String DEFAULT_BOOTSTRAP_SERVERS = "superstream-test-superstream-3591.k.aivencloud.com:18837"; - private static final String TRUSTSTORE_PATH = - "../../examples/kafka-clients-example/src/main/resources/crets/ca.pem"; - private static final String KEYSTORE_CERT_PATH = - "../../examples/kafka-clients-example/src/main/resources/crets/client.cert.pem"; + private static final String TRUSTSTORE_LOCATION = "/Users/shohamroditi/superstream/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/truststore.jks"; + private static final String TRUSTSTORE_PASSWORD = "changeit"; private static final String KEYSTORE_KEY_PATH = - "../../examples/kafka-clients-example/src/main/resources/crets/client.pk8.pem"; + "/Users/shohamroditi/superstream/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/keystore.p12"; private static final String SECURITY_PROTOCOL = "SSL"; - private static final String TRUSTSTORE_TYPE = "PEM"; - private static final String KEYSTORE_TYPE = "PEM"; - private static final String ENDPOINT_IDENTIFICATION_ALGORITHM = ""; // skip hostname verification + private static final String TRUSTSTORE_TYPE = "PKCS12"; private static final String CLIENT_ID = "superstream-example-producer"; private 
static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final String BATCH_SIZE = "16384"; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; @@ -69,12 +66,13 @@ public static void main(String[] args) { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put("security.protocol", SECURITY_PROTOCOL); + props.put("ssl.truststore.location", TRUSTSTORE_LOCATION); + props.put("ssl.truststore.password", TRUSTSTORE_PASSWORD); props.put("ssl.truststore.type", TRUSTSTORE_TYPE); - props.put("ssl.keystore.type", KEYSTORE_TYPE); - props.put("ssl.truststore.certificates", TRUSTSTORE_PATH); - props.put("ssl.keystore.certificate.chain", KEYSTORE_CERT_PATH); - props.put("ssl.keystore.key", KEYSTORE_KEY_PATH); - props.put("ssl.endpoint.identification.algorithm", ENDPOINT_IDENTIFICATION_ALGORITHM); + props.put("ssl.keystore.location",KEYSTORE_KEY_PATH ); + props.put("ssl.keystore.password", TRUSTSTORE_PASSWORD); + props.put("ssl.keystore.type", TRUSTSTORE_TYPE); + props.put("ssl.endpoint.identification.algorithm", ""); // Set some basic configuration props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index 56bc30f..0779ec0 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -39,7 +39,7 @@ public class ConfluentProducerExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final String 
BATCH_SIZE = "16384"; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index 1a0bc6f..b68829d 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -45,7 +45,7 @@ public class MskKafkaExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final String BATCH_SIZE = "16384"; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; From 318bc2a34eb5a1d58b5aa62f29cdbfcad59da152 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 28 Apr 2025 09:04:53 +0300 Subject: [PATCH 08/12] aiven example --- .../examples/AivenKafkaExample.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index 2b9c691..f25f08c 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException; /** - * Example application that uses the Kafka Clients API to produce messages. + * Example application that uses the Kafka Clients API to produce messages securely over SSL. 
* Run with: * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar * @@ -27,8 +27,32 @@ * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) * * SSL Requirements: - * - Truststore file (truststore.jks) containing the broker's CA certificate. - * - Keystore file (keystore.p12) containing the client's certificate and private key. + * Step 1: Create the truststore (truststore.jks) containing the broker's CA certificate: + * + * keytool -importcert \ + * -trustcacerts \ + * -alias aiven-ca \ + * -file /path/to/ca.pem \ + * -keystore /path/to/truststore.jks \ + * -storepass changeit + * + * Step 2: Create the keystore (keystore.p12) containing the client's certificate and private key: + * + * openssl pkcs12 -export \ + * -in /path/to/client.cert.pem \ + * -inkey /path/to/client.pk8.pem \ + * -out /path/to/keystore.p12 \ + * -name kafka-client \ + * -passout pass:changeit + * + * Environment Variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: superstream-test-superstream-3591.k.aivencloud.com:18837) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) + * + * Notes: + * - The truststore ensures that the client trusts the Kafka broker's certificate. + * - The keystore provides the client's authentication to the broker (mutual TLS). + * - If the client certificates are not signed by the broker’s CA, connection will fail. 
*/ public class AivenKafkaExample { private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class); From 224725a3bd0c177c945bf52d51c4c15e46b4bafd Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 28 Apr 2025 09:23:11 +0300 Subject: [PATCH 09/12] update batch size to string --- .../main/java/ai/superstream/examples/KafkaProducerExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java index 6ab7ed2..20bd443 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java @@ -34,7 +34,7 @@ public class KafkaProducerExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final int BATCH_SIZE = 16384; + private static final String BATCH_SIZE = "16384"; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; From 4c9f9d3854060caac428548a9f0f5dd3d98e9385 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 28 Apr 2025 09:55:38 +0300 Subject: [PATCH 10/12] update aiven example --- .../examples/AivenKafkaExample.java | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index f25f08c..ba8924a 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -12,7 +12,8 @@ 
import java.util.concurrent.ExecutionException; /** - * Example application that uses the Kafka Clients API to produce messages securely over SSL. + * Example application that uses the Kafka Clients API to produce messages securely over SSL (TLS). + * * Run with: * java -javaagent:path/to/superstream-clients-1.0.0.jar -Dlogback.configurationFile=logback.xml -jar kafka-clients-example-1.0.0-jar-with-dependencies.jar * @@ -27,7 +28,15 @@ * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) * * SSL Requirements: - * Step 1: Create the truststore (truststore.jks) containing the broker's CA certificate: + * To establish a secure SSL/TLS connection with Kafka brokers, you must provide **three files**: + * + * - CA certificate (`ca.pem`) — the trusted certificate authority (from the Kafka provider) + * - Client certificate (`client.cert.pem`) — identifies the client application + * - Client private key (`client.pk8.pem`) — the private key paired with the client certificate + * + * SSL Setup Steps: + * + * Step 1: Create a Truststore (truststore.jks) containing the CA certificate: * * keytool -importcert \ * -trustcacerts \ @@ -36,7 +45,7 @@ * -keystore /path/to/truststore.jks \ * -storepass changeit * - * Step 2: Create the keystore (keystore.p12) containing the client's certificate and private key: + * Step 2: Create a Keystore (keystore.p12) containing the client certificate and private key: * * openssl pkcs12 -export \ * -in /path/to/client.cert.pem \ @@ -45,25 +54,27 @@ * -name kafka-client \ * -passout pass:changeit * - * Environment Variables: - * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: superstream-test-superstream-3591.k.aivencloud.com:18837) - * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) - * * Notes: - * - The truststore ensures that the client trusts the Kafka broker's certificate. 
- * - The keystore provides the client's authentication to the broker (mutual TLS). - * - If the client certificates are not signed by the broker’s CA, connection will fail. + * - The Truststore (`truststore.jks`) ensures the client trusts the Kafka broker's SSL certificate. + * - The Keystore (`keystore.p12`) provides client authentication (mutual TLS) toward the broker. + * - Both Truststore and Keystore must be correctly configured for SSL handshake to succeed. + * - The ssl.endpoint.identification.algorithm property should not be disabled for Aiven clusters, as they provide valid certificates matching the hostname. + * + * Security Advice: + * - Use strong passwords instead of "changeit" in production environments. + * - Protect your truststore.jks and keystore.p12 files carefully; leaking them would compromise your SSL security. */ + public class AivenKafkaExample { private static final Logger logger = LoggerFactory.getLogger(AivenKafkaExample.class); // === Configuration Constants === private static final String DEFAULT_BOOTSTRAP_SERVERS = "superstream-test-superstream-3591.k.aivencloud.com:18837"; - private static final String TRUSTSTORE_LOCATION = "/Users/shohamroditi/superstream/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/truststore.jks"; + private static final String TRUSTSTORE_LOCATION = "/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/truststore.jks"; private static final String TRUSTSTORE_PASSWORD = "changeit"; private static final String KEYSTORE_KEY_PATH = - "/Users/shohamroditi/superstream/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/keystore.p12"; + "/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/keystore.p12"; private static final String SECURITY_PROTOCOL = "SSL"; private static final String TRUSTSTORE_TYPE = "PKCS12"; @@ -96,8 +107,6 @@ public static void main(String[] args) { 
props.put("ssl.keystore.location",KEYSTORE_KEY_PATH ); props.put("ssl.keystore.password", TRUSTSTORE_PASSWORD); props.put("ssl.keystore.type", TRUSTSTORE_TYPE); - props.put("ssl.endpoint.identification.algorithm", ""); - // Set some basic configuration props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); From 45501710bc03544b69cb8833ca808b730ce8e8f9 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 28 Apr 2025 10:13:48 +0300 Subject: [PATCH 11/12] update aiven example --- .../main/java/ai/superstream/examples/AivenKafkaExample.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index ba8924a..7905372 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -61,6 +61,10 @@ * - The ssl.endpoint.identification.algorithm property should not be disabled for Aiven clusters, as they provide valid certificates matching the hostname. * * Security Advice: + * - storepass is the password that protects the truststore (`truststore.jks`) and the keystore (`keystore.p12`). + * - In practice, truststore and keystore can have different passwords, but often the same password is used for simplicity. + * - The password must be at least 6 characters long and must match the password configured in your Java Kafka client. + * - You must configure these passwords properly in your Java Kafka client (`ssl.truststore.password`, `ssl.keystore.password`). * - Use strong passwords instead of "changeit" in production environments. * - Protect your truststore.jks and keystore.p12 files carefully; leaking them would compromise your SSL security. 
*/ From c384feec36c641d69ce19e3b5eb6fdfedf144b28 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 29 Apr 2025 22:30:08 +0300 Subject: [PATCH 12/12] fixes --- .../examples/AivenKafkaExample.java | 24 +++++++++---------- .../examples/ConfluentProducerExample.java | 2 +- .../examples/KafkaProducerExample.java | 2 +- .../superstream/examples/MskKafkaExample.java | 2 +- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java index 7905372..2c36474 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java @@ -28,16 +28,15 @@ * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for (default: example-topic) * * SSL Requirements: - * To establish a secure SSL/TLS connection with Kafka brokers, you must provide **three files**: + * To establish a secure SSL/TLS connection with Kafka brokers, you must generate and configure: * * - CA certificate (`ca.pem`) — the trusted certificate authority (from the Kafka provider) * - Client certificate (`client.cert.pem`) — identifies the client application * - Client private key (`client.pk8.pem`) — the private key paired with the client certificate * - * SSL Setup Steps: + * Steps to prepare truststore and keystore: * * Step 1: Create a Truststore (truststore.jks) containing the CA certificate: - * * keytool -importcert \ * -trustcacerts \ * -alias aiven-ca \ @@ -46,7 +45,6 @@ * -storepass changeit * * Step 2: Create a Keystore (keystore.p12) containing the client certificate and private key: - * * openssl pkcs12 -export \ * -in /path/to/client.cert.pem \ * -inkey /path/to/client.pk8.pem \ @@ -58,11 +56,8 @@ * - The Truststore (`truststore.jks`) ensures the client trusts 
the Kafka broker's SSL certificate. * - The Keystore (`keystore.p12`) provides client authentication (mutual TLS) toward the broker. * - Both Truststore and Keystore must be correctly configured for SSL handshake to succeed. - * - The ssl.endpoint.identification.algorithm property should not be disabled for Aiven clusters, as they provide valid certificates matching the hostname. * * Security Advice: - * - storepass is the password that protects the truststore (`truststore.jks`) and the keystore (`keystore.p12`). - * - In practice, truststore and keystore can have different passwords, but often the same password is used for simplicity. * - The password must be at least 6 characters long and must match the password configured in your Java Kafka client. * - You must configure these passwords properly in your Java Kafka client (`ssl.truststore.password`, `ssl.keystore.password`). * - Use strong passwords instead of "changeit" in production environments. @@ -75,16 +70,19 @@ public class AivenKafkaExample { // === Configuration Constants === private static final String DEFAULT_BOOTSTRAP_SERVERS = "superstream-test-superstream-3591.k.aivencloud.com:18837"; - private static final String TRUSTSTORE_LOCATION = "/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/truststore.jks"; + // Replace with full absolute path to your generated truststore.jks + private static final String TRUSTSTORE_LOCATION = "/absolute/path/to/truststore.jks"; + // The password must be at least 6 characters long and must match the password configured in your Java Kafka client. 
private static final String TRUSTSTORE_PASSWORD = "changeit"; - private static final String KEYSTORE_KEY_PATH = - "/superstream-clients-java/examples/kafka-clients-example/src/main/resources/crets/keystore.p12"; + // Replace with full absolute path to your generated keystore.p12 + private static final String KEYSTORE_KEY_PATH = "/absolute/path/to/keystore.p12"; private static final String SECURITY_PROTOCOL = "SSL"; - private static final String TRUSTSTORE_TYPE = "PKCS12"; + private static final String TRUSTSTORE_TYPE = "JKS"; + private static final String KEYSTORE_TYPE = "PKCS12"; private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final String BATCH_SIZE = "16384"; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; @@ -110,7 +108,7 @@ public static void main(String[] args) { props.put("ssl.truststore.type", TRUSTSTORE_TYPE); props.put("ssl.keystore.location",KEYSTORE_KEY_PATH ); props.put("ssl.keystore.password", TRUSTSTORE_PASSWORD); - props.put("ssl.keystore.type", TRUSTSTORE_TYPE); + props.put("ssl.keystore.type", KEYSTORE_TYPE); // Set some basic configuration props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE); props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE); diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java index 0779ec0..6da9423 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/ConfluentProducerExample.java @@ -39,7 +39,7 @@ public class ConfluentProducerExample { private static final String CLIENT_ID = "superstream-example-producer"; 
private static final String COMPRESSION_TYPE = "gzip"; - private static final String BATCH_SIZE = "16384"; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java index 20bd443..cb84139 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java @@ -34,7 +34,7 @@ public class KafkaProducerExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final String BATCH_SIZE = "16384"; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key"; diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java index b68829d..564485b 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java @@ -45,7 +45,7 @@ public class MskKafkaExample { private static final String CLIENT_ID = "superstream-example-producer"; private static final String COMPRESSION_TYPE = "gzip"; - private static final String BATCH_SIZE = "16384"; + private static final Integer BATCH_SIZE = 16384; private static final String TOPIC_NAME = "example-topic"; private static final String MESSAGE_KEY = "test-key";