
Commit e0956a3

Authored Sep 5, 2024
Add OKafka Transactional Examples and Update Kafka Client (#390)
1 parent fe29792 commit e0956a3


17 files changed: +916 -304 lines changed

 

txeventq/okafka/.gitignore (-1)

@@ -24,4 +24,3 @@ doc/
 .settings/

 ojdbc.properties
-TxProducer/

txeventq/okafka/Quickstart/README.md (+62 -6)

@@ -105,7 +105,7 @@ END;

 ## Step 4: Investigate and Try Simple Producer and Consumer

-The repository contains 2 common OKafka application examples in `Simple` folder.
+The repository contains two common OKafka application examples in the `Simple` folder.

 1. The Producer `ProducerOKafka.java`

@@ -197,7 +197,7 @@ You should see some output that looks very similar to this:
 13:33:31.862 [main] INFO org.oracle.okafka.clients.producer.KafkaProducer -- [Producer clientId=] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
 13:33:31.865 [kafka-producer-network-thread | ] DEBUG org.oracle.okafka.clients.producer.internals.SenderThread -- [Producer clientId=] Starting Kafka producer I/O thread.
 13:33:31.866 [kafka-producer-network-thread | ] DEBUG org.oracle.okafka.clients.producer.internals.SenderThread -- [Producer clientId=] Sender waiting for 100
-13:33:31.866 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka version: 2.8.1
+13:33:31.866 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka version: 3.7.1
 13:33:31.867 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka commitId: 839b886f9b732b15
 13:33:31.867 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka startTimeMs: 1724258011865
 13:33:31.867 [main] DEBUG org.oracle.okafka.clients.producer.KafkaProducer -- [Producer clientId=] Kafka producer started
@@ -229,8 +229,6 @@ Initiating close
 13:33:48.738 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- App info kafka.producer for unregistered
 13:33:48.738 [main] DEBUG org.oracle.okafka.clients.producer.KafkaProducer -- [Producer clientId=] Kafka producer has been closed

-BUILD SUCCESSFUL in 17s
-3 actionable tasks: 3 executed
 ```

 And, querying the topic `TOPIC_1` at the Database, you should see some output that looks very similar to this:
@@ -290,7 +288,7 @@ gradle :Simple:Consumer:run
 .....
 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

-[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1
+[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.1
 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15
 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1724268189943
 [main] INFO org.oracle.okafka.clients.NetworkClient - [Consumer clientId=consumer-consumer_grp_1-1, groupId=consumer_grp_1] Available Nodes 1
@@ -301,10 +299,68 @@ gradle :Simple:Consumer:run
 [main] INFO org.oracle.okafka.clients.NetworkClient - [Consumer clientId=consumer-consumer_grp_1-1, groupId=consumer_grp_1] Reconnect successful to node 1:localhost:1521:FREEPDB1:FREE:OKAFKA_USER
 [main] INFO org.oracle.okafka.clients.Metadata - Cluster ID: FREE
 [main] INFO org.oracle.okafka.clients.NetworkClient - [Consumer clientId=consumer-consumer_grp_1-1, groupId=consumer_grp_1] Available Nodes 1
+No Record Fetched. Retrying in 1 second
+partition = 0, offset = 0, key = Just some key for OKafka0, value =0This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 1, key = Just some key for OKafka1, value =1This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 2, key = Just some key for OKafka2, value =2This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 3, key = Just some key for OKafka3, value =3This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 4, key = Just some key for OKafka4, value =4This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 5, key = Just some key for OKafka5, value =5This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 6, key = Just some key for OKafka6, value =6This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 7, key = Just some key for OKafka7, value =7This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 8, key = Just some key for OKafka8, value =8This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+partition = 0, offset = 9, key = Just some key for OKafka9, value =9This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
+Committing records10
+No Record Fetched. Retrying in 1 second
+```

-.....
+## Step 5: Investigate and Try Administration API
+
+With the Administration API it is possible to create and delete topics.
+
+### Task 1: Try the Admin Example
+
+Let's build and run the Admin example class. Use your IDE or open a command line (or terminal) and navigate to the folder
+where you have the project files `<Quickstart Directory>/`. We can build and run the application by issuing the following command:
+
+```cmd
+gradle Simple:Admin:run
+Usage: java OKafkaAdminTopic [CREATE|DELETE] topic1 ... topicN
+```
+
+This command requires at least two parameters. The first specifies whether to create or delete the topics listed
+in sequence after it. For example:
+
+```shell
+gradle Simple:Admin:run --args="CREATE TOPIC_ADMIN_2 TOPIC_ADMIN_3"
+```
+
+As a result, you will see the two new topics created.
+
+```sql
+SQL> select name, queue_table, dequeue_enabled,enqueue_enabled, sharded, queue_category, recipients
+  2  from all_queues
+  3  where OWNER='OKAFKA_USER'
+  4* and QUEUE_TYPE<>'EXCEPTION_QUEUE';
+
+NAME            QUEUE_TABLE     DEQUEUE_ENABLED   ENQUEUE_ENABLED   SHARDED   QUEUE_CATEGORY   RECIPIENTS
+_______________ _______________ _________________ _________________ _________ ________________ ____________
+......
+TOPIC_ADMIN_2   TOPIC_ADMIN_2   YES               YES               TRUE      Sharded Queue    MULTIPLE
+TOPIC_ADMIN_3   TOPIC_ADMIN_3   YES               YES               TRUE      Sharded Queue    MULTIPLE
 ```

+
+## Transaction in OKafka Examples
+
+The Kafka Client for Oracle Transactional Event Queues allows developers to use the transaction API effectively.
+
+Transactions allow for atomic writes across multiple TxEventQ topics and partitions, ensuring that either all messages
+within the transaction are successfully written, or none are. For instance, if an error occurs during processing, the
+transaction may be aborted, preventing any of the messages from being committed to the topic or accessed by consumers.
+
+You can now build and run the [Transactional Examples](./Transactional/TRANSACTIONAL_EXAMPLES.MD).
+
 ## Want to Learn More?

 - [Kafka APIs for Oracle Transactional Event Queues](https://docs.oracle.com/en/database/oracle/oracle-database/19/adque/)
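The Step 5 text added above drives topic creation through the Gradle task `Simple:Admin:run`. For orientation, a minimal sketch of what such an admin program can look like follows. It assumes that OKafka's `AdminClient` mirrors the standard Kafka `Admin` interface; the connection values and topic names are placeholders for illustration, not content of this commit.

```java
// Illustrative sketch only, not part of this commit.
// Assumes org.oracle.okafka.clients.admin.AdminClient mirrors the Apache Kafka Admin interface.
package org.oracle.okafka.examples;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.admin.AdminClient;

public class CreateTopicsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Connection properties as in config.properties (placeholder values)
        props.put("bootstrap.servers", "localhost:1521");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("security.protocol", "PLAINTEXT");

        // Each created topic shows up as a TxEventQ (sharded queue) in the OKAFKA_USER schema
        try (Admin admin = AdminClient.create(props)) {
            admin.createTopics(Arrays.asList(
                        new NewTopic("TOPIC_ADMIN_2", 1, (short) 1),
                        new NewTopic("TOPIC_ADMIN_3", 1, (short) 1)))
                 .all()
                 .get(); // wait for the database to create the queues
        }
    }
}
```

Deleting topics would go through the analogous `admin.deleteTopics(...)` call, which is presumably what the `DELETE` argument of `OKafkaAdminTopic` maps to.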

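The new "Transaction in OKafka Examples" section points to `Transactional/TRANSACTIONAL_EXAMPLES.MD`, which is not shown on this page. As a rough sketch of the transactional producer pattern the paragraph describes, the standard Kafka transactions API looks like the block below; the connection values, the topic name, and the `oracle.transactional.producer` property are assumptions for illustration, not lines taken from the commit.

```java
// Illustrative sketch of the Kafka transactions API the README refers to.
// Property values and topic name are placeholders, not from this commit.
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class TransactionalProduceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521");   // database listener
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("security.protocol", "PLAINTEXT");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: see the linked Transactional examples for the exact transactional configuration
        props.put("oracle.transactional.producer", "true");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("TXEQ", "key" + i, "value" + i));
            }
            producer.commitTransaction();   // all 10 records become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();    // none of the records are committed
        } finally {
            producer.close();
        }
    }
}
```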
txeventq/okafka/Quickstart/Simple/Admin/src/main/java/org/oracle/okafka/examples/OKafkaDeleteTopic.java (-58)

This file was deleted.

txeventq/okafka/Quickstart/Simple/Admin/src/main/resources/config.properties (-1)

@@ -6,7 +6,6 @@ bootstrap.servers=<server address:server port>
 oracle.service.name=<oracle database service>
 oracle.net.tns_admin=<location of ojdbc.properties file>

-
 #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
 security.protocol=PLAINTEXT
 #oracle.net.tns_admin=<location of Oracle Wallet, tnanames.ora and ojdbc.properties file>

txeventq/okafka/Quickstart/Simple/Consumer/src/main/resources/config.properties (+4 -2)

@@ -12,14 +12,16 @@ oracle.net.tns_admin=<location of ojdbc.properties file>
 #tns.alias=<tns alias>

 # Application specific OKafka consumer properties
-topic.name=<Oracle Database TxEventQ Topic [topic_1]>
+topic.name=<Oracle Database TxEventQ Topic [TOPIC_1]>
 group.id=<Oracle Database TxEventQ Subscriber [consumer_grp_1]>

-
 enable.auto.commit=true
 max.poll.records=1000
 default.api.timeout.ms=180000

+# Start consuming from the beginning (Default = latest)
+auto.offset.reset=earliest
+
 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

txeventq/okafka/Quickstart/Simple/Producer/src/main/resources/config.properties (+2 -1)

@@ -12,7 +12,8 @@ oracle.net.tns_admin=<location of ojdbc.properties file>
 #tns.alias=<tns alias>

 #Appliction specific OKafka Producer properties
-topic.name=<Oracle Database TxEventQ Topic [topic_1]>
+allow.auto.create.topics=FALSE
+topic.name=<Oracle Database TxEventQ Topic [TOPIC_1]>

 batch.size=200
 linger.ms=100

txeventq/okafka/Quickstart/Transactional/Consumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java (+71 -53)

@@ -7,6 +7,9 @@

 package org.oracle.okafka.examples;

+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Properties;
 import java.sql.Connection;
 import java.time.Duration;
@@ -24,68 +27,32 @@

 import org.oracle.okafka.clients.consumer.KafkaConsumer;

-
 public class TransactionalConsumerOKafka {

-    // Dummy implementation of ConsumerRebalanceListener interface
-    // It only maintains the list of assigned partitions in assignedPartitions list
-    static class ConsumerRebalance implements ConsumerRebalanceListener {
-
-        public List<TopicPartition> assignedPartitions = new ArrayList();
+    public static void main(String[] args) {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");

-        @Override
-        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            System.out.println("Newly Assigned Partitions:");
-            for (TopicPartition tp :partitions ) {
-                System.out.println(tp);
-                assignedPartitions.add(tp);
+        // Get application properties
+        Properties appProperties = null;
+        try {
+            appProperties = getProperties();
+            if (appProperties == null) {
+                System.out.println("Application properties not found!");
+                System.exit(-1);
             }
+        } catch (Exception e) {
+            System.out.println("Application properties not found!");
+            System.out.println("Exception: " + e);
+            System.exit(-1);
         }

-        @Override
-        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            System.out.println("Revoked previously assigned partitions. ");
-            for (TopicPartition tp :assignedPartitions ) {
-                System.out.println(tp);
-            }
-            assignedPartitions.clear();
-        }
-    }
+        String topicName = appProperties.getProperty("topic.name", "TXEQ");
+        appProperties.remove("topic.name"); // Pass props to build OKafkaConsumer

-    public static void main(String[] args) {
-        Properties props = new Properties();
-
-        // Option 1: Connect to Oracle Database with database username and password
-        props.put("security.protocol","PLAINTEXT");
-        //IP or Host name where Oracle Database 23ai is running and Database Listener's Port
-        props.put("bootstrap.servers", "localhost:1521");
-        props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
-        // location for ojdbc.properties file where user and password properties are saved
-        props.put("oracle.net.tns_admin",".");
-
-        /*
-        //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-        //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-        props.put("security.protocol","SSL");
-        // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-        props.put("oracle.net.tns_admin",".");
-        props.put("tns.alias","Oracle23ai_high");
-        */
-
-        //Consumer Group Name
-        props.put("group.id" , "CG1");
-        props.put("enable.auto.commit","false");
-
-        // Maximum number of records fetched in single poll call
-        props.put("max.poll.records", 10);
-
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
-        Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);
+        Consumer<String , String> consumer = new KafkaConsumer<String, String>(appProperties);
         ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();

-        consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener);
+        consumer.subscribe(Arrays.asList(topicName), rebalanceListener);

         int expectedMsgCnt = 100;
         int msgCnt = 0;
@@ -152,4 +119,55 @@ private static void processRecord(Connection conn, ConsumerRecord<String, String
     {
         //Application specific logic to process the message
     }
+
+
+    private static java.util.Properties getProperties() throws IOException {
+        InputStream inputStream = null;
+        Properties appProperties;
+
+        try {
+            Properties prop = new Properties();
+            String propFileName = "config.properties";
+            inputStream = TransactionalConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName);
+            if (inputStream != null) {
+                prop.load(inputStream);
+            } else {
+                throw new FileNotFoundException("property file '" + propFileName + "' not found.");
+            }
+            appProperties = prop;
+
+        } catch (Exception e) {
+            System.out.println("Exception: " + e);
+            throw e;
+        } finally {
+            if (inputStream != null)
+                inputStream.close();
+        }
+        return appProperties;
+    }
+
+    // Dummy implementation of ConsumerRebalanceListener interface
+    // It only maintains the list of assigned partitions in assignedPartitions list
+    static class ConsumerRebalance implements ConsumerRebalanceListener {
+
+        public List<TopicPartition> assignedPartitions = new ArrayList();
+
+        @Override
+        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            System.out.println("Newly Assigned Partitions:");
+            for (TopicPartition tp :partitions ) {
+                System.out.println(tp);
+                assignedPartitions.add(tp);
+            }
+        }
+
+        @Override
+        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            System.out.println("Revoked previously assigned partitions. ");
+            for (TopicPartition tp :assignedPartitions ) {
+                System.out.println(tp);
+            }
+            assignedPartitions.clear();
+        }
+    }
 }
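The hunks above change only the setup and helper code of `TransactionalConsumerOKafka.java`; the poll loop in the middle of the class (old lines 92 through 151) is unchanged and therefore not displayed. For orientation, a transactional consume loop over TxEventQ typically has the shape sketched below. The `getDBConnection()` call and the comments about commit behaviour are assumptions inferred from the `processRecord(Connection, ConsumerRecord)` signature shown above, not content of this commit.

```java
// Sketch of the consume-process-commit loop elided from the diff above.
// getDBConnection() and the commit comments are assumptions, not commit content.
import java.sql.Connection;
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class ConsumeLoopSketch {

    static void runLoop(Consumer<String, String> consumer) throws Exception {
        int expectedMsgCnt = 100;   // mirrors the counters visible in the diff above
        int msgCnt = 0;
        try {
            while (msgCnt < expectedMsgCnt) {
                // Fetch up to max.poll.records messages from the TxEventQ topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));

                // Database session backing this consumer, so record processing and the
                // offset commit can share one local transaction (assumption based on
                // the processRecord(Connection, ConsumerRecord) signature above)
                Connection conn = ((KafkaConsumer<String, String>) consumer).getDBConnection();

                for (ConsumerRecord<String, String> record : records) {
                    processRecord(conn, record);   // application-specific work on conn
                }

                if (!records.isEmpty()) {
                    msgCnt += records.count();
                    consumer.commitSync();         // commit offsets together with the DB work
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processRecord(Connection conn, ConsumerRecord<String, String> record) {
        // Application specific logic to process the message
    }
}
```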
(new file, +34; the file path is not shown on this page)

@@ -0,0 +1,34 @@
+# OKafka Consumer example properties
+
+#Properties to connect to Oracle Database
+#Option 1: Connect to Oracle database using plaintext
+
+bootstrap.servers=<server address:server port>
+oracle.service.name=<oracle database service>
+oracle.net.tns_admin=<location of ojdbc.properties file>
+
+
+#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
+#security.protocol=SSL
+security.protocol=PLAINTEXT
+
+#oracle.net.tns_admin=<location of Oracle Wallet, tnanames.ora and ojdbc.properties file>
+#tns.alias=<tns alias>
+
+# Application specific OKafka consumer properties
+topic.name=<Oracle Database TxEventQ Topic [TOPIC_1]>
+group.id=<Oracle Database TxEventQ Subscriber [consumer_grp_1]>
+
+enable.auto.commit=false
+
+# Start consuming from the beginning (Default = latest)
+auto.offset.reset=earliest
+
+# Maximum number of records fetched in single poll call
+max.poll.records=10
+
+default.api.timeout.ms=180000
+
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
