From 381d1b0091d0d4458f4648e2eb947f7baf702b83 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Mon, 3 Nov 2025 13:24:37 -0800 Subject: [PATCH] Updates for OKafka Lab Signed-off-by: Anders Swanson --- code-teq/okafka-lab/docker-compose.yml | 4 +- .../oraclefree/grant_permissions.sql | 10 +++- .../java/com/example/okafka/CreateTopic.java | 4 +- .../main/java/com/example/okafka/OKafka.java | 2 + .../com/example/okafka/OKafkaConsumer.java | 2 + .../com/example/okafka/OKafkaProducer.java | 23 +++----- .../example/okafka/TransactionalConsumer.java | 51 ++++++++++++++++ .../example/okafka/TransactionalProducer.java | 58 +++++++++++++++++++ 8 files changed, 133 insertions(+), 21 deletions(-) create mode 100644 code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalConsumer.java create mode 100644 code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalProducer.java diff --git a/code-teq/okafka-lab/docker-compose.yml b/code-teq/okafka-lab/docker-compose.yml index bbd08000c..989eac689 100644 --- a/code-teq/okafka-lab/docker-compose.yml +++ b/code-teq/okafka-lab/docker-compose.yml @@ -1,7 +1,7 @@ services: - okafkadb: + okafka: image: gvenzl/oracle-free:23.9-slim-faststart - container_name: okafkadb + container_name: okafka ports: - 9092:1521 environment: diff --git a/code-teq/okafka-lab/oraclefree/grant_permissions.sql b/code-teq/okafka-lab/oraclefree/grant_permissions.sql index f1135e61e..227f29360 100644 --- a/code-teq/okafka-lab/oraclefree/grant_permissions.sql +++ b/code-teq/okafka-lab/oraclefree/grant_permissions.sql @@ -4,10 +4,10 @@ alter session set container = freepdb1; -- user for okafka create user TESTUSER identified by testpwd; grant create session to TESTUSER; -grant unlimited tablespace to TESTUSER; -grant connect, resource to TESTUSER; +grant resource, connect, unlimited tablespace to TESTUSER; -- okafka permissions +grant aq_user_role to TESTUSER; grant execute on dbms_aq to TESTUSER; grant execute on dbms_aqadm to TESTUSER; grant select on gv_$session to TESTUSER; @@ -19,3 +19,9 @@ grant select on gv_$pdbs to TESTUSER; grant select on user_queue_partition_assignment_table to TESTUSER; exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER'); commit; + +create table testuser.log ( + id number generated always as identity primary key, + produced timestamp, + consumed timestamp +); diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java index 884d2ece9..efc761618 100644 --- a/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java @@ -10,6 +10,7 @@ import java.util.concurrent.ExecutionException; import static com.example.okafka.OKafka.TOPIC_NAME; +import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME; import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; public class CreateTopic { @@ -19,7 +20,8 @@ public static void main(String[] args) { try (Admin admin = AdminClient.create(props)) { NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1); - admin.createTopics(List.of(testTopic)) + NewTopic transactionalTestTopic = new NewTopic(TRANSACTIONAL_TOPIC_NAME, 1, (short) 1); + admin.createTopics(List.of(testTopic, transactionalTestTopic)) .all() .get(); System.out.println("[ADMIN] Created topic: " + testTopic.name()); diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java index 8470bde6d..ab22412aa 100644 --- a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java @@ -3,6 +3,8 @@ public interface OKafka { String TOPIC_NAME = "test_topic"; + String TRANSACTIONAL_TOPIC_NAME = "test_transactional_topic"; + static String getEnv(String key, String defaultValue) { String value = System.getenv(key); if (value == null || value.isEmpty()) { diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java index 54a23339f..6e148c649 100644 --- a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java @@ -33,6 +33,8 @@ public static void main(String[] args) { for (ConsumerRecord record : records) { System.out.println("Consumed record: " + record.value()); } + + consumer.commitSync(); } } } diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java index eba198fba..416f108d8 100644 --- a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java @@ -6,9 +6,6 @@ import java.time.Instant; import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import static com.example.okafka.OKafka.TOPIC_NAME; import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; @@ -21,14 +18,6 @@ public static void main(String[] args) throws InterruptedException { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(props); - Runnable producerThread = () -> { - Instant now = Instant.now(); - ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now); - producer.send(record); - System.out.println("Producer sent message: " + record.value()); - }; - - int pauseMillis = 1000; String pm = System.getenv("PAUSE_MILLIS"); @@ -36,11 +25,13 @@ public static void main(String[] args) throws InterruptedException { pauseMillis = Integer.parseInt(pm); } - try (ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - ) { - System.out.println("Starting producer"); - scheduler.scheduleAtFixedRate(producerThread, 0, pauseMillis, TimeUnit.MILLISECONDS); + while (true) { + Instant now = Instant.now(); + ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now); + producer.send(record); + System.out.println("Producer sent message: " + record.value()); + + Thread.sleep(pauseMillis); } - Thread.currentThread().join(); } } diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalConsumer.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalConsumer.java new file mode 100644 index 000000000..f1be0f28c --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalConsumer.java @@ -0,0 +1,51 @@ +package com.example.okafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.oracle.okafka.clients.consumer.KafkaConsumer; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Properties; + +import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME; +import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; + +public class TransactionalConsumer { + public static void main(String[] args) throws SQLException { + Properties props = getAuthenticationProperties(); + + // Note the use of standard Kafka properties for OKafka configuration. + props.put("group.id" , "TRANSACTIONAL_CONSUMER"); + props.put("enable.auto.commit","false"); + props.put("max.poll.records", 50); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"); + props.put("auto.offset.reset", "earliest"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(List.of(TRANSACTIONAL_TOPIC_NAME)); + System.out.println("Subscribed to topic " + TRANSACTIONAL_TOPIC_NAME); + while (true) { + Connection conn = consumer.getDBConnection(); + String sql = "update log set consumed = ? where id = ?"; + ConsumerRecords records = consumer.poll(Duration.ofSeconds(3)); + for (ConsumerRecord record : records) { + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setDate(1, new Date(Instant.now().toEpochMilli())); + ps.setLong(2, record.value()); + ps.executeUpdate(); + } + System.out.println("Consumed record: " + record.value()); + } + + consumer.commitSync(); + } + } +} diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalProducer.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalProducer.java new file mode 100644 index 000000000..37e63b028 --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/TransactionalProducer.java @@ -0,0 +1,58 @@ +package com.example.okafka; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.oracle.okafka.clients.producer.KafkaProducer; + +import java.sql.*; +import java.time.Instant; +import java.util.Properties; + +import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME; +import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; + +public class TransactionalProducer { + public static void main(String[] args) throws InterruptedException, SQLException { + Properties props = getAuthenticationProperties(); + props.put("enable.idempotence", "true"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer"); + // This property is required for transactional producers + props.put("oracle.transactional.producer", "true"); + KafkaProducer producer = new KafkaProducer<>(props); + producer.initTransactions(); + + int pauseMillis = 1000; + String pm = System.getenv("PAUSE_MILLIS"); + if (pm != null && !pm.isEmpty()) { + pauseMillis = Integer.parseInt(pm); + } + + while (true) { + Instant now = Instant.now(); + long id; + producer.beginTransaction(); + Connection conn = producer.getDBConnection(); + + final String sql = "insert into log (produced) values (?)"; + try (PreparedStatement ps = conn.prepareStatement(sql, new String[]{"id",})) { + ps.setDate(1, new Date(now.toEpochMilli())); + ps.executeUpdate(); + + ResultSet generatedKeys = ps.getGeneratedKeys(); + if (generatedKeys.next()) { + id = generatedKeys.getLong(1); + } else { + throw new SQLException("Create log message failed, no ID obtained"); + } + } + + ProducerRecord record = new ProducerRecord<>(TRANSACTIONAL_TOPIC_NAME, id); + producer.send(record); + + producer.commitTransaction(); + System.out.println("Producer sent message: " + record.value()); + + Thread.sleep(pauseMillis); + } + } +}