Skip to content

Commit

Permalink
Merge pull request #36 from lasanthaS/siddhi-4.x.x
Browse files Browse the repository at this point in the history
Fix unable to capture changes while concurrent writes
  • Loading branch information
suhothayan authored Oct 18, 2019
2 parents 98b018d + 45ea9c2 commit 6c428ed
Show file tree
Hide file tree
Showing 9 changed files with 677 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,28 @@
defaultValue = "{host}_{port}",
optional = true,
type = DataType.STRING
),
@Parameter(
name = "wait.on.missed.record",
description = "Indicates whether the process needs to wait on missing/out-of-order records. " +
"\nWhen this flag is set to 'true' the process will be held once it identifies a " +
"missing record. The missing recrod is identified by the sequence of the " +
"polling.column value. This can be used only with number fields and not recommended " +
"to use with time values as it will not be sequential." +
"\nThis should be enabled ONLY where the records can be written out-of-order, (eg. " +
"concurrent writers) as this degrades the performance.",
type = DataType.BOOL,
optional = true,
defaultValue = "false"
),
@Parameter(
name = "missed.record.waiting.timeout",
description = "The timeout (specified in seconds) to retry for missing/out-of-order record. " +
"This should be used along with the wait.on.missed.record parameter. If the " +
"parameter is not set, the process will indefinitely wait for the missing record.",
type = DataType.INT,
optional = true,
defaultValue = "-1"
)
},
examples = {
Expand Down Expand Up @@ -252,6 +274,20 @@
"\ndefine stream inputStream (name string);",
description = "In this example, the CDC source polls the 'students' table for inserts " +
"and updates. The polling column is a timestamp field."
),
@Example(
syntax = "@source(type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', " +
"url='jdbc:mysql://localhost:3306/SimpleDB', username='cdcuser', " +
"password='pswd4cdc', table.name='students', mode='polling', polling.column='id', " +
"operation='insert', wait.on.missed.record='true', " +
"missed.record.waiting.timeout='10'," +
"\n@map(type='keyvalue'), " +
"\n@attributes(batch_no='batch_no', item='item', qty='qty'))" +
"\ndefine stream inputStream (id int, name string);",
description = "In this example, the CDC source polls the 'students' table for inserts. The " +
"polling column is a numeric field. This source expects the records in the database " +
"to be written concurrently/out-of-order so it waits if it encounters a missing " +
"record. If the record doesn't appear within 10 seconds it resumes the process."
)
}
)
Expand All @@ -273,20 +309,21 @@ public class CDCSource extends Source {
public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
String[] requestedTransportPropertyNames, ConfigReader configReader,
SiddhiAppContext siddhiAppContext) {


//initialize mode
mode = optionHolder.validateAndGetStaticValue(CDCSourceConstants.MODE, CDCSourceConstants.MODE_LISTENING);

//initialize common mandatory parameters
String tableName = optionHolder.validateAndGetOption(CDCSourceConstants.TABLE_NAME).getValue();
String siddhiAppName = siddhiAppContext.getName();

switch (mode) {
case CDCSourceConstants.MODE_LISTENING:

String url = optionHolder.validateAndGetOption(CDCSourceConstants.DATABASE_CONNECTION_URL).getValue();
String username = optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue();
String password = optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue();

String siddhiAppName = siddhiAppContext.getName();
String streamName = sourceEventListener.getStreamDefinition().getId();

//initialize mandatory parameters
Expand Down Expand Up @@ -347,17 +384,24 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol
validatePollingModeParameters();
String poolPropertyString = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POOL_PROPERTIES,
null);
boolean waitOnMissedRecord = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(CDCSourceConstants.WAIT_ON_MISSED_RECORD, "false"));
int missedRecordWaitingTimeout = Integer.parseInt(
optionHolder.validateAndGetStaticValue(
CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT, "-1"));

if (isDatasourceNameAvailable) {
String datasourceName = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATASOURCE_NAME);
cdcPoller = new CDCPoller(null, null, null, tableName, null,
datasourceName, null, pollingColumn, pollingInterval,
poolPropertyString, sourceEventListener, configReader);
poolPropertyString, sourceEventListener, configReader, waitOnMissedRecord,
missedRecordWaitingTimeout, siddhiAppName);
} else if (isJndiResourceAvailable) {
String jndiResource = optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE);
cdcPoller = new CDCPoller(null, null, null, tableName, null,
null, jndiResource, pollingColumn, pollingInterval, poolPropertyString,
sourceEventListener, configReader);
sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout,
siddhiAppName);
} else {
String driverClassName;
try {
Expand All @@ -372,7 +416,8 @@ public void init(SourceEventListener sourceEventListener, OptionHolder optionHol
}
cdcPoller = new CDCPoller(url, username, password, tableName, driverClassName,
null, null, pollingColumn, pollingInterval, poolPropertyString,
sourceEventListener, configReader);
sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout,
siddhiAppName);
}
break;
default:
Expand Down
Loading

0 comments on commit 6c428ed

Please sign in to comment.