diff --git a/code-teq/okafka-lab/docker-compose.yml b/code-teq/okafka-lab/docker-compose.yml new file mode 100644 index 000000000..bbd08000c --- /dev/null +++ b/code-teq/okafka-lab/docker-compose.yml @@ -0,0 +1,16 @@ +services: + okafkadb: + image: gvenzl/oracle-free:23.9-slim-faststart + container_name: okafkadb + ports: + - 9092:1521 + environment: + - ORACLE_PASSWORD=Welcome12345 + volumes: + - ./oraclefree:/container-entrypoint-initdb.d + healthcheck: + test: ["CMD-SHELL", "lsnrctl status | grep READY"] + interval: 15s + timeout: 10s + retries: 5 + start_period: 30s \ No newline at end of file diff --git a/code-teq/okafka-lab/oraclefree/grant_permissions.sql b/code-teq/okafka-lab/oraclefree/grant_permissions.sql new file mode 100644 index 000000000..f1135e61e --- /dev/null +++ b/code-teq/okafka-lab/oraclefree/grant_permissions.sql @@ -0,0 +1,21 @@ +-- Set as appropriate for your database. +alter session set container = freepdb1; + +-- user for okafka +create user TESTUSER identified by testpwd; +grant create session to TESTUSER; +grant unlimited tablespace to TESTUSER; +grant connect, resource to TESTUSER; + +-- okafka permissions +grant execute on dbms_aq to TESTUSER; +grant execute on dbms_aqadm to TESTUSER; +grant select on gv_$session to TESTUSER; +grant select on v_$session to TESTUSER; +grant select on gv_$instance to TESTUSER; +grant select on gv_$listener_network to TESTUSER; +grant select on SYS.DBA_RSRC_PLAN_DIRECTIVES to TESTUSER; +grant select on gv_$pdbs to TESTUSER; +grant select on user_queue_partition_assignment_table to TESTUSER; +exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER'); +commit; diff --git a/code-teq/okafka-lab/pom.xml b/code-teq/okafka-lab/pom.xml new file mode 100644 index 000000000..b9f31a3ef --- /dev/null +++ b/code-teq/okafka-lab/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + oracle-database-kafka-apis + + + + org.apache.maven.plugins + maven-compiler-plugin + + 9 + 9 + + + + + com.example.okafka + oracle-kafka-apis + Oracle Kafka API Example + 1.0.0 + + + 21 + + + + + + com.oracle.database.messaging + okafka + 23.6.1.0 + + + + + + \ No newline at end of file diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java new file mode 100644 index 000000000..884d2ece9 --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java @@ -0,0 +1,34 @@ +package com.example.okafka; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.errors.TopicExistsException; +import org.oracle.okafka.clients.admin.AdminClient; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static com.example.okafka.OKafka.TOPIC_NAME; +import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; + +public class CreateTopic { + public static void main(String[] args) { + // Authentication properties to connect to Kafka + Properties props = getAuthenticationProperties(); + + try (Admin admin = AdminClient.create(props)) { + NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1); + admin.createTopics(List.of(testTopic)) + .all() + .get(); + System.out.println("[ADMIN] Created topic: " + testTopic.name()); + } catch (ExecutionException | InterruptedException e) { + if (e.getCause() instanceof TopicExistsException) { + System.out.println("[ADMIN] Topic already exists"); + } else { + throw new RuntimeException(e); + } + } + } +} diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java new file mode 100644 index 000000000..8470bde6d --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java @@ -0,0 +1,13 @@ +package com.example.okafka; + +public interface OKafka { + String TOPIC_NAME = "test_topic"; + + static String getEnv(String key, String defaultValue) { + String value = System.getenv(key); + if (value == null || value.isEmpty()) { + return defaultValue; + } + return value; + } +} diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaAuthentication.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaAuthentication.java new file mode 100644 index 000000000..b199da8f1 --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaAuthentication.java @@ -0,0 +1,51 @@ +package com.example.okafka; + +import java.util.Properties; + +import static com.example.okafka.OKafka.getEnv; + +public class OKafkaAuthentication { + // For this example, we'll configure our authentication parameters with environment variables. + // The security.protocol property supports PLAINTEXT (insecure) and SSL (secure) authentication. + private static final String securityProtocol = getEnv("SECURITY_PROTOCOL", "PLAINTEXT"); + + // For PLAINTEXT authentication, provide the HOSTNAME:PORT as the bootstrap.servers property. + private static final String bootstrapServers = getEnv("BOOTSTRAP_SERVERS", "localhost:9092"); + + // The TNS Admin alias / Oracle Database Service name. + private static final String tnsAdmin = getEnv("TNS_ADMIN", "freepdb1"); + + // The directory containing the database wallet. For PLAINTEXT, this directory need only + // contain an ojdbc.properties file with the "user" and "password" properties configured. + private static final String walletDir = getEnv("WALLET_DIR", "./wallet"); + + /** + * Create a Java Properties object for Oracle AI Database OKafka connection. + * Configure using the SECURITY_PROTOCOL, BOOTSTRAP_SERVERS, TNS_ADMIN, and WALLET_DIR environment variables. + * @return configured Properties object. + */ + public static Properties getAuthenticationProperties() { + // Just like kafka-clients, we can use a Java Properties object to configure connection parameters. + Properties props = new Properties(); + + // oracle.service.name is a custom property to configure the Database service name. + props.put("oracle.service.name", tnsAdmin); + // oracle.net.tns_admin is a custom property to configure the directory containing Oracle Database connection files. + // If you are using mTLS authentication, client certificates must be present in this directory. + props.put("oracle.net.tns_admin", walletDir); + // security.protocol is a standard Kafka property, set to PLAINTEXT or SSL for Oracle Database. + // (SASL is not supported with Oracle Database). + props.put("security.protocol", securityProtocol); + if (securityProtocol.equals("SSL")) { + // For SSL authentication, pass the TNS alias (such as "mydb_tp") to be used from the tnsnames.ora file + // found in the WALLET_DIR directory. + props.put("tns.alias", tnsAdmin); + } else { + // For PLAINTEXT authentication, we provide the database URL in the format + // HOSTNAME:PORT as the bootstrap.servers property. + props.put("bootstrap.servers", bootstrapServers); + } + + return props; + } +} diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java new file mode 100644 index 000000000..54a23339f --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java @@ -0,0 +1,38 @@ +package com.example.okafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.oracle.okafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +import static com.example.okafka.OKafka.TOPIC_NAME; +import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; + +public class OKafkaConsumer { + public static void main(String[] args) { + Properties props = getAuthenticationProperties(); + + // Note the use of standard Kafka properties for OKafka configuration. + props.put("group.id" , "TEST_CONSUMER"); + props.put("enable.auto.commit","false"); + props.put("max.poll.records", 50); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("auto.offset.reset", "earliest"); + + Consumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(List.of(TOPIC_NAME)); + System.out.println("Subscribed to topic " + TOPIC_NAME); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(3)); + for (ConsumerRecord record : records) { + System.out.println("Consumed record: " + record.value()); + } + } + } +} diff --git a/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java new file mode 100644 index 000000000..eba198fba --- /dev/null +++ b/code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java @@ -0,0 +1,46 @@ +package com.example.okafka; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.oracle.okafka.clients.producer.KafkaProducer; + +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.example.okafka.OKafka.TOPIC_NAME; +import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties; + +public class OKafkaProducer { + + public static void main(String[] args) throws InterruptedException { + Properties props = getAuthenticationProperties(); + props.put("enable.idempotence", "true"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + Producer producer = new KafkaProducer<>(props); + Runnable producerThread = () -> { + Instant now = Instant.now(); + ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now); + producer.send(record); + System.out.println("Producer sent message: " + record.value()); + }; + + + + int pauseMillis = 1000; + String pm = System.getenv("PAUSE_MILLIS"); + if (pm != null && !pm.isEmpty()) { + pauseMillis = Integer.parseInt(pm); + } + + try (ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + ) { + System.out.println("Starting producer"); + scheduler.scheduleAtFixedRate(producerThread, 0, pauseMillis, TimeUnit.MILLISECONDS); + } + Thread.currentThread().join(); + } +} diff --git a/code-teq/okafka-lab/wallet/ojdbc.properties b/code-teq/okafka-lab/wallet/ojdbc.properties new file mode 100644 index 000000000..b8606281c --- /dev/null +++ b/code-teq/okafka-lab/wallet/ojdbc.properties @@ -0,0 +1,2 @@ +user = testuser +password = testpwd