Skip to content

Commit

Permalink
Merge pull request #39 from linny0608/updateReadmeAndVersion
Browse files Browse the repository at this point in the history
Update readme and version
  • Loading branch information
ichokshi2109 committed Jun 25, 2024
2 parents 337a351 + ac19698 commit 51df224
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 22 deletions.
47 changes: 38 additions & 9 deletions connectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ exec sys.dbms_aqadm.start_queue('TxEventQ');
exec sys.dbms_aqadm.add_subscriber('TxEventQ', SYS.AQ$_AGENT('SUB1', NULL, 0));
```

If using the Source Connector and ordering of the events are important then the Transactional Event Queue that the Kakfa Source connector will be pulling from should have
the `STICKY_DEQUEUE` parameter set. The `tasks.max` property should also correspond to the number of `SHARD_NUM` assigned to the queue.
**If using the Source Connector and ordering of the events are important then the Transactional Event Queue that the Kakfa Source connector will be pulling from should have the `STICKY_DEQUEUE` parameter set. The `SHARD_NUM` assigned to the queue should be less than or equal to the number of Kafka partitions assigned to the Kafka topic.**

**Note: If running on a database version less than 23.4 with `STICKY_DEQUEUE` the `tasks.max` value must be equal to the `SHARD_NUM` specified. If the `tasks.max` is not equal to the `SHARD_NUM` dequeue from all event streams will not be performed.**

```roomsql
exec sys.dbms_aqadm.create_sharded_queue(queue_name=>"TxEventQ", multiple_consumers => TRUE);
Expand Down Expand Up @@ -180,9 +181,10 @@ Here is the full `connect-txeventq-source.properties` file below.
name=TxEventQ-source
connector.class=oracle.jdbc.txeventq.kafka.connect.source.TxEventQSourceConnector
# If using event streams and ordering of the events is important the number of tasks
# set for this property should be same number of event streams used for the transactional
# event queue.
# If the transactional event queue has STICKY_DEQUEUE set and running on a database version less than 23.4
# the tasks.max number specified must be equal to the number of event streams (SHARD_NUM) for the queue.
# If the `tasks.max` is not equal to the event streams (SHARD_NUM) dequeue from all event streams will not be performed when
# using a database with a version less than 23.4.
tasks.max=1
# The maximum number of messages in a batch. The default batch size is 1024.
Expand Down Expand Up @@ -259,13 +261,18 @@ In another command prompt start the Kafka server by running the following comman
```

In the third command prompt start the connector in either standalone (connect-standalone.bat) or distributed (connect-distributed.bat) mode by running the following command.
The command below is connecting in standalone mode. If connecting is distributed mode replace the bat file with the connect-distributed.bat file. If you want to run the source
connector replace the properties file below with the properties file for the source connector.
The command below is connecting in standalone mode. If you want to run the source connector replace the properties file below with the properties file for the source connector.

```bash
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-txeventq-sink.properties
```

If connecting in distributed mode on a Windows environment enter the following command in a command prompt.

```bash
.\bin\windows\connect-distributed.bat .\config\connect-distributed.properties
```

If running Kafka in a Linux environment open 3 different terminals and change to the directory where Kafka has been installed.

Run the following command in one of the terminals to start zookeeper:
Expand All @@ -281,9 +288,31 @@ bin/kafka-server-start.sh config/server.properties
```

In the third terminal start the connector in either standalone (connect-standalone.sh) or distributed (connect-distributed.sh) mode by running the following command.
The command below is connecting in standalone mode. If connecting is distributed mode replace the bat file with the connect-distributed.sh file. If you want to run the source
connector replace the properties file below with the properties file for the source connector.
The command below is connecting in standalone mode. If you want to run the source connector replace the properties file below with the properties file for the source connector.

```bash
bin/connect-standalone.sh config/connect-standalone.properties config/connect-TxEventQ-sink.properties
```

If connecting in distributed mode on a Linux environment enter the following command in a command prompt.

```bash
bin/connect-distributed.sh config/connect-distributed.properties
```

Use REST calls to post configuration properties when running in distributed mode. An example of a JSON configuration is shown below.

```bash
{
"connector.class": "oracle.jdbc.txeventq.kafka.connect.source.TxEventQSourceConnector",
"tasks.max": "5",
"kafka.topic": <Kafka topic>,
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"db_tns_alias": <tns alias>,
"wallet.path": <specify wallet path>,
"tnsnames.path": <specify tnsnames path>,
"txeventq.queue.name": <txEventQ queue name>,
"txeventq.subscriber": <txEventQ subscriber>,
"bootstrap.servers": <broker i.e localhost:9092>
}
```
136 changes: 135 additions & 1 deletion connectors/THIRD_PARTY_LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,17 @@ org.apache.kafka >> connect-json
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/


Oracle Free Distribution, Hosting, and Use Terms and Conditions (FDHUT)
com.oracle.database.security >> oraclepki
com.oracle.database.security >> osdt_core
com.oracle.database.security >> osdt_cert
com.oracle.database.jdbc >> ojdbc11
com.oracle.database.messaging >> aqapi

CDDL
javax.transaction >> jta

CDDL + GPL2
javax.jms >> javax.jms-api

Expand All @@ -32,6 +42,10 @@ org.slf4j >> slf4j-api
Copyright (c) 2004-2022 QOS.ch Sarl (Switzerland)
All rights reserved.

===============================================================================

## Licenses

##############################################################################

Apache License
Expand Down Expand Up @@ -1011,3 +1025,123 @@ based on this library. If you modify this library, you may extend this
exception to your version of the library, but you are not obligated to
do so. If you do not wish to do so, delete this exception statement
from your version.

######################################################################################

### FDHUT


Your use of this Program is governed by the Oracle Free Distribution, Hosting,
and Use Terms and Conditions set forth below, unless you have received this
Program (alone or as part of another Oracle product) under an Oracle license
agreement (including but not limited to the Oracle Master Agreement), in which
case your use of this Program is governed solely by such license agreement with
Oracle.

Oracle Free Distribution, Hosting, and Use Terms and Conditions
Definitions
"Oracle" refers to Oracle America, Inc. "You" and "Your" refers to (a) a
company or organization (each an "Entity") accessing the Programs, if use of
the Programs will be on behalf of such Entity; or (b) an individual accessing
the Programs, if use of the Programs will not be on behalf of an Entity.
"Program(s)" refers to Oracle software provided by Oracle pursuant to the
following terms and any updates, error corrections, and/or Program
Documentation provided by Oracle. "Program Documentation" refers to Program
user manuals and Program installation manuals, if any. If available, Program
Documentation may be delivered with the Programs and/or may be accessed from
www.oracle.com/documentation. "Separate Terms" refers to separate license terms
that are specified in the Program Documentation, readmes or notice files and
that apply to Separately Licensed Technology. "Separately Licensed Technology"
refers to Oracle or third party technology that is licensed under Separate
Terms and not under the terms of this license.

Separately Licensed Technology
Oracle may provide certain notices to You in Program Documentation, readmes or
notice files in connection with Oracle or third party technology provided as or
with the Programs. If specified in the Program Documentation, readmes or notice
files, such technology will be licensed to You under Separate Terms. Your
rights to use Separately Licensed Technology under Separate Terms are not
restricted in any way by the terms herein. For clarity, notwithstanding the
existence of a notice, third party technology that is not Separately Licensed
Technology shall be deemed part of the Programs licensed to You under the terms
of this license.

Source Code for Open Source Software
For software that You receive from Oracle in binary form that is licensed under
an open source license that gives You the right to receive the source code for
that binary, You can obtain a copy of the applicable source code from
https://oss.oracle.com/sources/ or http://www.oracle.com/goto/opensourcecode.
If the source code for such software was not provided to You with the binary,
You can also receive a copy of the source code on physical media by submitting
a written request pursuant to the instructions in the "Written Offer for Source
Code" section of the latter website.

-------------------------------------------------------------------------------
The following license terms apply to those Programs that are not provided to
You under Separate Terms.
License Rights and Restrictions
Oracle grants to You, as a recipient of this Program, a nonexclusive,
nontransferable, limited license to, subject to the conditions stated herein,
use the unmodified Programs, including, without limitation, for the purposes
of:
• developing, testing, prototyping and demonstrating applications;
• running the unmodified Programs for training, personal use, your
business operations, and the business operations of third parties;
• making the unmodified Programs available for use by third parties
in your hosted environment and in cloud services;
• redistributing unmodified Programs and Programs Documentation under
the terms of this License; and
• copying the unmodified Programs and Program Documentation to the extent
reasonably necessary to exercise the license rights granted herein and
for backup purposes.
For the purposes of this license, compiling, interpreting or configuring an
otherwise unmodified Program as necessary to run the Program shall not be
considered modification.

Your license is contingent on Your compliance with the following conditions:
- You include a copy of this license with any distribution by You of the
Programs;
- You do not charge your customers, end users, distributees or other third
parties any additional fees for the distribution or use of the Programs;
however, for clarity, if you comply with the foregoing condition,
distribution or use of the Program as part of your for-fee product or service
that adds substantial additional value is permitted;
- You do not remove markings or notices of either Oracle's or a licensor's
proprietary rights from the Programs or Program Documentation;
- You comply with all U.S. and applicable export control and economic sanctions
laws and regulations that govern Your use of the Programs (including
technical data); and
- You do not cause or permit reverse engineering, disassembly or decompilation
of the Programs (except as allowed by law) by You nor allow an associated
party to do so.
Any source code that may be included in the distribution with the Programs may
not be modified, unless such source code is under Separate Terms permitting
modification.

Ownership
Oracle or its licensors retain all ownership and intellectual property rights
to the Programs.

Information Collection
The Programs' installation and/or auto-update processes, if any, may transmit a
limited amount of data to Oracle or its service provider about those processes
to help Oracle understand and optimize them. Oracle does not associate the data
with personally identifiable information. Refer to Oracle's Privacy Policy at
www.oracle.com/privacy.

Disclaimer of Warranties; Limitation of Liability
THE PROGRAMS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ORACLE FURTHER
DISCLAIMS ALL WARRANTIES, EXPRESS AND IMPLIED, INCLUDING WITHOUT LIMITATION,
ANY IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR
NONINFRINGEMENT. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW WILL ORACLE BE
LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM
(INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR
LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE
WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED
OF THE POSSIBILITY OF SUCH DAMAGES.

Version 1.0
Last updated: 28 June 2022

---
4 changes: 2 additions & 2 deletions connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

<groupId>com.oracle.database.messaging</groupId>
<artifactId>txeventq-connector</artifactId>
<version>1.1</version>
<version>23.4.0.24.06</version>

<packaging>jar</packaging>

<!-- Start of fields requiring product specific values -->
<name>TxEventQ-Connector</name>
<description>Oracle's implementation of Kafka Sink and Source Connect for Oracle Transactional Event Queues.</description>
<url>https://docs.oracle.com/en/database/oracle/oracle-database/21/okjdc/index.html</url>
<url>https://docs.oracle.com/en/database/oracle/oracle-database/23/adque/Kafka_cient_interface_TEQ.html#GUID-C329D40D-21D6-454E-8B6A-49D96F0C8795</url>
<!-- End of fields requiring product specific values -->

<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class TxEventQSourceTask extends SourceTask {

private int batchSize;

private int tasksMax;

// Used to indicate when a batch has completed.
private CountDownLatch batchCompleteIndicator = null;

Expand Down Expand Up @@ -87,15 +89,18 @@ public void start(Map<String, String> properties) {
this.connectorName = this.config.name();

this.batchSize = this.config.getInt(TxEventQConnectorConfig.TASK_BATCH_SIZE_CONFIG);

log.debug("The batch size is: {}", this.batchSize);

this.tasksMax = this.config.getInt(TxEventQConnectorConfig.TASK_MAX_CONFIG);
log.debug("The tasks.max is: {}", this.tasksMax);

this.consumer.connect();

int kafkaPartitionNum = this.consumer.getKafkaTopicPartitionSize(
this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC));
int txEventQShardNum = this.consumer.getNumOfShardsForQueue(
this.config.getString(TxEventQConnectorConfig.TXEVENTQ_QUEUE_NAME));

if (kafkaPartitionNum < txEventQShardNum) {
throw new ConnectException("The number of Kafka partitions " + kafkaPartitionNum
+ " must be greater than or equal to " + txEventQShardNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public class TxEventQConnectorConfig extends AbstractConfig {
private static final String TASK_BATCH_SIZE_DOC = "The maximum number of records that a connector task may read from the Oracle TxEventQ broker before writing to Kafka. The task holds these records until they are acknowledged in Kafka, so this may affect memory usage.";
public static final int TASK_BATCH_SIZE_DEFAULT = 1024;

public static final String TASK_MAX_CONFIG = "tasks.max";
private static final String TASK_MAX_DISPLAY = "Tasks Max";
private static final String TASK_MAX_DOC = "Maximum number of tasks to use for this connector.";
public static final int TASK_MAX_DEFAULT = 1;

public final String topic;

public TxEventQConnectorConfig(Map<String, String> originals) {
Expand Down Expand Up @@ -153,6 +158,10 @@ public static ConfigDef getConfig() {
ConfigDef.Importance.MEDIUM, TASK_BATCH_SIZE_DOC, groupName, ++orderInGroup,
ConfigDef.Width.MEDIUM, TASK_BATCH_SIZE_DISPLAY);

configDef.define(TASK_MAX_CONFIG, ConfigDef.Type.INT, TASK_MAX_DEFAULT,
ConfigDef.Importance.HIGH, TASK_MAX_DOC, groupName, ++orderInGroup,
ConfigDef.Width.MEDIUM, TASK_MAX_DISPLAY);

return configDef;
}

Expand Down
Loading

0 comments on commit 51df224

Please sign in to comment.