diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java index 555a1a1..f2b549a 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/CDCSource.java @@ -184,6 +184,28 @@ defaultValue = "{host}_{port}", optional = true, type = DataType.STRING + ), + @Parameter( + name = "wait.on.missed.record", + description = "Indicates whether the process needs to wait on missing/out-of-order records. " + + "\nWhen this flag is set to 'true' the process will be held once it identifies a " + + "missing record. The missing recrod is identified by the sequence of the " + + "polling.column value. This can be used only with number fields and not recommended " + + "to use with time values as it will not be sequential." + + "\nThis should be enabled ONLY where the records can be written out-of-order, (eg. " + + "concurrent writers) as this degrades the performance.", + type = DataType.BOOL, + optional = true, + defaultValue = "false" + ), + @Parameter( + name = "missed.record.waiting.timeout", + description = "The timeout (specified in seconds) to retry for missing/out-of-order record. " + + "This should be used along with the wait.on.missed.record parameter. If the " + + "parameter is not set, the process will indefinitely wait for the missing record.", + type = DataType.INT, + optional = true, + defaultValue = "-1" ) }, examples = { @@ -252,6 +274,20 @@ "\ndefine stream inputStream (name string);", description = "In this example, the CDC source polls the 'students' table for inserts " + "and updates. The polling column is a timestamp field." + ), + @Example( + syntax = "@source(type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', " + + "url='jdbc:mysql://localhost:3306/SimpleDB', username='cdcuser', " + + "password='pswd4cdc', table.name='students', mode='polling', polling.column='id', " + + "operation='insert', wait.on.missed.record='true', " + + "missed.record.waiting.timeout='10'," + + "\n@map(type='keyvalue'), " + + "\n@attributes(batch_no='batch_no', item='item', qty='qty'))" + + "\ndefine stream inputStream (id int, name string);", + description = "In this example, the CDC source polls the 'students' table for inserts. The " + + "polling column is a numeric field. This source expects the records in the database " + + "to be written concurrently/out-of-order so it waits if it encounters a missing " + + "record. If the record doesn't appear within 10 seconds it resumes the process." ) } ) @@ -273,11 +309,14 @@ public class CDCSource extends Source { public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) { + + //initialize mode mode = optionHolder.validateAndGetStaticValue(CDCSourceConstants.MODE, CDCSourceConstants.MODE_LISTENING); //initialize common mandatory parameters String tableName = optionHolder.validateAndGetOption(CDCSourceConstants.TABLE_NAME).getValue(); + String siddhiAppName = siddhiAppContext.getName(); switch (mode) { case CDCSourceConstants.MODE_LISTENING: @@ -285,8 +324,6 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol String url = optionHolder.validateAndGetOption(CDCSourceConstants.DATABASE_CONNECTION_URL).getValue(); String username = optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue(); String password = optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue(); - - String siddhiAppName = siddhiAppContext.getName(); String streamName = sourceEventListener.getStreamDefinition().getId(); //initialize mandatory parameters @@ -347,17 +384,24 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol validatePollingModeParameters(); String poolPropertyString = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POOL_PROPERTIES, null); + boolean waitOnMissedRecord = Boolean.parseBoolean( + optionHolder.validateAndGetStaticValue(CDCSourceConstants.WAIT_ON_MISSED_RECORD, "false")); + int missedRecordWaitingTimeout = Integer.parseInt( + optionHolder.validateAndGetStaticValue( + CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT, "-1")); if (isDatasourceNameAvailable) { String datasourceName = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATASOURCE_NAME); cdcPoller = new CDCPoller(null, null, null, tableName, null, datasourceName, null, pollingColumn, pollingInterval, - poolPropertyString, sourceEventListener, configReader); + poolPropertyString, sourceEventListener, configReader, waitOnMissedRecord, + missedRecordWaitingTimeout, siddhiAppName); } else if (isJndiResourceAvailable) { String jndiResource = optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE); cdcPoller = new CDCPoller(null, null, null, tableName, null, null, jndiResource, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, + siddhiAppName); } else { String driverClassName; try { @@ -372,7 +416,8 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol } cdcPoller = new CDCPoller(url, username, password, tableName, driverClassName, null, null, pollingColumn, pollingInterval, poolPropertyString, - sourceEventListener, configReader); + sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, + siddhiAppName); } break; default: diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java index 3d31b75..950b6c7 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/CDCPoller.java @@ -26,31 +26,17 @@ import org.osgi.framework.ServiceReference; import org.wso2.carbon.datasource.core.api.DataSourceService; import org.wso2.carbon.datasource.core.exception.DataSourceException; -import org.wso2.extension.siddhi.io.cdc.source.config.Database; -import org.wso2.extension.siddhi.io.cdc.source.config.QueryConfiguration; +import org.wso2.extension.siddhi.io.cdc.source.polling.strategies.DefaultPollingStrategy; +import org.wso2.extension.siddhi.io.cdc.source.polling.strategies.PollingStrategy; +import org.wso2.extension.siddhi.io.cdc.source.polling.strategies.WaitOnMissingRecordPollingStrategy; import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; -import org.wso2.extension.siddhi.io.cdc.util.MyYamlConstructor; import org.wso2.siddhi.core.stream.input.source.SourceEventListener; import org.wso2.siddhi.core.util.config.ConfigReader; -import org.yaml.snakeyaml.TypeDescription; -import org.yaml.snakeyaml.Yaml; -import java.io.IOException; -import java.io.InputStream; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.HashMap; import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Properties; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; + import javax.naming.InitialContext; import javax.naming.NamingException; @@ -60,48 +46,54 @@ public class CDCPoller implements Runnable { private static final Logger log = Logger.getLogger(CDCPoller.class); - private static final String PLACE_HOLDER_TABLE_NAME = "{{TABLE_NAME}}"; - private static final String PLACE_HOLDER_COLUMN_LIST = "{{COLUMN_LIST}}"; - private static final String PLACE_HOLDER_CONDITION = "{{CONDITION}}"; - private static final String SELECT_QUERY_CONFIG_FILE = "query-config.yaml"; - private static final String RECORD_SELECT_QUERY = "recordSelectQuery"; - private String selectQueryStructure = ""; private String url; private String tableName; private String username; private String password; private String driverClassName; private HikariDataSource dataSource; - private String lastReadPollingColumnValue; - private SourceEventListener sourceEventListener; - private String pollingColumn; private String datasourceName; - private int pollingInterval; private CompletionCallback completionCallback; - private boolean paused = false; - private ReentrantLock pauseLock = new ReentrantLock(); - private Condition pauseLockCondition = pauseLock.newCondition(); - private ConfigReader configReader; private String poolPropertyString; private String jndiResource; private boolean isLocalDataSource = false; + private String appName; + private String streamName; + + private PollingStrategy pollingStrategy; public CDCPoller(String url, String username, String password, String tableName, String driverClassName, String datasourceName, String jndiResource, String pollingColumn, int pollingInterval, String poolPropertyString, - SourceEventListener sourceEventListener, ConfigReader configReader) { + SourceEventListener sourceEventListener, ConfigReader configReader, boolean waitOnMissedRecord, + int missedRecordWaitingTimeout, String appName) { this.url = url; this.tableName = tableName; this.username = username; this.password = password; this.driverClassName = driverClassName; - this.sourceEventListener = sourceEventListener; - this.pollingColumn = pollingColumn; - this.pollingInterval = pollingInterval; - this.configReader = configReader; this.poolPropertyString = poolPropertyString; this.datasourceName = datasourceName; this.jndiResource = jndiResource; + this.appName = appName; + this.streamName = sourceEventListener.getStreamDefinition().getId(); + + try { + initializeDatasource(); + } catch (NamingException e) { + throw new CDCPollingModeException("Error in initializing connection for " + tableName + ". {mode=" + + CDCSourceConstants.MODE_POLLING + ", app=" + appName + ", stream=" + streamName + "}", e); + } + + if (waitOnMissedRecord) { + log.debug(WaitOnMissingRecordPollingStrategy.class + " is selected as the polling strategy."); + this.pollingStrategy = new WaitOnMissingRecordPollingStrategy(dataSource, configReader, sourceEventListener, + tableName, pollingColumn, pollingInterval, missedRecordWaitingTimeout, appName); + } else { + log.debug(DefaultPollingStrategy.class + " is selected as the polling strategy."); + this.pollingStrategy = new DefaultPollingStrategy(dataSource, configReader, sourceEventListener, + tableName, pollingColumn, pollingInterval, appName); + } } public HikariDataSource getDataSource() { @@ -174,205 +166,25 @@ public boolean isLocalDataSource() { } public String getLastReadPollingColumnValue() { - return lastReadPollingColumnValue; + return pollingStrategy.getLastReadPollingColumnValue(); } public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) { - this.lastReadPollingColumnValue = lastReadPollingColumnValue; - } - - private Connection getConnection() { - Connection conn; - try { - conn = this.dataSource.getConnection(); - if (log.isDebugEnabled()) { - log.debug("A connection is initialized "); - } - } catch (SQLException e) { - throw new CDCPollingModeException("Error initializing datasource connection. Current mode: " + - CDCSourceConstants.MODE_POLLING, e); - } - return conn; - } - - private String getSelectQuery(String columnList, String condition) { - String selectQuery; - if (selectQueryStructure.isEmpty()) { - //Get the database product name - String databaseName; - Connection conn = null; - try { - conn = getConnection(); - DatabaseMetaData dmd = conn.getMetaData(); - databaseName = dmd.getDatabaseProductName(); - } catch (SQLException e) { - throw new CDCPollingModeException("Error in looking up database type. Current mode: " + - CDCSourceConstants.MODE_POLLING, e); - } finally { - CDCPollingUtil.cleanupConnection(null, null, conn); - } - - //Read configs from config reader. - selectQueryStructure = configReader.readConfig(databaseName + "." + RECORD_SELECT_QUERY, ""); - - if (selectQueryStructure.isEmpty()) { - //Read configs from yaml file - QueryConfiguration queryConfiguration; - InputStream inputStream = null; - try { - MyYamlConstructor constructor = new MyYamlConstructor(QueryConfiguration.class); - TypeDescription queryTypeDescription = new TypeDescription(QueryConfiguration.class); - queryTypeDescription.putListPropertyType("databases", Database.class); - constructor.addTypeDescription(queryTypeDescription); - Yaml yaml = new Yaml(constructor); - ClassLoader classLoader = getClass().getClassLoader(); - inputStream = classLoader.getResourceAsStream(SELECT_QUERY_CONFIG_FILE); - if (inputStream == null) { - throw new CDCPollingModeException(SELECT_QUERY_CONFIG_FILE - + " is not found in the classpath. Current mode: " + CDCSourceConstants.MODE_POLLING); - } - queryConfiguration = (QueryConfiguration) yaml.load(inputStream); - } finally { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - log.error("Failed to close the input stream for " + SELECT_QUERY_CONFIG_FILE + ". " + - "Current mode: " + CDCSourceConstants.MODE_POLLING); - } - } - } - - //Get database related select query structure - if (queryConfiguration != null) { - for (Database database : queryConfiguration.getDatabases()) { - if (database.getName().equalsIgnoreCase(databaseName)) { - selectQueryStructure = database.getSelectQuery(); - break; - } - } - } - } - - if (selectQueryStructure.isEmpty()) { - throw new CDCPollingModeException("Unsupported database: " + databaseName + ". Configure system" + - " parameter: " + databaseName + "." + RECORD_SELECT_QUERY + ". Current mode: " + - CDCSourceConstants.MODE_POLLING); - } - } - - //create the select query with given constraints - selectQuery = selectQueryStructure.replace(PLACE_HOLDER_TABLE_NAME, tableName) - .replace(PLACE_HOLDER_COLUMN_LIST, columnList) - .replace(PLACE_HOLDER_CONDITION, condition); - - return selectQuery; - } - - /** - * Poll for inserts and updates. - */ - private void pollForChanges() { - try { - initializeDatasource(); - } catch (NamingException e) { - throw new CDCPollingModeException("Error in initializing connection for " + tableName + ". " + - "Current mode: " + CDCSourceConstants.MODE_POLLING, e); - } - - String selectQuery; - ResultSetMetaData metadata; - Map detailsMap; - Connection connection = getConnection(); - PreparedStatement statement = null; - ResultSet resultSet = null; - - try { - //If lastReadPollingColumnValue is null, assign it with last record of the table. - if (lastReadPollingColumnValue == null) { - selectQuery = getSelectQuery("MAX(" + pollingColumn + ")", "").trim(); - statement = connection.prepareStatement(selectQuery); - resultSet = statement.executeQuery(); - if (resultSet.next()) { - lastReadPollingColumnValue = resultSet.getString(1); - } - //if the table is empty, set last offset to a negative value. - if (lastReadPollingColumnValue == null) { - lastReadPollingColumnValue = "-1"; - } - } - - selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); - statement = connection.prepareStatement(selectQuery); - - while (true) { - if (paused) { - pauseLock.lock(); - try { - while (paused) { - pauseLockCondition.await(); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } finally { - pauseLock.unlock(); - } - } - try { - statement.setString(1, lastReadPollingColumnValue); - resultSet = statement.executeQuery(); - metadata = resultSet.getMetaData(); - while (resultSet.next()) { - detailsMap = new HashMap<>(); - for (int i = 1; i <= metadata.getColumnCount(); i++) { - String key = metadata.getColumnName(i); - Object value = resultSet.getObject(key); - detailsMap.put(key.toLowerCase(Locale.ENGLISH), value); - } - lastReadPollingColumnValue = resultSet.getString(pollingColumn); - handleEvent(detailsMap); - } - } catch (SQLException ex) { - log.error(ex); - } finally { - CDCPollingUtil.cleanupConnection(resultSet, null, null); - } - try { - Thread.sleep((long) pollingInterval * 1000); - } catch (InterruptedException e) { - log.error("Error while polling. Current mode: " + CDCSourceConstants.MODE_POLLING, e); - } - } - } catch (SQLException ex) { - throw new CDCPollingModeException("Error in polling for changes on " + tableName + ". Current mode: " + - CDCSourceConstants.MODE_POLLING, ex); - } finally { - CDCPollingUtil.cleanupConnection(resultSet, statement, connection); - } - } - - private void handleEvent(Map detailsMap) { - sourceEventListener.onEvent(detailsMap, null); + pollingStrategy.setLastReadPollingColumnValue(lastReadPollingColumnValue); } public void pause() { - paused = true; + pollingStrategy.pause(); } public void resume() { - paused = false; - try { - pauseLock.lock(); - pauseLockCondition.signal(); - } finally { - pauseLock.unlock(); - } + pollingStrategy.resume(); } @Override public void run() { try { - pollForChanges(); + pollingStrategy.poll(); } catch (CDCPollingModeException e) { completionCallback.handle(e); } diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java new file mode 100644 index 0000000..90e656f --- /dev/null +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/DefaultPollingStrategy.java @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.extension.siddhi.io.cdc.source.polling.strategies; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.log4j.Logger; +import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; +import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; +import org.wso2.siddhi.core.stream.input.source.SourceEventListener; +import org.wso2.siddhi.core.util.config.ConfigReader; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +/** + * Default implementation of the polling strategy. This uses {@code pollingColumn} and {@code pollingInterval} to poll + * on. + */ +public class DefaultPollingStrategy extends PollingStrategy { + private static final Logger log = Logger.getLogger(DefaultPollingStrategy.class); + + private String pollingColumn; + private int pollingInterval; + private String lastReadPollingColumnValue; + + public DefaultPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, + SourceEventListener sourceEventListener, String tableName, String pollingColumn, + int pollingInterval, String appName) { + super(dataSource, configReader, sourceEventListener, tableName, appName); + this.pollingColumn = pollingColumn; + this.pollingInterval = pollingInterval; + } + + @Override + public void poll() { + String selectQuery; + ResultSetMetaData metadata; + Map detailsMap; + Connection connection = getConnection(); + PreparedStatement statement = null; + ResultSet resultSet = null; + + try { + //If lastReadPollingColumnValue is null, assign it with last record of the table. + if (lastReadPollingColumnValue == null) { + selectQuery = getSelectQuery("MAX(" + pollingColumn + ")", "").trim(); + statement = connection.prepareStatement(selectQuery); + resultSet = statement.executeQuery(); + if (resultSet.next()) { + lastReadPollingColumnValue = resultSet.getString(1); + } + //if the table is empty, set last offset to a negative value. + if (lastReadPollingColumnValue == null) { + lastReadPollingColumnValue = "-1"; + } + } + + selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); + statement = connection.prepareStatement(selectQuery); + + while (true) { + if (paused) { + pauseLock.lock(); + try { + while (paused) { + pauseLockCondition.await(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + pauseLock.unlock(); + } + } + try { + statement.setString(1, lastReadPollingColumnValue); + resultSet = statement.executeQuery(); + metadata = resultSet.getMetaData(); + while (resultSet.next()) { + detailsMap = new HashMap<>(); + for (int i = 1; i <= metadata.getColumnCount(); i++) { + String key = metadata.getColumnName(i); + Object value = resultSet.getObject(key); + detailsMap.put(key.toLowerCase(Locale.ENGLISH), value); + } + lastReadPollingColumnValue = resultSet.getString(pollingColumn); + handleEvent(detailsMap); + } + } catch (SQLException ex) { + log.error(buildError("Error occurred while processing records in table %s.", tableName), ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, null, null); + } + try { + Thread.sleep((long) pollingInterval * 1000); + } catch (InterruptedException e) { + log.error(buildError("Error while polling the table %s.", tableName), e); + } + } + } catch (SQLException ex) { + throw new CDCPollingModeException(buildError("Error in polling for changes on %s.", tableName), ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, statement, connection); + } + } + + @Override + public String getLastReadPollingColumnValue() { + return lastReadPollingColumnValue; + } + + @Override + public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) { + this.lastReadPollingColumnValue = lastReadPollingColumnValue; + } +} diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java new file mode 100644 index 0000000..d56f726 --- /dev/null +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/PollingStrategy.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.extension.siddhi.io.cdc.source.polling.strategies; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.log4j.Logger; +import org.wso2.extension.siddhi.io.cdc.source.config.Database; +import org.wso2.extension.siddhi.io.cdc.source.config.QueryConfiguration; +import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; +import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; +import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants; +import org.wso2.extension.siddhi.io.cdc.util.MyYamlConstructor; +import org.wso2.siddhi.core.stream.input.source.SourceEventListener; +import org.wso2.siddhi.core.util.config.ConfigReader; +import org.yaml.snakeyaml.TypeDescription; +import org.yaml.snakeyaml.Yaml; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract definition of polling strategy to poll DB changes. + */ +public abstract class PollingStrategy { + private static final Logger log = Logger.getLogger(PollingStrategy.class); + private static final String PLACE_HOLDER_TABLE_NAME = "{{TABLE_NAME}}"; + private static final String PLACE_HOLDER_COLUMN_LIST = "{{COLUMN_LIST}}"; + private static final String PLACE_HOLDER_CONDITION = "{{CONDITION}}"; + private static final String SELECT_QUERY_CONFIG_FILE = "query-config.yaml"; + private static final String RECORD_SELECT_QUERY = "recordSelectQuery"; + + private HikariDataSource dataSource; + private String selectQueryStructure = ""; + private ConfigReader configReader; + private SourceEventListener sourceEventListener; + private String appName; + private String streamName; + + protected boolean paused = false; + protected ReentrantLock pauseLock = new ReentrantLock(); + protected Condition pauseLockCondition = pauseLock.newCondition(); + protected String tableName; + + public PollingStrategy(HikariDataSource dataSource, ConfigReader configReader, + SourceEventListener sourceEventListener, String tableName, String appName) { + this.dataSource = dataSource; + this.configReader = configReader; + this.sourceEventListener = sourceEventListener; + this.tableName = tableName; + this.appName = appName; + this.streamName = sourceEventListener.getStreamDefinition().getId(); + } + + public abstract void poll(); + + public abstract String getLastReadPollingColumnValue(); + + public abstract void setLastReadPollingColumnValue(String lastReadPollingColumnValue); + + public void pause() { + paused = true; + } + + public void resume() { + paused = false; + try { + pauseLock.lock(); + pauseLockCondition.signal(); + } finally { + pauseLock.unlock(); + } + } + + protected Connection getConnection() { + Connection conn; + try { + conn = this.dataSource.getConnection(); + log.debug("A connection is initialized."); + } catch (SQLException e) { + throw new CDCPollingModeException(buildError("Error initializing datasource connection."), e); + } + return conn; + } + + protected String getSelectQuery(String columnList, String condition) { + String selectQuery; + if (selectQueryStructure.isEmpty()) { + //Get the database product name + String databaseName; + Connection conn = null; + try { + conn = getConnection(); + DatabaseMetaData dmd = conn.getMetaData(); + databaseName = dmd.getDatabaseProductName(); + } catch (SQLException e) { + throw new CDCPollingModeException(buildError("Error in looking up database type."), e); + } finally { + CDCPollingUtil.cleanupConnection(null, null, conn); + } + + //Read configs from config reader. + selectQueryStructure = configReader.readConfig(databaseName + "." + RECORD_SELECT_QUERY, ""); + + if (selectQueryStructure.isEmpty()) { + //Read configs from yaml file + QueryConfiguration queryConfiguration; + InputStream inputStream = null; + try { + MyYamlConstructor constructor = new MyYamlConstructor(QueryConfiguration.class); + TypeDescription queryTypeDescription = new TypeDescription(QueryConfiguration.class); + queryTypeDescription.putListPropertyType("databases", Database.class); + constructor.addTypeDescription(queryTypeDescription); + Yaml yaml = new Yaml(constructor); + ClassLoader classLoader = getClass().getClassLoader(); + inputStream = classLoader.getResourceAsStream(SELECT_QUERY_CONFIG_FILE); + if (inputStream == null) { + throw new CDCPollingModeException(buildError("%s is not found in the classpath", + SELECT_QUERY_CONFIG_FILE)); + } + queryConfiguration = (QueryConfiguration) yaml.load(inputStream); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + log.error(buildError("Failed to close the input stream for %s.", SELECT_QUERY_CONFIG_FILE)); + } + } + } + + //Get database related select query structure + if (queryConfiguration != null) { + for (Database database : queryConfiguration.getDatabases()) { + if (database.getName().equalsIgnoreCase(databaseName)) { + selectQueryStructure = database.getSelectQuery(); + break; + } + } + } + } + + if (selectQueryStructure.isEmpty()) { + throw new CDCPollingModeException(buildError("Unsupported database: %s. Configure system " + + "parameter: %s.%s.", databaseName, databaseName, RECORD_SELECT_QUERY)); + } + } + //create the select query with given constraints + selectQuery = selectQueryStructure.replace(PLACE_HOLDER_TABLE_NAME, tableName) + .replace(PLACE_HOLDER_COLUMN_LIST, columnList) + .replace(PLACE_HOLDER_CONDITION, condition); + + return selectQuery; + } + + protected void handleEvent(Map detailsMap) { + sourceEventListener.onEvent(detailsMap, null); + } + + protected String buildError(String message, Object... args) { + return String.format(message, args) + " {mode=" + CDCSourceConstants.MODE_POLLING + ", app=" + appName + + ", stream=" + streamName + "}"; + } +} diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java new file mode 100644 index 0000000..28722a9 --- /dev/null +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.extension.siddhi.io.cdc.source.polling.strategies; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.log4j.Logger; +import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPollingModeException; +import org.wso2.extension.siddhi.io.cdc.util.CDCPollingUtil; +import org.wso2.siddhi.core.stream.input.source.SourceEventListener; +import org.wso2.siddhi.core.util.config.ConfigReader; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +/** + * Polling strategy implementation to wait-on-missing records. If the polling strategy identifies a missed record in + * the polled chunk, it holds the rest of the processing until the record comes in. This uses {@code pollingColumn}, + * {@code pollingInterval}, {@code missedRecordRetryIntervalMS} and {@code missedRecordWaitingTimeoutMS}. + */ +public class WaitOnMissingRecordPollingStrategy extends PollingStrategy { + private static final Logger log = Logger.getLogger(WaitOnMissingRecordPollingStrategy.class); + + private String pollingColumn; + private int pollingInterval; + private int waitTimeout; + // The 'wait on missed records' events only work with numeric type. Hence assuming the polling.column is a number. + private Integer lastReadPollingColumnValue; + + public WaitOnMissingRecordPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, + SourceEventListener sourceEventListener, String tableName, + String pollingColumn, int pollingInterval, int waitTimeout, + String appName) { + super(dataSource, configReader, sourceEventListener, tableName, appName); + this.pollingColumn = pollingColumn; + this.pollingInterval = pollingInterval; + this.waitTimeout = waitTimeout; + } + + @Override + public void poll() { + String selectQuery; + ResultSetMetaData metadata; + Map detailsMap; + Connection connection = getConnection(); + PreparedStatement statement = null; + ResultSet resultSet = null; + boolean breakOnMissingRecord = false; + int waitingFor = -1; + long waitingFrom = -1; + try { + // If lastReadPollingColumnValue is null, assign it with last record of the table. + if (lastReadPollingColumnValue == null) { + selectQuery = getSelectQuery("MAX(" + pollingColumn + ")", "").trim(); + statement = connection.prepareStatement(selectQuery); + resultSet = statement.executeQuery(); + if (resultSet.next()) { + lastReadPollingColumnValue = resultSet.getInt(1); + } + // If the table is empty, set last offset to a negative value. + if (lastReadPollingColumnValue == null) { + lastReadPollingColumnValue = -1; + } + } + + selectQuery = getSelectQuery("*", "WHERE " + pollingColumn + " > ?"); + statement = connection.prepareStatement(selectQuery); + + while (true) { + if (paused) { + pauseLock.lock(); + try { + while (paused) { + pauseLockCondition.await(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + pauseLock.unlock(); + } + } + try { + statement.setInt(1, lastReadPollingColumnValue); + resultSet = statement.executeQuery(); + metadata = resultSet.getMetaData(); + while (resultSet.next()) { + boolean isTimedout = false; + int currentPollingColumnValue = resultSet.getInt(pollingColumn); + if (currentPollingColumnValue - lastReadPollingColumnValue > 1) { + if (waitingFor == -1) { + // This is the first time to wait for the current record. Hence set the expected record + // id and the current timestamp. + waitingFor = lastReadPollingColumnValue + 1; + waitingFrom = System.currentTimeMillis(); + } + + isTimedout = waitTimeout > -1 && waitingFrom + waitTimeout < System.currentTimeMillis(); + if (!isTimedout) { + log.debug("Missed record found at " + waitingFor + " in table " + tableName + + ". Hence pausing the process and " + "retry in " + pollingInterval + + " seconds."); + breakOnMissingRecord = true; + break; + } + } + if (waitingFor > -1) { + if (isTimedout) { + log.debug("Waiting for missed record " + waitingFor + " in table " + tableName + + " timed-out. Hence resuming the process."); + } else { + log.debug("Received the missed record " + waitingFor + " in table " + tableName + + ". Hence resuming the process."); + } + waitingFor = -1; + waitingFrom = -1; + } + detailsMap = new HashMap<>(); + for (int i = 1; i <= metadata.getColumnCount(); i++) { + String key = metadata.getColumnName(i); + Object value = resultSet.getObject(key); + detailsMap.put(key.toLowerCase(Locale.ENGLISH), value); + } + lastReadPollingColumnValue = resultSet.getInt(pollingColumn); + handleEvent(detailsMap); + } + } catch (SQLException e) { + log.error(buildError("Error occurred while processing records in table %s.", tableName), e); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, null, null); + } + try { + if (breakOnMissingRecord) { + breakOnMissingRecord = false; + } + Thread.sleep((long) pollingInterval * 1000); + } catch (InterruptedException e) { + log.error(buildError("Error while polling the table %s.", tableName), e); + } + } + } catch (SQLException ex) { + throw new CDCPollingModeException(buildError("Error in polling for changes on %s.", tableName), ex); + } finally { + CDCPollingUtil.cleanupConnection(resultSet, statement, connection); + } + } + + @Override + public String getLastReadPollingColumnValue() { + return String.valueOf(lastReadPollingColumnValue); + } + + @Override + public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) { + this.lastReadPollingColumnValue = Integer.parseInt(lastReadPollingColumnValue); + } +} diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java index 38b3afe..bd18b76 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/util/CDCSourceConstants.java @@ -70,4 +70,6 @@ public class CDCSourceConstants { public static final int DEFAULT_POLLING_INTERVAL_SECONDS = 1; public static final String DATASOURCE_NAME = "datasource.name"; public static final String JNDI_RESOURCE = "jndi.resource"; + public static final String WAIT_ON_MISSED_RECORD = "wait.on.missed.record"; + public static final String MISSED_RECORD_WAITING_TIMEOUT = "missed.record.waiting.timeout"; } diff --git a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java index 389eaa4..ce523cb 100644 --- a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java +++ b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java @@ -180,7 +180,7 @@ public void receive(Event[] events) { /** * Test case to test state persistence of polling mode. */ - @Test + @Test(dependsOnMethods = {"testCDCPollingMode"}) public void testCDCPollingModeStatePersistence() throws InterruptedException { log.info("------------------------------------------------------------------------------------------------"); log.info("CDC TestCase: Testing state persistence of the polling mode."); @@ -300,4 +300,92 @@ public void receive(Event[] events) { siddhiAppRuntime.shutdown(); siddhiManager.shutdown(); } + + @Test(dependsOnMethods = {"testCDCPollingModeStatePersistence"}) + public void testOutOfOrderRecords() throws InterruptedException { + log.info("------------------------------------------------------------------------------------------------"); + log.info("CDC TestCase: Test missed/out-of-order events in polling mode."); + log.info("------------------------------------------------------------------------------------------------"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + int pollingInterval = 1; + String cdcinStreamDefinition = "@source(type = 'cdc', " + + "mode='polling', " + + "polling.column='" + pollingColumn + "', " + + "jdbc.driver.name='" + jdbcDriverName + "', " + + "url = '" + databaseURL + "', " + + "username = '" + username + "', " + + "password = '" + password + "', " + + "table.name = 'students', " + + "polling.interval = '" + pollingInterval + "', " + + "operation = 'insert', " + + "wait.on.missed.record = 'true'," + + "missed.record.waiting.timeout = '10'," + + "@map(type='keyvalue'), " + + "@attributes(id = 'id', name = 'name'))" + + "define stream outputStream (id int, name string);\n"; + + String rdbmsStoreDefinition = "define stream inputStream (id int, name string);" + + "@Store(type='rdbms', " + + "jdbc.url='" + databaseURL + "', " + + "username='" + username + "', " + + "password='" + password + "' , " + + "jdbc.driver.name='" + jdbcDriverName + "')" + + "define table students (id int, name string);"; + + String rdbmsQuery = "@info(name='query2') " + + "from inputStream " + + "insert into students;"; + + QueryCallback rdbmsQueryCallback = new QueryCallback() { + @Override + public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + log.info("insert done: " + event); + } + } + }; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition + + rdbmsStoreDefinition + rdbmsQuery); + siddhiAppRuntime.addCallback("query2", rdbmsQueryCallback); + + StreamCallback outputStreamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + eventCount.getAndIncrement(); + log.info(eventCount + ". " + event); + } + } + }; + + siddhiAppRuntime.addCallback("outputStream", outputStreamCallback); + siddhiAppRuntime.start(); + + // Wait till CDC poller initializes. + Thread.sleep(5000); + + // Do inserts and wait CDC app to capture the events. + InputHandler inputHandler = siddhiAppRuntime.getInputHandler("inputStream"); + Object[] ann = new Object[]{11, "Ann"}; + Object[] bob = new Object[]{12, "Bob"}; + Object[] charles = new Object[]{13, "Charles"}; + Object[] david = new Object[]{14, "David"}; + + inputHandler.send(ann); + inputHandler.send(bob); + inputHandler.send(david); + Thread.sleep(1000); + inputHandler.send(charles); + + SiddhiTestHelper.waitForEvents(waitTime, 4, eventCount, timeout); + + // Assert received event count. + Assert.assertEquals(eventCount.get(), 4); + + siddhiAppRuntime.shutdown(); + siddhiManager.shutdown(); + } } diff --git a/component/src/test/resources/testng-suit2.xml b/component/src/test/resources/testng-suit2.xml index ed8e564..01ec67f 100644 --- a/component/src/test/resources/testng-suit2.xml +++ b/component/src/test/resources/testng-suit2.xml @@ -22,7 +22,7 @@ - + diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index eac53d2..ab812c4 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -19,9 +19,9 @@ --> - + - +