diff --git a/connectors/README.md b/connectors/README.md index d88167e..5f7a829 100644 --- a/connectors/README.md +++ b/connectors/README.md @@ -56,8 +56,9 @@ exec sys.dbms_aqadm.start_queue('TxEventQ'); exec sys.dbms_aqadm.add_subscriber('TxEventQ', SYS.AQ$_AGENT('SUB1', NULL, 0)); ``` -If using the Source Connector and ordering of the events are important then the Transactional Event Queue that the Kakfa Source connector will be pulling from should have -the `STICKY_DEQUEUE` parameter set. The `tasks.max` property should also correspond to the number of `SHARD_NUM` assigned to the queue. +**If using the Source Connector and ordering of the events are important then the Transactional Event Queue that the Kakfa Source connector will be pulling from should have the `STICKY_DEQUEUE` parameter set. The `SHARD_NUM` assigned to the queue should be less than or equal to the number of Kafka partitions assigned to the Kafka topic.** + +**Note: If running on a database version less than 23.4 with `STICKY_DEQUEUE` the `tasks.max` value must be equal to the `SHARD_NUM` specified. If the `tasks.max` is not equal to the `SHARD_NUM` dequeue from all event streams will not be performed.** ```roomsql exec sys.dbms_aqadm.create_sharded_queue(queue_name=>"TxEventQ", multiple_consumers => TRUE); @@ -180,9 +181,10 @@ Here is the full `connect-txeventq-source.properties` file below. name=TxEventQ-source connector.class=oracle.jdbc.txeventq.kafka.connect.source.TxEventQSourceConnector -# If using event streams and ordering of the events is important the number of tasks -# set for this property should be same number of event streams used for the transactional -# event queue. +# If the transactional event queue has STICKY_DEQUEUE set and running on a database version less than 23.4 +# the tasks.max number specified must be equal to the number of event streams (SHARD_NUM) for the queue. +# If the `tasks.max` is not equal to the event streams (SHARD_NUM) dequeue from all event streams will not be performed when +# using a database with a version less than 23.4. tasks.max=1 # The maximum number of messages in a batch. The default batch size is 1024. @@ -259,13 +261,18 @@ In another command prompt start the Kafka server by running the following comman ``` In the third command prompt start the connector in either standalone (connect-standalone.bat) or distributed (connect-distributed.bat) mode by running the following command. -The command below is connecting in standalone mode. If connecting is distributed mode replace the bat file with the connect-distributed.bat file. If you want to run the source -connector replace the properties file below with the properties file for the source connector. +The command below is connecting in standalone mode. If you want to run the source connector replace the properties file below with the properties file for the source connector. ```bash .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-txeventq-sink.properties ``` +If connecting in distributed mode on a Windows environment enter the following command in a command prompt. + +```bash +.\bin\windows\connect-distributed.bat .\config\connect-distributed.properties +``` + If running Kafka in a Linux environment open 3 different terminals and change to the directory where Kafka has been installed. Run the following command in one of the terminals to start zookeeper: @@ -281,9 +288,31 @@ bin/kafka-server-start.sh config/server.properties ``` In the third terminal start the connector in either standalone (connect-standalone.sh) or distributed (connect-distributed.sh) mode by running the following command. -The command below is connecting in standalone mode. If connecting is distributed mode replace the bat file with the connect-distributed.sh file. If you want to run the source -connector replace the properties file below with the properties file for the source connector. +The command below is connecting in standalone mode. If you want to run the source connector replace the properties file below with the properties file for the source connector. ```bash bin/connect-standalone.sh config/connect-standalone.properties config/connect-TxEventQ-sink.properties ``` + +If connecting in distributed mode on a Linux environment enter the following command in a command prompt. + +```bash +bin/connect-distributed.sh config/connect-distributed.properties +``` + +Use REST calls to post configuration properties when running in distributed mode. An example of a JSON configuration is shown below. + +```bash +{ + "connector.class": "oracle.jdbc.txeventq.kafka.connect.source.TxEventQSourceConnector", + "tasks.max": "5", + "kafka.topic": , + "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", + "db_tns_alias": , + "wallet.path": , + "tnsnames.path": , + "txeventq.queue.name": , + "txeventq.subscriber": , + "bootstrap.servers": +} +``` diff --git a/connectors/THIRD_PARTY_LICENSE.txt b/connectors/THIRD_PARTY_LICENSE.txt index ebb8704..f22f0e8 100644 --- a/connectors/THIRD_PARTY_LICENSE.txt +++ b/connectors/THIRD_PARTY_LICENSE.txt @@ -23,7 +23,17 @@ org.apache.kafka >> connect-json * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ - + +Oracle Free Distribution, Hosting, and Use Terms and Conditions (FDHUT) +com.oracle.database.security >> oraclepki +com.oracle.database.security >> osdt_core +com.oracle.database.security >> osdt_cert +com.oracle.database.jdbc >> ojdbc11 +com.oracle.database.messaging >> aqapi + +CDDL +javax.transaction >> jta + CDDL + GPL2 javax.jms >> javax.jms-api @@ -32,6 +42,10 @@ org.slf4j >> slf4j-api Copyright (c) 2004-2022 QOS.ch Sarl (Switzerland) All rights reserved. +=============================================================================== + +## Licenses + ############################################################################## Apache License @@ -1011,3 +1025,123 @@ based on this library. If you modify this library, you may extend this exception to your version of the library, but you are not obligated to do so. If you do not wish to do so, delete this exception statement from your version. + +###################################################################################### + +### FDHUT + + +Your use of this Program is governed by the Oracle Free Distribution, Hosting, +and Use Terms and Conditions set forth below, unless you have received this +Program (alone or as part of another Oracle product) under an Oracle license +agreement (including but not limited to the Oracle Master Agreement), in which +case your use of this Program is governed solely by such license agreement with +Oracle. + +Oracle Free Distribution, Hosting, and Use Terms and Conditions +Definitions +"Oracle" refers to Oracle America, Inc. "You" and "Your" refers to (a) a +company or organization (each an "Entity") accessing the Programs, if use of +the Programs will be on behalf of such Entity; or (b) an individual accessing +the Programs, if use of the Programs will not be on behalf of an Entity. +"Program(s)" refers to Oracle software provided by Oracle pursuant to the +following terms and any updates, error corrections, and/or Program +Documentation provided by Oracle. "Program Documentation" refers to Program +user manuals and Program installation manuals, if any. If available, Program +Documentation may be delivered with the Programs and/or may be accessed from +www.oracle.com/documentation. "Separate Terms" refers to separate license terms +that are specified in the Program Documentation, readmes or notice files and +that apply to Separately Licensed Technology. "Separately Licensed Technology" +refers to Oracle or third party technology that is licensed under Separate +Terms and not under the terms of this license. + +Separately Licensed Technology +Oracle may provide certain notices to You in Program Documentation, readmes or +notice files in connection with Oracle or third party technology provided as or +with the Programs. If specified in the Program Documentation, readmes or notice +files, such technology will be licensed to You under Separate Terms. Your +rights to use Separately Licensed Technology under Separate Terms are not +restricted in any way by the terms herein. For clarity, notwithstanding the +existence of a notice, third party technology that is not Separately Licensed +Technology shall be deemed part of the Programs licensed to You under the terms +of this license. + +Source Code for Open Source Software +For software that You receive from Oracle in binary form that is licensed under +an open source license that gives You the right to receive the source code for +that binary, You can obtain a copy of the applicable source code from +https://oss.oracle.com/sources/ or http://www.oracle.com/goto/opensourcecode. +If the source code for such software was not provided to You with the binary, +You can also receive a copy of the source code on physical media by submitting +a written request pursuant to the instructions in the "Written Offer for Source +Code" section of the latter website. + +------------------------------------------------------------------------------- +The following license terms apply to those Programs that are not provided to +You under Separate Terms. +License Rights and Restrictions +Oracle grants to You, as a recipient of this Program, a nonexclusive, +nontransferable, limited license to, subject to the conditions stated herein, +use the unmodified Programs, including, without limitation, for the purposes +of: +• developing, testing, prototyping and demonstrating applications; +• running the unmodified Programs for training, personal use, your + business operations, and the business operations of third parties; +• making the unmodified Programs available for use by third parties + in your hosted environment and in cloud services; +• redistributing unmodified Programs and Programs Documentation under + the terms of this License; and +• copying the unmodified Programs and Program Documentation to the extent + reasonably necessary to exercise the license rights granted herein and + for backup purposes. +For the purposes of this license, compiling, interpreting or configuring an +otherwise unmodified Program as necessary to run the Program shall not be +considered modification. + +Your license is contingent on Your compliance with the following conditions: +- You include a copy of this license with any distribution by You of the + Programs; +- You do not charge your customers, end users, distributees or other third + parties any additional fees for the distribution or use of the Programs; + however, for clarity, if you comply with the foregoing condition, + distribution or use of the Program as part of your for-fee product or service + that adds substantial additional value is permitted; +- You do not remove markings or notices of either Oracle's or a licensor's + proprietary rights from the Programs or Program Documentation; +- You comply with all U.S. and applicable export control and economic sanctions + laws and regulations that govern Your use of the Programs (including + technical data); and +- You do not cause or permit reverse engineering, disassembly or decompilation + of the Programs (except as allowed by law) by You nor allow an associated + party to do so. +Any source code that may be included in the distribution with the Programs may +not be modified, unless such source code is under Separate Terms permitting +modification. + +Ownership +Oracle or its licensors retain all ownership and intellectual property rights +to the Programs. + +Information Collection +The Programs' installation and/or auto-update processes, if any, may transmit a +limited amount of data to Oracle or its service provider about those processes +to help Oracle understand and optimize them. Oracle does not associate the data +with personally identifiable information. Refer to Oracle's Privacy Policy at +www.oracle.com/privacy. + +Disclaimer of Warranties; Limitation of Liability +THE PROGRAMS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ORACLE FURTHER +DISCLAIMS ALL WARRANTIES, EXPRESS AND IMPLIED, INCLUDING WITHOUT LIMITATION, +ANY IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR +NONINFRINGEMENT. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW WILL ORACLE BE +LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR +CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM +(INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR +LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE +WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED +OF THE POSSIBILITY OF SUCH DAMAGES. + +Version 1.0 +Last updated: 28 June 2022 + +--- diff --git a/connectors/pom.xml b/connectors/pom.xml index 7ae42a8..1e0a1f0 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -6,14 +6,14 @@ com.oracle.database.messaging txeventq-connector - 1.1 + 23.4.0.24.06 jar TxEventQ-Connector Oracle's implementation of Kafka Sink and Source Connect for Oracle Transactional Event Queues. - https://docs.oracle.com/en/database/oracle/oracle-database/21/okjdc/index.html + https://docs.oracle.com/en/database/oracle/oracle-database/23/adque/Kafka_cient_interface_TEQ.html#GUID-C329D40D-21D6-454E-8B6A-49D96F0C8795 diff --git a/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/task/TxEventQSourceTask.java b/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/task/TxEventQSourceTask.java index 3043190..26946df 100644 --- a/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/task/TxEventQSourceTask.java +++ b/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/task/TxEventQSourceTask.java @@ -50,6 +50,8 @@ public class TxEventQSourceTask extends SourceTask { private int batchSize; + private int tasksMax; + // Used to indicate when a batch has completed. private CountDownLatch batchCompleteIndicator = null; @@ -87,15 +89,18 @@ public void start(Map properties) { this.connectorName = this.config.name(); this.batchSize = this.config.getInt(TxEventQConnectorConfig.TASK_BATCH_SIZE_CONFIG); - log.debug("The batch size is: {}", this.batchSize); + this.tasksMax = this.config.getInt(TxEventQConnectorConfig.TASK_MAX_CONFIG); + log.debug("The tasks.max is: {}", this.tasksMax); + this.consumer.connect(); int kafkaPartitionNum = this.consumer.getKafkaTopicPartitionSize( this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC)); int txEventQShardNum = this.consumer.getNumOfShardsForQueue( this.config.getString(TxEventQConnectorConfig.TXEVENTQ_QUEUE_NAME)); + if (kafkaPartitionNum < txEventQShardNum) { throw new ConnectException("The number of Kafka partitions " + kafkaPartitionNum + " must be greater than or equal to " + txEventQShardNum); diff --git a/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConnectorConfig.java b/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConnectorConfig.java index 14925f4..8c3719f 100644 --- a/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConnectorConfig.java +++ b/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConnectorConfig.java @@ -79,6 +79,11 @@ public class TxEventQConnectorConfig extends AbstractConfig { private static final String TASK_BATCH_SIZE_DOC = "The maximum number of records that a connector task may read from the Oracle TxEventQ broker before writing to Kafka. The task holds these records until they are acknowledged in Kafka, so this may affect memory usage."; public static final int TASK_BATCH_SIZE_DEFAULT = 1024; + public static final String TASK_MAX_CONFIG = "tasks.max"; + private static final String TASK_MAX_DISPLAY = "Tasks Max"; + private static final String TASK_MAX_DOC = "Maximum number of tasks to use for this connector."; + public static final int TASK_MAX_DEFAULT = 1; + public final String topic; public TxEventQConnectorConfig(Map originals) { @@ -153,6 +158,10 @@ public static ConfigDef getConfig() { ConfigDef.Importance.MEDIUM, TASK_BATCH_SIZE_DOC, groupName, ++orderInGroup, ConfigDef.Width.MEDIUM, TASK_BATCH_SIZE_DISPLAY); + configDef.define(TASK_MAX_CONFIG, ConfigDef.Type.INT, TASK_MAX_DEFAULT, + ConfigDef.Importance.HIGH, TASK_MAX_DOC, groupName, ++orderInGroup, + ConfigDef.Width.MEDIUM, TASK_MAX_DISPLAY); + return configDef; } diff --git a/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConsumer.java b/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConsumer.java index 653b2f4..6f5e199 100644 --- a/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConsumer.java +++ b/connectors/src/main/java/oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConsumer.java @@ -98,6 +98,8 @@ public class TxEventQConsumer implements Closeable { private String getQueueType; private static final int MINIMUM_VERSION = 21; + private int databaseMajorVersion = 0; + private int databaseMinorVersion = 0; public TxEventQConsumer(TxEventQConnectorConfig config) { this.config = config; @@ -146,29 +148,49 @@ public void connect() { } /** - * Check whether the database is version 21 or later + * Check whether the database version is 21 or later * * @throws ConnectException If we cannot get the database metadata or if the database version is * less than 21 */ private void versionCheck() { DatabaseMetaData md = null; - int version = 0; try { md = this.conn.getMetaData(); - version = md.getDatabaseMajorVersion(); - log.debug("DB Version: {}", version); + this.databaseMajorVersion = md.getDatabaseMajorVersion(); + this.databaseMinorVersion = md.getDatabaseMinorVersion(); + + log.debug("Database major version: {}", this.databaseMajorVersion); + log.debug("Database minor version: [{}]", this.databaseMinorVersion); } catch (SQLException e) { throw new ConnectException("Unable to obtain a database connection"); } - if (version < MINIMUM_VERSION) { + if (this.databaseMajorVersion < MINIMUM_VERSION) { throw new ConnectException( "TxEventQ Connector requires Oracle Database 21c or greater"); } } + /** + * Gets the database major version that is being using. + * + * @return The database major version that is being used. + */ + public int getDatabaseMajorVersion() { + return this.databaseMajorVersion; + } + + /** + * Gets the database minor version that is being using. + * + * @return The database minor version that is being used. + */ + public int getDatabaseMinorVersion() { + return this.databaseMinorVersion; + } + /** * Gets the database connection that is currently being used. * @@ -621,6 +643,37 @@ public int getNumOfShardsForQueue(String queue) { return numShard; } + /** + * Determines whether the queue is using sticky dequeue or not. + * + * @param queue The queue to get the sticky dequeue for. + * @return True if the queue is using sticky dequeue, otherwise false. + */ + public boolean isStickyDequeue(String queue) { + log.trace("[{}]:[{}] Entry {}.isStickyDequeue", Thread.currentThread().getId(), this.conn, + this.getClass().getName()); + + int stickyDequeue; + try (CallableStatement getnumshrdStmt = this.conn + .prepareCall("{call dbms_aqadm.get_queue_parameter(?,?,?)}")) { + getnumshrdStmt.setString(1, queue); + getnumshrdStmt.setString(2, "STICKY_DEQUEUE"); + getnumshrdStmt.registerOutParameter(3, Types.INTEGER); + getnumshrdStmt.execute(); + stickyDequeue = getnumshrdStmt.getInt(3); + } catch (SQLException e) { + throw new ConnectException( + "Error attempting to get sticky dequeue value for the specified queue: " + + e.getMessage()); + } + + log.debug("Sticky dequeue for {}: {}", queue, stickyDequeue); + log.trace("[{}]:[{}] Exit {}.isStickyDequeue", Thread.currentThread().getId(), this.conn, + this.getClass().getName()); + + return stickyDequeue == 1; + } + /** * Gets the partition size for the specified Kafka topic. * diff --git a/connectors/src/main/resources/connect-txeventq-source.properties b/connectors/src/main/resources/connect-txeventq-source.properties index 0b4af68..e4f0ecb 100644 --- a/connectors/src/main/resources/connect-txeventq-source.properties +++ b/connectors/src/main/resources/connect-txeventq-source.properties @@ -23,9 +23,10 @@ name=TxEventQ-source connector.class=oracle.jdbc.txeventq.kafka.connect.source.TxEventQSourceConnector -# If using event streams and ordering of the events is important the number of tasks -# set for this property should be same number of event streams used for the transactional -# event queue. +# If the transactional event queue has STICKY_DEQUEUE set and running on a database version less than 23.4 +# the tasks.max number specified must be equal to the number of event streams (SHARD_NUM) for the queue. +# If the `tasks.max` is not equal to the event streams (SHARD_NUM) dequeue from all event streams will +# not be performed when using a database with a version less than 23.4. tasks.max=1 # The maximum number of messages in a batch. diff --git a/connectors/src/main/resources/kafka-connect-oracle-version.properties b/connectors/src/main/resources/kafka-connect-oracle-version.properties index f530c42..c3a148f 100644 --- a/connectors/src/main/resources/kafka-connect-oracle-version.properties +++ b/connectors/src/main/resources/kafka-connect-oracle-version.properties @@ -1,3 +1,3 @@ name="Kafka Connect for Oracle" -version=1.0 +version=23.4.0.24.06 commitId=1 \ No newline at end of file