From 9c3293addd46f50f5607642ddddc8902993f925b Mon Sep 17 00:00:00 2001 From: lasanthaS Date: Fri, 18 Oct 2019 00:09:13 +0530 Subject: [PATCH 1/5] Fix ignoring records while concurrent writes --- .../siddhi/io/cdc/source/CDCSource.java | 51 +++- .../io/cdc/source/polling/CDCPoller.java | 253 +++--------------- .../strategies/DefaultPollingStrategy.java | 139 ++++++++++ .../polling/strategies/PollingStrategy.java | 182 +++++++++++++ .../WaitOnMissingRecordPollingStrategy.java | 180 +++++++++++++ .../io/cdc/util/CDCSourceConstants.java | 3 + findbugs-exclude.xml | 4 +- 7 files changed, 584 insertions(+), 228 deletions(-) create mode 100644 component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java create mode 100644 component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java create mode 100644 component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java index 555a1a1..398458e 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java @@ -184,6 +184,38 @@ defaultValue = "{host}_{port}", optional = true, type = DataType.STRING + ), + @Parameter( + name = "wait.on.missed.record", + description = "Indicates whether you need to wait on missing/out-of-order records. When " + + "this flag is set to true the process will be hold once it identifies a missing " + + "record. The missing record is identified by the sequence of the polling.column. " + + "\nThis can be used only with a number field and should not be used with time values " + + "as it will not be an incremental value." + + "\nThis should be enabled ONLY in a situation where the records are written out of " + + "order, e.g. in a concurrent system as this degrades the performance.", + type = DataType.BOOL, + optional = true, + defaultValue = "false" + ), + @Parameter( + name = "missed.record.retry.interval", + description = "The time interval (specified in milliseconds) to wait and retry if the " + + "process identifies a missing/out-of-order record. This should be used along with " + + "the wait.on.missed.record parameter.", + type = DataType.INT, + optional = true, + defaultValue = "-1" + ), + @Parameter( + name = "missed.record.waiting.timeout", + description = "The timeout (specified in milliseconds) to retry for missing/out-of-order " + + "record. This should be used along with the wait.on.missed.record parameter. If the " + + "parameter is not set, the process will indefinitely wait for the record without " + + "proceeding.", + type = DataType.INT, + optional = true, + defaultValue = "-1" ) }, examples = { @@ -341,23 +373,33 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol String pollingColumn = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_COLUMN); boolean isDatasourceNameAvailable = optionHolder.isOptionExists(CDCSourceConstants.DATASOURCE_NAME); boolean isJndiResourceAvailable = optionHolder.isOptionExists(CDCSourceConstants.JNDI_RESOURCE); - pollingInterval = Integer.parseInt( + int pollingInterval = Integer.parseInt( optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_INTERVAL, Integer.toString(CDCSourceConstants.DEFAULT_POLLING_INTERVAL_SECONDS))); validatePollingModeParameters(); String poolPropertyString = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POOL_PROPERTIES, null); + boolean waitOnMissedRecord = Boolean.parseBoolean( + optionHolder.validateAndGetStaticValue(CDCSourceConstants.WAIT_ON_MISSED_RECORD, "false")); + int missedRecordRetryIntervalMS = Integer.parseInt( + optionHolder.validateAndGetStaticValue(CDCSourceConstants.MISSED_RECORD_RETRY_INTERVAL_MS, + "-1")); + int missedRecordWaitingTimeoutMS = Integer.parseInt( + optionHolder.validateAndGetStaticValue( + CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT_MS, "-1")); if (isDatasourceNameAvailable) { String datasourceName = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATASOURCE_NAME); cdcPoller = new CDCPoller(null, null, null, tableName, null, datasourceName, null, pollingColumn, pollingInterval, - poolPropertyString, sourceEventListener, configReader); + poolPropertyString, sourceEventListener, configReader, waitOnMissedRecord, + missedRecordRetryIntervalMS, missedRecordWaitingTimeoutMS); } else if (isJndiResourceAvailable) { String jndiResource = optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE); cdcPoller = new CDCPoller(null, null, null, tableName, null, null, jndiResource, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordRetryIntervalMS, + missedRecordWaitingTimeoutMS); } else { String driverClassName; try { @@ -372,7 +414,8 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol } cdcPoller = new CDCPoller(url, username, password, tableName, driverClassName, null, null, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordRetryIntervalMS, + missedRecordWaitingTimeoutMS); } break; default: diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java index 3d31b75..3d0a736 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java @@ -26,31 +26,17 @@ import org.osgi.framework.ServiceReference; import org.wso2.carbon.datasource.core.api.DataSourceService; import org.wso2.carbon.datasource.core.exception.DataSourceException; -import org.wso2.extension.siddhi.io.cdc.source.config.Database; -import org.wso2.extension.siddhi.io.cdc.source.config.QueryConfiguration; +import org.wso2.extension.siddhi.io.cdc.source.polling.strategies.DefaultPollingStrategy; +import org.wso2.extension.siddhi.io.cdc.source.polling.strategies.PollingStrategy; +import org.wso2.extension.siddhi.io.cdc.source.polling.strategies.WaitOnMissingRecordPollingStrategy; import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; -import org.wso2.extension.siddhi.io.cdc.util.MyYamlConstructor; import org.wso2.siddhi.core.stream.input.source.SourceEventListener; import org.wso2.siddhi.core.util.config.ConfigReader; -import org.yaml.snakeyaml.TypeDescription; -import org.yaml.snakeyaml.Yaml; -import java.io.IOException; -import java.io.InputStream; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.HashMap; import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Properties; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; + import javax.naming.InitialContext; import javax.naming.NamingException; @@ -60,48 +46,51 @@ public class CDCPoller implements Runnable { private static final Logger log = Logger.getLogger(CDCPoller.class); - private static final String PLACE_HOLDER_TABLE_NAME = "{{TABLE_NAME}}"; - private static final String PLACE_HOLDER_COLUMN_LIST = "{{COLUMN_LIST}}"; - private static final String PLACE_HOLDER_CONDITION = "{{CONDITION}}"; - private static final String SELECT_QUERY_CONFIG_FILE = "query-config.yaml"; - private static final String RECORD_SELECT_QUERY = "recordSelectQuery"; - private String selectQueryStructure = ""; private String url; private String tableName; private String username; private String password; private String driverClassName; private HikariDataSource dataSource; - private String lastReadPollingColumnValue; - private SourceEventListener sourceEventListener; - private String pollingColumn; private String datasourceName; - private int pollingInterval; private CompletionCallback completionCallback; - private boolean paused = false; - private ReentrantLock pauseLock = new ReentrantLock(); - private Condition pauseLockCondition = pauseLock.newCondition(); - private ConfigReader configReader; private String poolPropertyString; private String jndiResource; private boolean isLocalDataSource = false; + private PollingStrategy pollingStrategy; + public CDCPoller(String url, String username, String password, String tableName, String driverClassName, String datasourceName, String jndiResource, String pollingColumn, int pollingInterval, String poolPropertyString, - SourceEventListener sourceEventListener, ConfigReader configReader) { + SourceEventListener sourceEventListener, ConfigReader configReader, boolean waitOnMissedRecord, + int missedRecordRetryIntervalMS, int missedRecordWaitingTimeoutMS) { this.url = url; this.tableName = tableName; this.username = username; this.password = password; this.driverClassName = driverClassName; - this.sourceEventListener = sourceEventListener; - this.pollingColumn = pollingColumn; - this.pollingInterval = pollingInterval; - this.configReader = configReader; this.poolPropertyString = poolPropertyString; this.datasourceName = datasourceName; this.jndiResource = jndiResource; + + try { + initializeDatasource(); + } catch (NamingException e) { + throw new CDCPollingModeException("Error in initializing connection for " + tableName + ". " + + "Current mode: " + CDCSourceConstants.MODE_POLLING, e); + } + + if (waitOnMissedRecord) { + log.debug(WaitOnMissingRecordPollingStrategy.class + " is selected as the polling strategy."); + this.pollingStrategy = new WaitOnMissingRecordPollingStrategy(dataSource, configReader, sourceEventListener, + tableName, pollingColumn, pollingInterval, missedRecordRetryIntervalMS, + missedRecordWaitingTimeoutMS); + } else { + log.debug(DefaultPollingStrategy.class + " is selected as the polling strategy"); + this.pollingStrategy = new DefaultPollingStrategy(dataSource, configReader, sourceEventListener, + tableName, pollingColumn, pollingInterval); + } } public HikariDataSource getDataSource() { @@ -174,205 +163,25 @@ public boolean isLocalDataSource() { } public String getLastReadPollingColumnValue() { - return lastReadPollingColumnValue; + return pollingStrategy.getLastReadPollingColumnValue(); } public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) { - this.lastReadPollingColumnValue = lastReadPollingColumnValue; - } - - private Connection getConnection() { - Connection conn; - try { - conn = this.dataSource.getConnection(); - if (log.isDebugEnabled()) { - log.debug("A connection is initialized "); - } - } catch (SQLException e) { - throw new CDCPollingModeException("Error initializing datasource connection. Current mode: " + - CDCSourceConstants.MODE_POLLING, e); - } - return conn; - } - - private String getSelectQuery(String columnList, String condition) { - String selectQuery; - if (selectQueryStructure.isEmpty()) { - //Get the database product name - String databaseName; - Connection conn = null; - try { - conn = getConnection(); - DatabaseMetaData dmd = conn.getMetaData(); - databaseName = dmd.getDatabaseProductName(); - } catch (SQLException e) { - throw new CDCPollingModeException("Error in looking up database type. Current mode: " + - CDCSourceConstants.MODE_POLLING, e); - } finally { - CDCPollingUtil.cleanupConnection(null, null, conn); - } - - //Read configs from config reader. - selectQueryStructure = configReader.readConfig(databaseName + "." + RECORD_SELECT_QUERY, ""); - - if (selectQueryStructure.isEmpty()) { - //Read configs from yaml file - QueryConfiguration queryConfiguration; - InputStream inputStream = null; - try { - MyYamlConstructor constructor = new MyYamlConstructor(QueryConfiguration.class); - TypeDescription queryTypeDescription = new TypeDescription(QueryConfiguration.class); - queryTypeDescription.putListPropertyType("databases", Database.class); - constructor.addTypeDescription(queryTypeDescription); - Yaml yaml = new Yaml(constructor); - ClassLoader classLoader = getClass().getClassLoader(); - inputStream = classLoader.getResourceAsStream(SELECT_QUERY_CONFIG_FILE); - if (inputStream == null) { - throw new CDCPollingModeException(SELECT_QUERY_CONFIG_FILE - + " is not found in the classpath. Current mode: " + CDCSourceConstants.MODE_POLLING); - } - queryConfiguration = (QueryConfiguration) yaml.load(inputStream); - } finally { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - log.error("Failed to close the input stream for " + SELECT_QUERY_CONFIG_FILE + ". " + - "Current mode: " + CDCSourceConstants.MODE_POLLING); - } - } - } - - //Get database related select query structure - if (queryConfiguration != null) { - for (Database database : queryConfiguration.getDatabases()) { - if (database.getName().equalsIgnoreCase(databaseName)) { - selectQueryStructure = database.getSelectQuery(); - break; - } - } - } - } - - if (selectQueryStructure.isEmpty()) { - throw new CDCPollingModeException("Unsupported database: " + databaseName + ". Configure system" + - " parameter: " + databaseName + "." + RECORD_SELECT_QUERY + ". Current mode: " + - CDCSourceConstants.MODE_POLLING); - } - } - - //create the select query with given constraints - selectQuery = selectQueryStructure.replace(PLACE_HOLDER_TABLE_NAME, tableName) - .replace(PLACE_HOLDER_COLUMN_LIST, columnList) - .replace(PLACE_HOLDER_CONDITION, condition); - - return selectQuery; - } - - /** - * Poll for inserts and updates. - */ - private void pollForChanges() { - try { - initializeDatasource(); - } catch (NamingException e) { - throw new CDCPollingModeException("Error in initializing connection for " + tableName + ". " + - "Current mode: " + CDCSourceConstants.MODE_POLLING, e); - } - - String selectQuery; - ResultSetMetaData metadata; - Map detailsMap; - Connection connection = getConnection(); - PreparedStatement statement = null; - ResultSet resultSet = null; - - try { - //If lastReadPollingColumnValue is null, assign it with last record of the table. - if (lastReadPollingColumnValue == null) { - selectQuery = getSelectQuery("MAX(" + pollingColumn + ")", "").trim(); - statement = connection.prepareStatement(selectQuery); - resultSet = statement.executeQuery(); - if (resultSet.next()) { - lastReadPollingColumnValue = resultSet.getString(1); - } - //if the table is empty, set last offset to a negative value. - if (lastReadPollingColumnValue == null) { - lastReadPollingColumnValue = "-1"; - } - } - - selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); - statement = connection.prepareStatement(selectQuery); - - while (true) { - if (paused) { - pauseLock.lock(); - try { - while (paused) { - pauseLockCondition.await(); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } finally { - pauseLock.unlock(); - } - } - try { - statement.setString(1, lastReadPollingColumnValue); - resultSet = statement.executeQuery(); - metadata = resultSet.getMetaData(); - while (resultSet.next()) { - detailsMap = new HashMap<>(); - for (int i = 1; i <= metadata.getColumnCount(); i++) { - String key = metadata.getColumnName(i); - Object value = resultSet.getObject(key); - detailsMap.put(key.toLowerCase(Locale.ENGLISH), value); - } - lastReadPollingColumnValue = resultSet.getString(pollingColumn); - handleEvent(detailsMap); - } - } catch (SQLException ex) { - log.error(ex); - } finally { - CDCPollingUtil.cleanupConnection(resultSet, null, null); - } - try { - Thread.sleep((long) pollingInterval * 1000); - } catch (InterruptedException e) { - log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); - } - } - } catch (SQLException ex) { - throw new CDCPollingModeException("Error in polling for changes on " + tableName + ". Current mode: " + - CDCSourceConstants.MODE_POLLING, ex); - } finally { - CDCPollingUtil.cleanupConnection(resultSet, statement, connection); - } - } - - private void handleEvent(Map detailsMap) { - sourceEventListener.onEvent(detailsMap, null); + pollingStrategy.setLastReadPollingColumnValue(lastReadPollingColumnValue); } public void pause() { - paused = true; + pollingStrategy.pause(); } public void resume() { - paused = false; - try { - pauseLock.lock(); - pauseLockCondition.signal(); - } finally { - pauseLock.unlock(); - } + pollingStrategy.resume(); } @Override public void run() { try { - pollForChanges(); + pollingStrategy.poll(); } catch (CDCPollingModeException e) { completionCallback.handle(e); } diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java new file mode 100644 index 0000000..bd2f0c7 --- /dev/null +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.extension.siddhi.io.cdc.source.polling.strategies; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.log4j.Logger; +import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; +import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; +import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; +import org.wso2.siddhi.core.stream.input.source.SourceEventListener; +import org.wso2.siddhi.core.util.config.ConfigReader; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +/** + * Default implementation of the polling strategy. This uses {@code pollingColumn} and {@code pollingInterval} to poll + * on. + */ +public class DefaultPollingStrategy extends PollingStrategy { + private static final Logger log = Logger.getLogger(DefaultPollingStrategy.class); + + private String pollingColumn; + private int pollingInterval; + private String lastReadPollingColumnValue; + + public DefaultPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, + SourceEventListener sourceEventListener, String tableName, String pollingColumn, + int pollingInterval) { + super(dataSource, configReader, sourceEventListener, tableName); + this.pollingColumn = pollingColumn; + this.pollingInterval = pollingInterval; + } + + @Override + public void poll() { + String selectQuery; + ResultSetMetaData metadata; + Map detailsMap; + Connection connection = getConnection(); + PreparedStatement statement = null; + ResultSet resultSet = null; + + try { + //If lastReadPollingColumnValue is null, assign it with last record of the table. + if (lastReadPollingColumnValue == null) { + selectQuery = getSelectQuery("MAX(" + pollingColumn + ")", "").trim(); + statement = connection.prepareStatement(selectQuery); + resultSet = statement.executeQuery(); + if (resultSet.next()) { + lastReadPollingColumnValue = resultSet.getString(1); + } + //if the table is empty, set last offset to a negative value. + if (lastReadPollingColumnValue == null) { + lastReadPollingColumnValue = "-1"; + } + } + + selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); + statement = connection.prepareStatement(selectQuery); + + while (true) { + if (paused) { + pauseLock.lock(); + try { + while (paused) { + pauseLockCondition.await(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + pauseLock.unlock(); + } + } + try { + statement.setString(1, lastReadPollingColumnValue); + resultSet = statement.executeQuery(); + metadata = resultSet.getMetaData(); + while (resultSet.next()) { + detailsMap = new HashMap<>(); + for (int i = 1; i <= metadata.getColumnCount(); i++) { + String key = metadata.getColumnName(i); + Object value = resultSet.getObject(key); + detailsMap.put(key.toLowerCase(Locale.ENGLISH), value); + } + lastReadPollingColumnValue = resultSet.getString(pollingColumn); + handleEvent(detailsMap); + } + } catch (SQLException ex) { + log.error(ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, null, null); + } + try { + Thread.sleep((long) pollingInterval * 1000); + } catch (InterruptedException e) { + log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); + } + } + } catch (SQLException ex) { + throw new CDCPollingModeException("Error in polling for changes on " + tableName + ". Current mode: " + + CDCSourceConstants.MODE_POLLING, ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, statement, connection); + } + } + + @Override + public String getLastReadPollingColumnValue() { + return lastReadPollingColumnValue; + } + + @Override + public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) { + this.lastReadPollingColumnValue = lastReadPollingColumnValue; + } +} diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java new file mode 100644 index 0000000..612ee00 --- /dev/null +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.extension.siddhi.io.cdc.source.polling.strategies; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.log4j.Logger; +import org.wso2.extension.siddhi.io.cdc.source.config.Database; +import org.wso2.extension.siddhi.io.cdc.source.config.QueryConfiguration; +import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; +import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; +import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; +import org.wso2.extension.siddhi.io.cdc.util.MyYamlConstructor; +import org.wso2.siddhi.core.stream.input.source.SourceEventListener; +import org.wso2.siddhi.core.util.config.ConfigReader; +import org.yaml.snakeyaml.TypeDescription; +import org.yaml.snakeyaml.Yaml; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract definition of polling strategy to poll DB changes. + */ +public abstract class PollingStrategy { + private static final Logger log = Logger.getLogger(PollingStrategy.class); + private static final String PLACE_HOLDER_TABLE_NAME = "{{TABLE_NAME}}"; + private static final String PLACE_HOLDER_COLUMN_LIST = "{{COLUMN_LIST}}"; + private static final String PLACE_HOLDER_CONDITION = "{{CONDITION}}"; + private static final String SELECT_QUERY_CONFIG_FILE = "query-config.yaml"; + private static final String RECORD_SELECT_QUERY = "recordSelectQuery"; + + private HikariDataSource dataSource; + private String selectQueryStructure = ""; + private ConfigReader configReader; + private SourceEventListener sourceEventListener; + + protected boolean paused = false; + protected ReentrantLock pauseLock = new ReentrantLock(); + protected Condition pauseLockCondition = pauseLock.newCondition(); + protected String tableName; + + public PollingStrategy(HikariDataSource dataSource, ConfigReader configReader, + SourceEventListener sourceEventListener, String tableName) { + this.dataSource = dataSource; + this.configReader = configReader; + this.sourceEventListener = sourceEventListener; + this.tableName = tableName; + } + + public abstract void poll(); + + public abstract String getLastReadPollingColumnValue(); + + public abstract void setLastReadPollingColumnValue(String lastReadPollingColumnValue); + + public void pause() { + paused = true; + } + + public void resume() { + paused = false; + try { + pauseLock.lock(); + pauseLockCondition.signal(); + } finally { + pauseLock.unlock(); + } + } + + protected Connection getConnection() { + Connection conn; + try { + conn = this.dataSource.getConnection(); + if (log.isDebugEnabled()) { + log.debug("A connection is initialized "); + } + } catch (SQLException e) { + throw new CDCPollingModeException("Error initializing datasource connection. Current mode: " + + CDCSourceConstants.MODE_POLLING, e); + } + return conn; + } + + protected String getSelectQuery(String columnList, String condition) { + String selectQuery; + if (selectQueryStructure.isEmpty()) { + //Get the database product name + String databaseName; + Connection conn = null; + try { + conn = getConnection(); + DatabaseMetaData dmd = conn.getMetaData(); + databaseName = dmd.getDatabaseProductName(); + } catch (SQLException e) { + throw new CDCPollingModeException("Error in looking up database type. Current mode: " + + CDCSourceConstants.MODE_POLLING, e); + } finally { + CDCPollingUtil.cleanupConnection(null, null, conn); + } + + //Read configs from config reader. + selectQueryStructure = configReader.readConfig(databaseName + "." + RECORD_SELECT_QUERY, ""); + + if (selectQueryStructure.isEmpty()) { + //Read configs from yaml file + QueryConfiguration queryConfiguration; + InputStream inputStream = null; + try { + MyYamlConstructor constructor = new MyYamlConstructor(QueryConfiguration.class); + TypeDescription queryTypeDescription = new TypeDescription(QueryConfiguration.class); + queryTypeDescription.putListPropertyType("databases", Database.class); + constructor.addTypeDescription(queryTypeDescription); + Yaml yaml = new Yaml(constructor); + ClassLoader classLoader = getClass().getClassLoader(); + inputStream = classLoader.getResourceAsStream(SELECT_QUERY_CONFIG_FILE); + if (inputStream == null) { + throw new CDCPollingModeException(SELECT_QUERY_CONFIG_FILE + + " is not found in the classpath. Current mode: " + CDCSourceConstants.MODE_POLLING); + } + queryConfiguration = (QueryConfiguration) yaml.load(inputStream); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + log.error("Failed to close the input stream for " + SELECT_QUERY_CONFIG_FILE + ". " + + "Current mode: " + CDCSourceConstants.MODE_POLLING); + } + } + } + + //Get database related select query structure + if (queryConfiguration != null) { + for (Database database : queryConfiguration.getDatabases()) { + if (database.getName().equalsIgnoreCase(databaseName)) { + selectQueryStructure = database.getSelectQuery(); + break; + } + } + } + } + + if (selectQueryStructure.isEmpty()) { + throw new CDCPollingModeException("Unsupported database: " + databaseName + ". Configure system" + + " parameter: " + databaseName + "." + RECORD_SELECT_QUERY + ". Current mode: " + + CDCSourceConstants.MODE_POLLING); + } + } + //create the select query with given constraints + selectQuery = selectQueryStructure.replace(PLACE_HOLDER_TABLE_NAME, tableName) + .replace(PLACE_HOLDER_COLUMN_LIST, columnList) + .replace(PLACE_HOLDER_CONDITION, condition); + + return selectQuery; + } + + protected void handleEvent(Map detailsMap) { + sourceEventListener.onEvent(detailsMap, null); + } +} diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java new file mode 100644 index 0000000..47eb97e --- /dev/null +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.extension.siddhi.io.cdc.source.polling.strategies; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.log4j.Logger; +import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; +import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; +import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; +import org.wso2.siddhi.core.stream.input.source.SourceEventListener; +import org.wso2.siddhi.core.util.config.ConfigReader; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +/** + * Polling strategy implementation to wait-on-missing records. If the polling strategy identifies a missed record in + * the polled chunk, it holds the rest of the processing until the record comes in. This uses {@code pollingColumn}, + * {@code pollingInterval}, {@code missedRecordRetryIntervalMS} and {@code missedRecordWaitingTimeoutMS}. + */ +public class WaitOnMissingRecordPollingStrategy extends PollingStrategy { + private static final Logger log = Logger.getLogger(WaitOnMissingRecordPollingStrategy.class); + + private String pollingColumn; + private int pollingInterval; + private int retryIntervalMS; + private int waitingTimeoutMS; + // The 'wait on missed records' events only work with numeric type. Hence assuming the polling.column is a number. + private Integer lastReadPollingColumnValue; + + public WaitOnMissingRecordPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, + SourceEventListener sourceEventListener, String tableName, + String pollingColumn, int pollingInterval, + int retryIntervalMS, int waitingTimeoutMS) { + super(dataSource, configReader, sourceEventListener, tableName); + this.pollingColumn = pollingColumn; + this.pollingInterval = pollingInterval; + this.retryIntervalMS = retryIntervalMS; + this.waitingTimeoutMS = waitingTimeoutMS; + } + + @Override + public void poll() { + String selectQuery; + ResultSetMetaData metadata; + Map detailsMap; + Connection connection = getConnection(); + PreparedStatement statement = null; + ResultSet resultSet = null; + boolean breakOnMissingRecord = false; + if (retryIntervalMS <= 0) { + retryIntervalMS = pollingInterval * 1000; + log.debug("Missed record retry interval is set to " + retryIntervalMS + "ms."); + } + try { + // If lastReadPollingColumnValue is null, assign it with last record of the table. + if (lastReadPollingColumnValue == null) { + selectQuery = getSelectQuery("MAX(" + pollingColumn + ")", "").trim(); + statement = connection.prepareStatement(selectQuery); + resultSet = statement.executeQuery(); + if (resultSet.next()) { + lastReadPollingColumnValue = resultSet.getInt(1); + } + // If the table is empty, set last offset to a negative value. + if (lastReadPollingColumnValue == null) { + lastReadPollingColumnValue = -1; + } + } + + selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); + statement = connection.prepareStatement(selectQuery); + + int waitingFor = -1; + long waitingFrom = -1; + + while (true) { + if (paused) { + pauseLock.lock(); + try { + while (paused) { + pauseLockCondition.await(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + pauseLock.unlock(); + } + } + try { + statement.setInt(1, lastReadPollingColumnValue); + resultSet = statement.executeQuery(); + metadata = resultSet.getMetaData(); + while (resultSet.next()) { + int currentPollingColumnValue = resultSet.getInt(pollingColumn); + if (currentPollingColumnValue - lastReadPollingColumnValue > 1) { + if (waitingFor == -1) { + // This is the first time to wait for the current record. Hence set the expected record + // id and the starting timestamp. + waitingFor = lastReadPollingColumnValue + 1; + waitingFrom = System.currentTimeMillis(); + } + + if ((waitingTimeoutMS == -1) || + (waitingFrom + waitingTimeoutMS >= System.currentTimeMillis())) { + log.debug("Missing record found at " + waitingFor + ". Hence pausing the process and " + + "retry in " + retryIntervalMS + "ms."); + breakOnMissingRecord = true; + break; + } + } + if (waitingFor > -1) { + log.debug("Missed record received or timed-out. Hence resuming the process."); + waitingFor = -1; + waitingFrom = -1; + } + detailsMap = new HashMap<>(); + for (int i = 1; i <= metadata.getColumnCount(); i++) { + String key = metadata.getColumnName(i); + Object value = resultSet.getObject(key); + detailsMap.put(key.toLowerCase(Locale.ENGLISH), value); + } + lastReadPollingColumnValue = resultSet.getInt(pollingColumn); + handleEvent(detailsMap); + } + } catch (SQLException ex) { + log.error(ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, null, null); + } + try { + if (breakOnMissingRecord) { + Thread.sleep(retryIntervalMS); + breakOnMissingRecord = false; + } else { + Thread.sleep((long) pollingInterval * 1000); + } + } catch (InterruptedException e) { + log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); + } + } + } catch (SQLException ex) { + throw new CDCPollingModeException("Error in polling for changes on " + tableName + ". Current mode: " + + CDCSourceConstants.MODE_POLLING, ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, statement, connection); + } + } + + @Override + public String getLastReadPollingColumnValue() { + return String.valueOf(lastReadPollingColumnValue); + } + + @Override + public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) { + this.lastReadPollingColumnValue = Integer.parseInt(lastReadPollingColumnValue); + } +} diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java index 38b3afe..0d81853 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java @@ -70,4 +70,7 @@ public class CDCSourceConstants { public static final int DEFAULT_POLLING_INTERVAL_SECONDS = 1; public static final String DATASOURCE_NAME = "datasource.name"; public static final String JNDI_RESOURCE = "jndi.resource"; + public static final String WAIT_ON_MISSED_RECORD = "wait.on.missed.record"; + public static final String MISSED_RECORD_RETRY_INTERVAL_MS = "missed.record.retry.interval"; + public static final String MISSED_RECORD_WAITING_TIMEOUT_MS = "missed.record.waiting.timeout"; } diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index eac53d2..ab812c4 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -19,9 +19,9 @@ --> - + - + From ef972d2f5ef0991b6c9b3df06e6a022ebe488472 Mon Sep 17 00:00:00 2001 From: lasanthaS Date: Fri, 18 Oct 2019 12:46:29 +0530 Subject: [PATCH 2/5] Remove polling mode retry interval --- .../siddhi/io/cdc/source/CDCSource.java | 61 +++++++++---------- .../io/cdc/source/polling/CDCPoller.java | 5 +- .../WaitOnMissingRecordPollingStrategy.java | 38 ++++++------ .../io/cdc/util/CDCSourceConstants.java | 3 +- 4 files changed, 50 insertions(+), 57 deletions(-) diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java index 398458e..d1a13a2 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java @@ -187,32 +187,22 @@ ), @Parameter( name = "wait.on.missed.record", - description = "Indicates whether you need to wait on missing/out-of-order records. When " + - "this flag is set to true the process will be hold once it identifies a missing " + - "record. The missing record is identified by the sequence of the polling.column. " + - "\nThis can be used only with a number field and should not be used with time values " + - "as it will not be an incremental value." + - "\nThis should be enabled ONLY in a situation where the records are written out of " + - "order, e.g. in a concurrent system as this degrades the performance.", + description = "Indicates whether the process needs to wait on missing/out-of-order records. " + + "\nWhen this flag is set to 'true' the process will be held once it identifies a " + + "missing record. The missing recrod is identified by the sequence of the " + + "polling.column value. This can be used only with number fields and not recommended " + + "to use with time values as it will not be sequential." + + "\nThis should be enabled ONLY where the records can be written out-of-order, (eg. " + + "concurrent writers) as this degrades the performance.", type = DataType.BOOL, optional = true, defaultValue = "false" ), - @Parameter( - name = "missed.record.retry.interval", - description = "The time interval (specified in milliseconds) to wait and retry if the " + - "process identifies a missing/out-of-order record. This should be used along with " + - "the wait.on.missed.record parameter.", - type = DataType.INT, - optional = true, - defaultValue = "-1" - ), @Parameter( name = "missed.record.waiting.timeout", - description = "The timeout (specified in milliseconds) to retry for missing/out-of-order " + - "record. This should be used along with the wait.on.missed.record parameter. If the " + - "parameter is not set, the process will indefinitely wait for the record without " + - "proceeding.", + description = "The timeout (specified in seconds) to retry for missing/out-of-order record. " + + "This should be used along with the wait.on.missed.record parameter. If the " + + "parameter is not set, the process will indefinitely wait for the missing record.", type = DataType.INT, optional = true, defaultValue = "-1" @@ -284,6 +274,20 @@ "\ndefine stream inputStream (name string);", description = "In this example, the CDC source polls the 'students' table for inserts " + "and updates. The polling column is a timestamp field." + ), + @Example( + syntax = "@source(type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', " + + "url='jdbc:mysql://localhost:3306/SimpleDB', username='cdcuser', " + + "password='pswd4cdc', table.name='students', mode='polling', polling.column='id', " + + "operation='insert', wait.on.missed.record='true', " + + "missed.record.waiting.timeout='10'," + + "\n@map(type='keyvalue'), " + + "\n@attributes(batch_no='batch_no', item='item', qty='qty'))" + + "\ndefine stream inputStream (id int, name string);", + description = "In this example, the CDC source polls the 'students' table for inserts. The " + + "polling column is a numeric field. This source expects the records in the database " + + "to be written concurrently/out-of-order so it waits if it encounters a missing " + + "record. If the record doesn't appear within 10 seconds it resumes the process." ) } ) @@ -373,7 +377,7 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol String pollingColumn = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_COLUMN); boolean isDatasourceNameAvailable = optionHolder.isOptionExists(CDCSourceConstants.DATASOURCE_NAME); boolean isJndiResourceAvailable = optionHolder.isOptionExists(CDCSourceConstants.JNDI_RESOURCE); - int pollingInterval = Integer.parseInt( + pollingInterval = Integer.parseInt( optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_INTERVAL, Integer.toString(CDCSourceConstants.DEFAULT_POLLING_INTERVAL_SECONDS))); validatePollingModeParameters(); @@ -381,25 +385,21 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol null); boolean waitOnMissedRecord = Boolean.parseBoolean( optionHolder.validateAndGetStaticValue(CDCSourceConstants.WAIT_ON_MISSED_RECORD, "false")); - int missedRecordRetryIntervalMS = Integer.parseInt( - optionHolder.validateAndGetStaticValue(CDCSourceConstants.MISSED_RECORD_RETRY_INTERVAL_MS, - "-1")); - int missedRecordWaitingTimeoutMS = Integer.parseInt( + int missedRecordWaitingTimeout = Integer.parseInt( optionHolder.validateAndGetStaticValue( - CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT_MS, "-1")); + CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT, "-1")); if (isDatasourceNameAvailable) { String datasourceName = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATASOURCE_NAME); cdcPoller = new CDCPoller(null, null, null, tableName, null, datasourceName, null, pollingColumn, pollingInterval, poolPropertyString, sourceEventListener, configReader, waitOnMissedRecord, - missedRecordRetryIntervalMS, missedRecordWaitingTimeoutMS); + missedRecordWaitingTimeout); } else if (isJndiResourceAvailable) { String jndiResource = optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE); cdcPoller = new CDCPoller(null, null, null, tableName, null, null, jndiResource, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader, waitOnMissedRecord, missedRecordRetryIntervalMS, - missedRecordWaitingTimeoutMS); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout); } else { String driverClassName; try { @@ -414,8 +414,7 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol } cdcPoller = new CDCPoller(url, username, password, tableName, driverClassName, null, null, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader, waitOnMissedRecord, missedRecordRetryIntervalMS, - missedRecordWaitingTimeoutMS); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout); } break; default: diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java index 3d0a736..a2de6b5 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java @@ -64,7 +64,7 @@ public CDCPoller(String url, String username, String password, String tableName, String datasourceName, String jndiResource, String pollingColumn, int pollingInterval, String poolPropertyString, SourceEventListener sourceEventListener, ConfigReader configReader, boolean waitOnMissedRecord, - int missedRecordRetryIntervalMS, int missedRecordWaitingTimeoutMS) { + int missedRecordWaitingTimeout) { this.url = url; this.tableName = tableName; this.username = username; @@ -84,8 +84,7 @@ public CDCPoller(String url, String username, String password, String tableName, if (waitOnMissedRecord) { log.debug(WaitOnMissingRecordPollingStrategy.class + " is selected as the polling strategy."); this.pollingStrategy = new WaitOnMissingRecordPollingStrategy(dataSource, configReader, sourceEventListener, - tableName, pollingColumn, pollingInterval, missedRecordRetryIntervalMS, - missedRecordWaitingTimeoutMS); + tableName, pollingColumn, pollingInterval, missedRecordWaitingTimeout); } else { log.debug(DefaultPollingStrategy.class + " is selected as the polling strategy"); this.pollingStrategy = new DefaultPollingStrategy(dataSource, configReader, sourceEventListener, diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java index 47eb97e..3efcbaf 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java @@ -45,20 +45,17 @@ public class WaitOnMissingRecordPollingStrategy extends PollingStrategy { private String pollingColumn; private int pollingInterval; - private int retryIntervalMS; - private int waitingTimeoutMS; + private int waitTimeout; // The 'wait on missed records' events only work with numeric type. Hence assuming the polling.column is a number. private Integer lastReadPollingColumnValue; public WaitOnMissingRecordPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, SourceEventListener sourceEventListener, String tableName, - String pollingColumn, int pollingInterval, - int retryIntervalMS, int waitingTimeoutMS) { + String pollingColumn, int pollingInterval, int waitTimeout) { super(dataSource, configReader, sourceEventListener, tableName); this.pollingColumn = pollingColumn; this.pollingInterval = pollingInterval; - this.retryIntervalMS = retryIntervalMS; - this.waitingTimeoutMS = waitingTimeoutMS; + this.waitTimeout = waitTimeout; } @Override @@ -70,10 +67,8 @@ public void poll() { PreparedStatement statement = null; ResultSet resultSet = null; boolean breakOnMissingRecord = false; - if (retryIntervalMS <= 0) { - retryIntervalMS = pollingInterval * 1000; - log.debug("Missed record retry interval is set to " + retryIntervalMS + "ms."); - } + int waitingFor = -1; + long waitingFrom = -1; try { // If lastReadPollingColumnValue is null, assign it with last record of the table. if (lastReadPollingColumnValue == null) { @@ -92,9 +87,6 @@ public void poll() { selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); statement = connection.prepareStatement(selectQuery); - int waitingFor = -1; - long waitingFrom = -1; - while (true) { if (paused) { pauseLock.lock(); @@ -113,25 +105,31 @@ public void poll() { resultSet = statement.executeQuery(); metadata = resultSet.getMetaData(); while (resultSet.next()) { + boolean isTimedout = false; int currentPollingColumnValue = resultSet.getInt(pollingColumn); if (currentPollingColumnValue - lastReadPollingColumnValue > 1) { if (waitingFor == -1) { // This is the first time to wait for the current record. Hence set the expected record - // id and the starting timestamp. + // id and the current timestamp. waitingFor = lastReadPollingColumnValue + 1; waitingFrom = System.currentTimeMillis(); } - if ((waitingTimeoutMS == -1) || - (waitingFrom + waitingTimeoutMS >= System.currentTimeMillis())) { + isTimedout = waitTimeout > -1 && waitingFrom + waitTimeout < System.currentTimeMillis(); + if (!isTimedout) { log.debug("Missing record found at " + waitingFor + ". Hence pausing the process and " + - "retry in " + retryIntervalMS + "ms."); + "retry in " + pollingInterval + " seconds."); breakOnMissingRecord = true; break; } } if (waitingFor > -1) { - log.debug("Missed record received or timed-out. Hence resuming the process."); + if (isTimedout) { + log.debug("Waiting for missed record " + waitingFor + " timed-out. Hence resuming " + + "the process."); + } else { + log.debug("Received the missed record " + waitingFor + ". Hence resuming the process."); + } waitingFor = -1; waitingFrom = -1; } @@ -151,11 +149,9 @@ public void poll() { } try { if (breakOnMissingRecord) { - Thread.sleep(retryIntervalMS); breakOnMissingRecord = false; - } else { - Thread.sleep((long) pollingInterval * 1000); } + Thread.sleep((long) pollingInterval * 1000); } catch (InterruptedException e) { log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); } diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java index 0d81853..bd18b76 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java @@ -71,6 +71,5 @@ public class CDCSourceConstants { public static final String DATASOURCE_NAME = "datasource.name"; public static final String JNDI_RESOURCE = "jndi.resource"; public static final String WAIT_ON_MISSED_RECORD = "wait.on.missed.record"; - public static final String MISSED_RECORD_RETRY_INTERVAL_MS = "missed.record.retry.interval"; - public static final String MISSED_RECORD_WAITING_TIMEOUT_MS = "missed.record.waiting.timeout"; + public static final String MISSED_RECORD_WAITING_TIMEOUT = "missed.record.waiting.timeout"; } From 5f55364bb8bdf372e15219dfe0b6bc422daec77d Mon Sep 17 00:00:00 2001 From: lasanthaS Date: Fri, 18 Oct 2019 12:47:19 +0530 Subject: [PATCH 3/5] Add test case --- .../cdc/source/TestCaseOfCDCPollingMode.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java index 389eaa4..d8a5838 100644 --- a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java +++ b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java @@ -300,4 +300,92 @@ public void receive(Event[] events) { siddhiAppRuntime.shutdown(); siddhiManager.shutdown(); } + + @Test + public void testOutOfOrderRecords() throws InterruptedException { + log.info("------------------------------------------------------------------------------------------------"); + log.info("CDC TestCase: Test missed/out-of-order events in polling mode."); + log.info("------------------------------------------------------------------------------------------------"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + int pollingInterval = 1; + String cdcinStreamDefinition = "@source(type = 'cdc', " + + "mode='polling', " + + "polling.column='" + pollingColumn + "', " + + "jdbc.driver.name='" + jdbcDriverName + "', " + + "url = '" + databaseURL + "', " + + "username = '" + username + "', " + + "password = '" + password + "', " + + "table.name = 'students', " + + "polling.interval = '" + pollingInterval + "', " + + "operation = 'insert', " + + "wait.on.missed.record = 'true'," + + "missed.record.waiting.timeout = '10'," + + "@map(type='keyvalue'), " + + "@attributes(id = 'id', name = 'name'))" + + "define stream outputStream (id int, name string);\n"; + + String rdbmsStoreDefinition = "define stream inputStream (id int, name string);" + + "@Store(type='rdbms', " + + "jdbc.url='" + databaseURL + "', " + + "username='" + username + "', " + + "password='" + password + "' , " + + "jdbc.driver.name='" + jdbcDriverName + "')" + + "define table students (id int, name string);"; + + String rdbmsQuery = "@info(name='query2') " + + "from inputStream " + + "insert into students;"; + + QueryCallback rdbmsQueryCallback = new QueryCallback() { + @Override + public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + log.info("insert done: " + event); + } + } + }; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition + + rdbmsStoreDefinition + rdbmsQuery); + siddhiAppRuntime.addCallback("query2", rdbmsQueryCallback); + + StreamCallback outputStreamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + eventCount.getAndIncrement(); + log.info(eventCount + ". " + event); + } + } + }; + + siddhiAppRuntime.addCallback("outputStream", outputStreamCallback); + siddhiAppRuntime.start(); + + // Wait till CDC poller initializes. + Thread.sleep(5000); + + // Do inserts and wait CDC app to capture the events. + InputHandler inputHandler = siddhiAppRuntime.getInputHandler("inputStream"); + Object[] ann = new Object[]{11, "Ann"}; + Object[] bob = new Object[]{12, "Bob"}; + Object[] charles = new Object[]{13, "Charles"}; + Object[] david = new Object[]{14, "David"}; + + inputHandler.send(ann); + inputHandler.send(bob); + inputHandler.send(david); + Thread.sleep(1000); + inputHandler.send(charles); + + SiddhiTestHelper.waitForEvents(waitTime, 4, eventCount, timeout); + + // Assert received event count. + Assert.assertEquals(eventCount.get(), 4); + + siddhiAppRuntime.shutdown(); + siddhiManager.shutdown(); + } } From b22150953abc920effa709b3ea9cfb11c319fd20 Mon Sep 17 00:00:00 2001 From: lasanthaS Date: Fri, 18 Oct 2019 14:37:04 +0530 Subject: [PATCH 4/5] Modify error messages to be more descriptive --- .../siddhi/io/cdc/source/CDCSource.java | 13 +++++--- .../io/cdc/source/polling/CDCPoller.java | 16 +++++---- .../strategies/DefaultPollingStrategy.java | 12 +++---- .../polling/strategies/PollingStrategy.java | 33 ++++++++++--------- .../WaitOnMissingRecordPollingStrategy.java | 27 +++++++-------- 5 files changed, 55 insertions(+), 46 deletions(-) diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java index d1a13a2..f2b549a 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java @@ -309,11 +309,14 @@ public class CDCSource extends Source { public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) { + + //initialize mode mode = optionHolder.validateAndGetStaticValue(CDCSourceConstants.MODE, CDCSourceConstants.MODE_LISTENING); //initialize common mandatory parameters String tableName = optionHolder.validateAndGetOption(CDCSourceConstants.TABLE_NAME).getValue(); + String siddhiAppName = siddhiAppContext.getName(); switch (mode) { case CDCSourceConstants.MODE_LISTENING: @@ -321,8 +324,6 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol String url = optionHolder.validateAndGetOption(CDCSourceConstants.DATABASE_CONNECTION_URL).getValue(); String username = optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue(); String password = optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue(); - - String siddhiAppName = siddhiAppContext.getName(); String streamName = sourceEventListener.getStreamDefinition().getId(); //initialize mandatory parameters @@ -394,12 +395,13 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol cdcPoller = new CDCPoller(null, null, null, tableName, null, datasourceName, null, pollingColumn, pollingInterval, poolPropertyString, sourceEventListener, configReader, waitOnMissedRecord, - missedRecordWaitingTimeout); + missedRecordWaitingTimeout, siddhiAppName); } else if (isJndiResourceAvailable) { String jndiResource = optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE); cdcPoller = new CDCPoller(null, null, null, tableName, null, null, jndiResource, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, + siddhiAppName); } else { String driverClassName; try { @@ -414,7 +416,8 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol } cdcPoller = new CDCPoller(url, username, password, tableName, driverClassName, null, null, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, + siddhiAppName); } break; default: diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java index a2de6b5..950b6c7 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java @@ -57,6 +57,8 @@ public class CDCPoller implements Runnable { private String poolPropertyString; private String jndiResource; private boolean isLocalDataSource = false; + private String appName; + private String streamName; private PollingStrategy pollingStrategy; @@ -64,7 +66,7 @@ public CDCPoller(String url, String username, String password, String tableName, String datasourceName, String jndiResource, String pollingColumn, int pollingInterval, String poolPropertyString, SourceEventListener sourceEventListener, ConfigReader configReader, boolean waitOnMissedRecord, - int missedRecordWaitingTimeout) { + int missedRecordWaitingTimeout, String appName) { this.url = url; this.tableName = tableName; this.username = username; @@ -73,22 +75,24 @@ public CDCPoller(String url, String username, String password, String tableName, this.poolPropertyString = poolPropertyString; this.datasourceName = datasourceName; this.jndiResource = jndiResource; + this.appName = appName; + this.streamName = sourceEventListener.getStreamDefinition().getId(); try { initializeDatasource(); } catch (NamingException e) { - throw new CDCPollingModeException("Error in initializing connection for " + tableName + ". " + - "Current mode: " + CDCSourceConstants.MODE_POLLING, e); + throw new CDCPollingModeException("Error in initializing connection for " + tableName + ". {mode=" + + CDCSourceConstants.MODE_POLLING + ", app=" + appName + ", stream=" + streamName + "}", e); } if (waitOnMissedRecord) { log.debug(WaitOnMissingRecordPollingStrategy.class + " is selected as the polling strategy."); this.pollingStrategy = new WaitOnMissingRecordPollingStrategy(dataSource, configReader, sourceEventListener, - tableName, pollingColumn, pollingInterval, missedRecordWaitingTimeout); + tableName, pollingColumn, pollingInterval, missedRecordWaitingTimeout, appName); } else { - log.debug(DefaultPollingStrategy.class + " is selected as the polling strategy"); + log.debug(DefaultPollingStrategy.class + " is selected as the polling strategy."); this.pollingStrategy = new DefaultPollingStrategy(dataSource, configReader, sourceEventListener, - tableName, pollingColumn, pollingInterval); + tableName, pollingColumn, pollingInterval, appName); } } diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java index bd2f0c7..90e656f 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java @@ -22,7 +22,6 @@ import org.apache.log4j.Logger; import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; -import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; import org.wso2.siddhi.core.stream.input.source.SourceEventListener; import org.wso2.siddhi.core.util.config.ConfigReader; @@ -48,8 +47,8 @@ public class DefaultPollingStrategy extends PollingStrategy { public DefaultPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, SourceEventListener sourceEventListener, String tableName, String pollingColumn, - int pollingInterval) { - super(dataSource, configReader, sourceEventListener, tableName); + int pollingInterval, String appName) { + super(dataSource, configReader, sourceEventListener, tableName, appName); this.pollingColumn = pollingColumn; this.pollingInterval = pollingInterval; } @@ -109,19 +108,18 @@ public void poll() { handleEvent(detailsMap); } } catch (SQLException ex) { - log.error(ex); + log.error(buildError("Error occurred while processing records in table %s.", tableName), ex); } finally { CDCPollingUtil.cleanupConnection(resultSet, null, null); } try { Thread.sleep((long) pollingInterval * 1000); } catch (InterruptedException e) { - log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); + log.error(buildError("Error while polling the table %s.", tableName), e); } } } catch (SQLException ex) { - throw new CDCPollingModeException("Error in polling for changes on " + tableName + ". Current mode: " + - CDCSourceConstants.MODE_POLLING, ex); + throw new CDCPollingModeException(buildError("Error in polling for changes on %s.", tableName), ex); } finally { CDCPollingUtil.cleanupConnection(resultSet, statement, connection); } diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java index 612ee00..d56f726 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java @@ -55,6 +55,8 @@ public abstract class PollingStrategy { private String selectQueryStructure = ""; private ConfigReader configReader; private SourceEventListener sourceEventListener; + private String appName; + private String streamName; protected boolean paused = false; protected ReentrantLock pauseLock = new ReentrantLock(); @@ -62,11 +64,13 @@ public abstract class PollingStrategy { protected String tableName; public PollingStrategy(HikariDataSource dataSource, ConfigReader configReader, - SourceEventListener sourceEventListener, String tableName) { + SourceEventListener sourceEventListener, String tableName, String appName) { this.dataSource = dataSource; this.configReader = configReader; this.sourceEventListener = sourceEventListener; this.tableName = tableName; + this.appName = appName; + this.streamName = sourceEventListener.getStreamDefinition().getId(); } public abstract void poll(); @@ -93,12 +97,9 @@ protected Connection getConnection() { Connection conn; try { conn = this.dataSource.getConnection(); - if (log.isDebugEnabled()) { - log.debug("A connection is initialized "); - } + log.debug("A connection is initialized."); } catch (SQLException e) { - throw new CDCPollingModeException("Error initializing datasource connection. Current mode: " + - CDCSourceConstants.MODE_POLLING, e); + throw new CDCPollingModeException(buildError("Error initializing datasource connection."), e); } return conn; } @@ -114,8 +115,7 @@ protected String getSelectQuery(String columnList, String condition) { DatabaseMetaData dmd = conn.getMetaData(); databaseName = dmd.getDatabaseProductName(); } catch (SQLException e) { - throw new CDCPollingModeException("Error in looking up database type. Current mode: " + - CDCSourceConstants.MODE_POLLING, e); + throw new CDCPollingModeException(buildError("Error in looking up database type."), e); } finally { CDCPollingUtil.cleanupConnection(null, null, conn); } @@ -136,8 +136,8 @@ protected String getSelectQuery(String columnList, String condition) { ClassLoader classLoader = getClass().getClassLoader(); inputStream = classLoader.getResourceAsStream(SELECT_QUERY_CONFIG_FILE); if (inputStream == null) { - throw new CDCPollingModeException(SELECT_QUERY_CONFIG_FILE - + " is not found in the classpath. Current mode: " + CDCSourceConstants.MODE_POLLING); + throw new CDCPollingModeException(buildError("%s is not found in the classpath", + SELECT_QUERY_CONFIG_FILE)); } queryConfiguration = (QueryConfiguration) yaml.load(inputStream); } finally { @@ -145,8 +145,7 @@ protected String getSelectQuery(String columnList, String condition) { try { inputStream.close(); } catch (IOException e) { - log.error("Failed to close the input stream for " + SELECT_QUERY_CONFIG_FILE + ". " + - "Current mode: " + CDCSourceConstants.MODE_POLLING); + log.error(buildError("Failed to close the input stream for %s.", SELECT_QUERY_CONFIG_FILE)); } } } @@ -163,9 +162,8 @@ protected String getSelectQuery(String columnList, String condition) { } if (selectQueryStructure.isEmpty()) { - throw new CDCPollingModeException("Unsupported database: " + databaseName + ". Configure system" + - " parameter: " + databaseName + "." + RECORD_SELECT_QUERY + ". Current mode: " + - CDCSourceConstants.MODE_POLLING); + throw new CDCPollingModeException(buildError("Unsupported database: %s. Configure system " + + "parameter: %s.%s.", databaseName, databaseName, RECORD_SELECT_QUERY)); } } //create the select query with given constraints @@ -179,4 +177,9 @@ protected String getSelectQuery(String columnList, String condition) { protected void handleEvent(Map detailsMap) { sourceEventListener.onEvent(detailsMap, null); } + + protected String buildError(String message, Object... args) { + return String.format(message, args) + " {mode=" + CDCSourceConstants.MODE_POLLING + ", app=" + appName + + ", stream=" + streamName + "}"; + } } diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java index 3efcbaf..28722a9 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java @@ -22,7 +22,6 @@ import org.apache.log4j.Logger; import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; -import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; import org.wso2.siddhi.core.stream.input.source.SourceEventListener; import org.wso2.siddhi.core.util.config.ConfigReader; @@ -51,8 +50,9 @@ public class WaitOnMissingRecordPollingStrategy extends PollingStrategy { public WaitOnMissingRecordPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, SourceEventListener sourceEventListener, String tableName, - String pollingColumn, int pollingInterval, int waitTimeout) { - super(dataSource, configReader, sourceEventListener, tableName); + String pollingColumn, int pollingInterval, int waitTimeout, + String appName) { + super(dataSource, configReader, sourceEventListener, tableName, appName); this.pollingColumn = pollingColumn; this.pollingInterval = pollingInterval; this.waitTimeout = waitTimeout; @@ -117,18 +117,20 @@ public void poll() { isTimedout = waitTimeout > -1 && waitingFrom + waitTimeout < System.currentTimeMillis(); if (!isTimedout) { - log.debug("Missing record found at " + waitingFor + ". Hence pausing the process and " + - "retry in " + pollingInterval + " seconds."); + log.debug("Missed record found at " + waitingFor + " in table " + tableName + + ". Hence pausing the process and " + "retry in " + pollingInterval + + " seconds."); breakOnMissingRecord = true; break; } } if (waitingFor > -1) { if (isTimedout) { - log.debug("Waiting for missed record " + waitingFor + " timed-out. Hence resuming " + - "the process."); + log.debug("Waiting for missed record " + waitingFor + " in table " + tableName + + " timed-out. Hence resuming the process."); } else { - log.debug("Received the missed record " + waitingFor + ". Hence resuming the process."); + log.debug("Received the missed record " + waitingFor + " in table " + tableName + + ". Hence resuming the process."); } waitingFor = -1; waitingFrom = -1; @@ -142,8 +144,8 @@ public void poll() { lastReadPollingColumnValue = resultSet.getInt(pollingColumn); handleEvent(detailsMap); } - } catch (SQLException ex) { - log.error(ex); + } catch (SQLException e) { + log.error(buildError("Error occurred while processing records in table %s.", tableName), e); } finally { CDCPollingUtil.cleanupConnection(resultSet, null, null); } @@ -153,12 +155,11 @@ public void poll() { } Thread.sleep((long) pollingInterval * 1000); } catch (InterruptedException e) { - log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); + log.error(buildError("Error while polling the table %s.", tableName), e); } } } catch (SQLException ex) { - throw new CDCPollingModeException("Error in polling for changes on " + tableName + ". Current mode: " + - CDCSourceConstants.MODE_POLLING, ex); + throw new CDCPollingModeException(buildError("Error in polling for changes on %s.", tableName), ex); } finally { CDCPollingUtil.cleanupConnection(resultSet, statement, connection); } From 45ea9c2be7aa762801a3cbdd992df2da0e13d084 Mon Sep 17 00:00:00 2001 From: lasanthaS Date: Fri, 18 Oct 2019 15:03:39 +0530 Subject: [PATCH 5/5] Fix intermittent test failure --- .../siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java | 4 ++-- component/src/test/resources/testng-suit2.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java index d8a5838..ce523cb 100644 --- a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java +++ b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java @@ -180,7 +180,7 @@ public void receive(Event[] events) { /** * Test case to test state persistence of polling mode. */ - @Test + @Test(dependsOnMethods = {"testCDCPollingMode"}) public void testCDCPollingModeStatePersistence() throws InterruptedException { log.info("------------------------------------------------------------------------------------------------"); log.info("CDC TestCase: Testing state persistence of the polling mode."); @@ -301,7 +301,7 @@ public void receive(Event[] events) { siddhiManager.shutdown(); } - @Test + @Test(dependsOnMethods = {"testCDCPollingModeStatePersistence"}) public void testOutOfOrderRecords() throws InterruptedException { log.info("------------------------------------------------------------------------------------------------"); log.info("CDC TestCase: Test missed/out-of-order events in polling mode."); diff --git a/component/src/test/resources/testng-suit2.xml b/component/src/test/resources/testng-suit2.xml index ed8e564..01ec67f 100644 --- a/component/src/test/resources/testng-suit2.xml +++ b/component/src/test/resources/testng-suit2.xml @@ -22,7 +22,7 @@ - +