Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions code-teq/okafka-lab/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
services:
okafkadb:
image: gvenzl/oracle-free:23.9-slim-faststart
container_name: okafkadb
ports:
- 9092:1521
environment:
- ORACLE_PASSWORD=Welcome12345
volumes:
- ./oraclefree:/container-entrypoint-initdb.d
healthcheck:
test: ["CMD-SHELL", "lsnrctl status | grep READY"]
interval: 15s
timeout: 10s
retries: 5
start_period: 30s
21 changes: 21 additions & 0 deletions code-teq/okafka-lab/oraclefree/grant_permissions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Set as appropriate for your database.
alter session set container = freepdb1;

-- user for okafka
create user TESTUSER identified by testpwd;
grant create session to TESTUSER;
grant unlimited tablespace to TESTUSER;
grant connect, resource to TESTUSER;

-- okafka permissions
grant execute on dbms_aq to TESTUSER;
grant execute on dbms_aqadm to TESTUSER;
grant select on gv_$session to TESTUSER;
grant select on v_$session to TESTUSER;
grant select on gv_$instance to TESTUSER;
grant select on gv_$listener_network to TESTUSER;
grant select on SYS.DBA_RSRC_PLAN_DIRECTIVES to TESTUSER;
grant select on gv_$pdbs to TESTUSER;
grant select on user_queue_partition_assignment_table to TESTUSER;
exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER');
commit;
40 changes: 40 additions & 0 deletions code-teq/okafka-lab/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>oracle-database-kafka-apis</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
</build>
<groupId>com.example.okafka</groupId>
<name>oracle-kafka-apis</name>
<description>Oracle Kafka API Example</description>
<version>1.0.0</version>

<properties>
<java.version>21</java.version>
</properties>

<dependencies>
<!-- OKafka All-in-one -->
<dependency>
<groupId>com.oracle.database.messaging</groupId>
<artifactId>okafka</artifactId>
<version>23.6.1.0</version>
</dependency>

</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.example.okafka;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.oracle.okafka.clients.admin.AdminClient;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static com.example.okafka.OKafka.TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;

public class CreateTopic {
public static void main(String[] args) {
// Authentication properties to connect to Kafka
Properties props = getAuthenticationProperties();

try (Admin admin = AdminClient.create(props)) {
NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
admin.createTopics(List.of(testTopic))
.all()
.get();
System.out.println("[ADMIN] Created topic: " + testTopic.name());
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof TopicExistsException) {
System.out.println("[ADMIN] Topic already exists");
} else {
throw new RuntimeException(e);
}
}
}
}
13 changes: 13 additions & 0 deletions code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.example.okafka;

public interface OKafka {
String TOPIC_NAME = "test_topic";

static String getEnv(String key, String defaultValue) {
String value = System.getenv(key);
if (value == null || value.isEmpty()) {
return defaultValue;
}
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.example.okafka;

import java.util.Properties;

import static com.example.okafka.OKafka.getEnv;

public class OKafkaAuthentication {
// For this example, we'll configure our authentication parameters with environment variables.
// The security.protocol property supports PLAINTEXT (insecure) and SSL (secure) authentication.
private static final String securityProtocol = getEnv("SECURITY_PROTOCOL", "PLAINTEXT");

// For PLAINTEXT authentication, provide the HOSTNAME:PORT as the bootstrap.servers property.
private static final String bootstrapServers = getEnv("BOOTSTRAP_SERVERS", "localhost:9092");

// The TNS Admin alias / Oracle Database Service name.
private static final String tnsAdmin = getEnv("TNS_ADMIN", "freepdb1");

// The directory containing the database wallet. For PLAINTEXT, this directory need only
// contain an ojdbc.properties file with the "user" and "password" properties configured.
private static final String walletDir = getEnv("WALLET_DIR", "./wallet");

/**
* Create a Java Properties object for Oracle AI Database OKafka connection.
* Configure using the SECURITY_PROTOCOL, BOOTSTRAP_SERVERS, TNS_ADMIN, and WALLET_DIR environment variables.
* @return configured Properties object.
*/
public static Properties getAuthenticationProperties() {
// Just like kafka-clients, we can use a Java Properties object to configure connection parameters.
Properties props = new Properties();

// oracle.service.name is a custom property to configure the Database service name.
props.put("oracle.service.name", tnsAdmin);
// oracle.net.tns_admin is a custom property to configure the directory containing Oracle Database connection files.
// If you are using mTLS authentication, client certificates must be present in this directory.
props.put("oracle.net.tns_admin", walletDir);
// security.protocol is a standard Kafka property, set to PLAINTEXT or SSL for Oracle Database.
// (SASL is not supported with Oracle Database).
props.put("security.protocol", securityProtocol);
if (securityProtocol.equals("SSL")) {
// For SSL authentication, pass the TNS alias (such as "mydb_tp") to be used from the tnsnames.ora file
// found in the WALLET_DIR directory.
props.put("tns.alias", tnsAdmin);
} else {
// For PLAINTEXT authentication, we provide the database URL in the format
// HOSTNAME:PORT as the bootstrap.servers property.
props.put("bootstrap.servers", bootstrapServers);
}

return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.example.okafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import static com.example.okafka.OKafka.TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;

public class OKafkaConsumer {
public static void main(String[] args) {
Properties props = getAuthenticationProperties();

// Note the use of standard Kafka properties for OKafka configuration.
props.put("group.id" , "TEST_CONSUMER");
props.put("enable.auto.commit","false");
props.put("max.poll.records", 50);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(List.of(TOPIC_NAME));
System.out.println("Subscribed to topic " + TOPIC_NAME);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Consumed record: " + record.value());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.example.okafka;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.example.okafka.OKafka.TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;

public class OKafkaProducer {

public static void main(String[] args) throws InterruptedException {
Properties props = getAuthenticationProperties();
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Runnable producerThread = () -> {
Instant now = Instant.now();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now);
producer.send(record);
System.out.println("Producer sent message: " + record.value());
};



int pauseMillis = 1000;
String pm = System.getenv("PAUSE_MILLIS");
if (pm != null && !pm.isEmpty()) {
pauseMillis = Integer.parseInt(pm);
}

try (ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
) {
System.out.println("Starting producer");
scheduler.scheduleAtFixedRate(producerThread, 0, pauseMillis, TimeUnit.MILLISECONDS);
}
Thread.currentThread().join();
}
}
2 changes: 2 additions & 0 deletions code-teq/okafka-lab/wallet/ojdbc.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
user = testuser
password = testpwd