Skip to content

Commit 9679e6b

Browse files
authoredAug 21, 2024
Add Kafka Client for TxEventQ Examples (Quickstart Guide). (#383)
* WIP First commit okafka examples Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * WIP Okafka Examples - Transactional Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * WIP OKafka Examples Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * WIP OKafka Examples Admin Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * Create Quickstart Guide - Examples Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * WIP Examples Documentation (quickstart guide) Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * Release of README/Guidance Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> * Cleanup code dir. Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com> --------- Signed-off-by: Paulo Alberto Simoes <paulo.simoes@oracle.com>
1 parent 6fa8756 commit 9679e6b

File tree

19 files changed

+2925
-0
lines changed

19 files changed

+2925
-0
lines changed
 

‎txeventq/okafka/.gitignore

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
.idea
2+
.vscode
3+
.gradle
4+
5+
# Maven assets
6+
.mvn
7+
mvnw
8+
mvnw.cmd
9+
10+
.java-version
11+
12+
target/
13+
build/
14+
bin/
15+
16+
.dccache
17+
.DS_Store
18+
19+
doc/
20+
/.metadata/
21+
22+
.classpath
23+
.project
24+
.settings/
25+
26+
ojdbc.properties
27+
TxProducer/

‎txeventq/okafka/Quickstart/LICENSE.txt

+876
Large diffs are not rendered by default.

‎txeventq/okafka/Quickstart/README.md

+305
Large diffs are not rendered by default.
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Reporting security vulnerabilities
2+
3+
Oracle values the independent security research community and believes that
4+
responsible disclosure of security vulnerabilities helps us ensure the security
5+
and privacy of all our users.
6+
7+
Please do NOT raise a GitHub Issue to report a security vulnerability. If you
8+
believe you have found a security vulnerability, please submit a report to
9+
[secalert_us@oracle.com][1] preferably with a proof of concept. Please review
10+
some additional information on [how to report security vulnerabilities to Oracle][2].
11+
We encourage people who contact Oracle Security to use email encryption using
12+
[our encryption key][3].
13+
14+
We ask that you do not use other channels or contact the project maintainers
15+
directly.
16+
17+
Non-vulnerability related security issues including ideas for new or improved
18+
security features are welcome on GitHub Issues.
19+
20+
## Security updates, alerts and bulletins
21+
22+
Security updates will be released on a regular cadence. Many of our projects
23+
will typically release security fixes in conjunction with the
24+
Oracle Critical Patch Update program. Additional
25+
information, including past advisories, is available on our [security alerts][4]
26+
page.
27+
28+
## Security-related information
29+
30+
We will provide security related information such as a threat model, considerations
31+
for secure use, or any known security issues in our documentation. Please note
32+
that labs and sample code are intended to demonstrate a concept and may not be
33+
sufficiently hardened for production use.
34+
35+
[1]: mailto:secalert_us@oracle.com
36+
[2]: https://www.oracle.com/corporate/security-practices/assurance/vulnerability/reporting.html
37+
[3]: https://www.oracle.com/security-alerts/encryptionkey.html
38+
[4]: https://www.oracle.com/security-alerts/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
** OKafka Java Client version 23.4.
3+
**
4+
** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package org.oracle.okafka.examples;
9+
10+
import java.io.FileNotFoundException;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.util.*;
14+
import java.util.concurrent.ExecutionException;
15+
16+
import org.apache.kafka.clients.admin.Admin;
17+
import org.apache.kafka.clients.admin.CreateTopicsResult;
18+
import org.apache.kafka.clients.admin.DeleteTopicsResult;
19+
import org.apache.kafka.clients.admin.NewTopic;
20+
import org.apache.kafka.common.KafkaFuture;
21+
22+
import org.oracle.okafka.clients.admin.AdminClient;
23+
24+
public class OKafkaAdminTopic {
25+
26+
public static void main(String[] args) {
27+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
28+
29+
// Get application properties
30+
Properties appProperties = null;
31+
try {
32+
appProperties = getProperties();
33+
if (appProperties == null) {
34+
System.out.println("Application properties not found!");
35+
System.exit(-1);
36+
}
37+
} catch (Exception e) {
38+
System.out.println("Application properties not found!");
39+
System.out.println("Exception: " + e);
40+
System.exit(-1);
41+
}
42+
43+
if (args.length < 2) {
44+
System.out.println("Usage: java OKafkaAdminTopic [CREATE|DELETE] topic1 ... topicN");
45+
return;
46+
}
47+
48+
ArrayList<String> topicsName = new ArrayList<>();
49+
for (int i = 1; i < args.length; i++) {
50+
topicsName.add(args[i]);
51+
}
52+
53+
String operation = args[0].toUpperCase();
54+
switch (operation) {
55+
case "CREATE":
56+
createTopic(appProperties, topicsName);
57+
break;
58+
59+
case "DELETE":
60+
deleteTopic(appProperties, topicsName);
61+
break;
62+
63+
default:
64+
System.out.println("Error: Invalid operation.");
65+
}
66+
67+
}
68+
69+
private static void createTopic(Properties appProperties, ArrayList<String> topicsName) {
70+
try (Admin admin = AdminClient.create(appProperties)) {
71+
//Create Topic named TXEQ_1 and TXEQ_2 with 10 Partitions.
72+
73+
ArrayList<NewTopic> topics = new ArrayList<>();
74+
75+
for (String topicName : topicsName) {
76+
NewTopic nt = new NewTopic(topicName, 10, (short)0);
77+
topics.add(nt);
78+
}
79+
80+
CreateTopicsResult result = admin.createTopics(topics);
81+
try {
82+
KafkaFuture<Void> ftr = result.all();
83+
ftr.get();
84+
} catch ( InterruptedException | ExecutionException e ) {
85+
throw new IllegalStateException(e);
86+
}
87+
88+
System.out.println("Topic(s) created. Closing OKafka admin now");
89+
}
90+
catch(Exception e)
91+
{
92+
System.out.println("Exception while creating topic " + e);
93+
e.printStackTrace();
94+
}
95+
}
96+
97+
private static void deleteTopic(Properties appProperties, ArrayList<String> topicsName) {
98+
try (Admin admin = AdminClient.create(appProperties)) {
99+
DeleteTopicsResult delResult = admin.deleteTopics(topicsName);
100+
Thread.sleep(5000);
101+
System.out.println("Closing OKafka admin now");
102+
}
103+
catch(Exception e)
104+
{
105+
System.out.println("Exception while deleting topic " + e);
106+
e.printStackTrace();
107+
}
108+
}
109+
110+
private static Properties getProperties() throws IOException {
111+
InputStream inputStream = null;
112+
Properties appProperties = null;
113+
114+
try {
115+
Properties prop = new Properties();
116+
String propFileName = "config.properties";
117+
inputStream = OKafkaAdminTopic.class.getClassLoader().getResourceAsStream(propFileName);
118+
if (inputStream != null) {
119+
prop.load(inputStream);
120+
} else {
121+
throw new FileNotFoundException("property file '" + propFileName + "' not found.");
122+
}
123+
appProperties = prop;
124+
125+
} catch (Exception e) {
126+
System.out.println("Exception: " + e);
127+
throw e;
128+
} finally {
129+
inputStream.close();
130+
}
131+
return appProperties;
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
** OKafka Java Client version 23.4.
3+
**
4+
** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package org.oracle.okafka.examples;
9+
10+
import java.util.Collections;
11+
import java.util.Properties;
12+
import java.util.concurrent.ExecutionException;
13+
14+
import org.apache.kafka.clients.admin.Admin;
15+
import org.apache.kafka.clients.admin.CreateTopicsResult;
16+
import org.apache.kafka.clients.admin.NewTopic;
17+
import org.apache.kafka.common.KafkaFuture;
18+
import org.oracle.okafka.clients.admin.AdminClient;
19+
import org.oracle.okafka.clients.admin.DeleteTopicsResult;
20+
import org.oracle.okafka.clients.admin.KafkaAdminClient;
21+
22+
public class OKafkaDeleteTopic {
23+
24+
public static void main(String[] args) {
25+
Properties props = new Properties();
26+
//IP or Host name where Oracle Database 23c is running and Database Listener's Port
27+
props.put("bootstrap.servers", "localhost:1521");
28+
//name of the service running on the database instance
29+
props.put("oracle.service.name", "FREEPDB1");
30+
props.put("security.protocol","PLAINTEXT");
31+
// location for ojdbc.properties file where user and password properties are saved
32+
props.put("oracle.net.tns_admin",".");
33+
34+
/*
35+
//Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
36+
//This option to be used when connecting to Oracle autonomous database instance on OracleCloud
37+
props.put("security.protocol","SSL");
38+
// location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
39+
props.put("oracle.net.tns_admin",".");
40+
props.put("tns.alias","Oracle23ai_high");
41+
*/
42+
try (Admin admin = AdminClient.create(props)) {
43+
44+
org.apache.kafka.clients.admin.DeleteTopicsResult delResult = admin.deleteTopics(Collections.singletonList("TXEQ"));
45+
46+
//DeleteTopicsResult delResult = kAdminClient.deleteTopics(Collections.singletonList("TEQ2"), new org.oracle.okafka.clients.admin.DeleteTopicsOptions());
47+
48+
Thread.sleep(5000);
49+
System.out.println("Auto Clsoing admin now");
50+
}
51+
catch(Exception e)
52+
{
53+
System.out.println("Exception while creating topic " + e);
54+
e.printStackTrace();
55+
}
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# OKafka Consumer example properties
2+
3+
#Properties to connect to Oracle Database
4+
#Option 1: Connect to Oracle database using plaintext
5+
bootstrap.servers=<server address:server port>
6+
oracle.service.name=<oracle database service>
7+
oracle.net.tns_admin=<location of ojdbc.properties file>
8+
9+
10+
#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
11+
security.protocol=PLAINTEXT
12+
#oracle.net.tns_admin=<location of Oracle Wallet, tnanames.ora and ojdbc.properties file>
13+
#tns.alias=<tns alias>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
** OKafka Java Client version 23.4.
3+
**
4+
** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package org.oracle.okafka.examples;
9+
10+
import java.io.FileNotFoundException;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.util.Properties;
14+
import java.time.Duration;
15+
import java.util.Arrays;
16+
17+
import org.oracle.okafka.clients.consumer.KafkaConsumer;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecords;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.slf4j.Logger;
22+
23+
public class ConsumerOKafka {
24+
public static void main(String[] args) {
25+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
26+
27+
// Get application properties
28+
Properties appProperties = null;
29+
try {
30+
appProperties = getProperties();
31+
if (appProperties == null) {
32+
System.out.println("Application properties not found!");
33+
System.exit(-1);
34+
}
35+
} catch (Exception e) {
36+
System.out.println("Application properties not found!");
37+
System.out.println("Exception: " + e);
38+
System.exit(-1);
39+
}
40+
41+
String topic = appProperties.getProperty("topic.name", "TXEQ");
42+
appProperties.remove("topic.name"); // Pass props to build OKafkaProducer
43+
44+
KafkaConsumer<String , String> consumer = new KafkaConsumer<>(appProperties);
45+
consumer.subscribe(Arrays.asList(topic));
46+
47+
48+
try {
49+
while(true) {
50+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
51+
52+
for (ConsumerRecord<String, String> record : records)
53+
System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
54+
55+
if (records != null && records.count() > 0) {
56+
System.out.println("Committing records" + records.count());
57+
consumer.commitSync();
58+
} else {
59+
System.out.println("No Record Fetched. Retrying in 1 second");
60+
Thread.sleep(1000);
61+
}
62+
}
63+
}catch(Exception e)
64+
{
65+
System.out.println("Exception from consumer " + e);
66+
e.printStackTrace();
67+
}
68+
finally {
69+
consumer.close();
70+
}
71+
72+
}
73+
74+
private static java.util.Properties getProperties() throws IOException {
75+
InputStream inputStream = null;
76+
Properties appProperties = null;
77+
78+
try {
79+
Properties prop = new Properties();
80+
String propFileName = "config.properties";
81+
inputStream = ConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName);
82+
if (inputStream != null) {
83+
prop.load(inputStream);
84+
} else {
85+
throw new FileNotFoundException("property file '" + propFileName + "' not found.");
86+
}
87+
appProperties = prop;
88+
89+
} catch (Exception e) {
90+
System.out.println("Exception: " + e);
91+
throw e;
92+
} finally {
93+
inputStream.close();
94+
}
95+
return appProperties;
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# OKafka Consumer example properties
2+
3+
#Properties to connect to Oracle Database
4+
#Option 1: Connect to Oracle database using plaintext
5+
bootstrap.servers=<server address:server port>
6+
oracle.service.name=<oracle database service>
7+
oracle.net.tns_admin=<location of ojdbc.properties file>
8+
9+
#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
10+
#security.protocol=SSL
11+
#oracle.net.tns_admin=<location of Oracle Wallet, tnanames.ora and ojdbc.properties file>
12+
#tns.alias=<tns alias>
13+
14+
# Application specific OKafka consumer properties
15+
topic.name=<Oracle Database TxEventQ Topic [topic_1]>
16+
group.id=<Oracle Database TxEventQ Subscriber [consumer_grp_1]>
17+
18+
19+
enable.auto.commit=true
20+
max.poll.records=1000
21+
default.api.timeout.ms=180000
22+
23+
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
24+
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
25+

0 commit comments

Comments
 (0)
Failed to load comments.