Skip to content

Commit

Permalink
Merge pull request #38 from lasanthaS/siddhi-4.x.x
Browse files Browse the repository at this point in the history
Fix timeout issue while waiting on missed records
  • Loading branch information
mohanvive committed Oct 21, 2019
2 parents 6c428ed + 02e0e2d commit 8fecfef
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 10 deletions.
Expand Up @@ -66,7 +66,6 @@ public void poll() {
Connection connection = getConnection();
PreparedStatement statement = null;
ResultSet resultSet = null;
boolean breakOnMissingRecord = false;
int waitingFor = -1;
long waitingFrom = -1;
try {
Expand Down Expand Up @@ -115,12 +114,13 @@ public void poll() {
waitingFrom = System.currentTimeMillis();
}

isTimedout = waitTimeout > -1 && waitingFrom + waitTimeout < System.currentTimeMillis();
long waitEndTimestamp = waitTimeout > -1 ?
waitingFrom + (waitTimeout * 1000L) : Long.MAX_VALUE;
isTimedout = waitEndTimestamp < System.currentTimeMillis();
if (!isTimedout) {
log.debug("Missed record found at " + waitingFor + " in table " + tableName +
". Hence pausing the process and " + "retry in " + pollingInterval +
" seconds.");
breakOnMissingRecord = true;
break;
}
}
Expand Down Expand Up @@ -150,9 +150,6 @@ public void poll() {
CDCPollingUtil.cleanupConnection(resultSet, null, null);
}
try {
if (breakOnMissingRecord) {
breakOnMissingRecord = false;
}
Thread.sleep((long) pollingInterval * 1000);
} catch (InterruptedException e) {
log.error(buildError("Error while polling the table %s.", tableName), e);
Expand Down
Expand Up @@ -369,10 +369,10 @@ public void receive(Event[] events) {

// Do inserts and wait CDC app to capture the events.
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("inputStream");
Object[] ann = new Object[]{11, "Ann"};
Object[] bob = new Object[]{12, "Bob"};
Object[] charles = new Object[]{13, "Charles"};
Object[] david = new Object[]{14, "David"};
Object[] ann = new Object[]{1, "Ann"};
Object[] bob = new Object[]{2, "Bob"};
Object[] charles = new Object[]{3, "Charles"};
Object[] david = new Object[]{4, "David"};

inputHandler.send(ann);
inputHandler.send(bob);
Expand Down

0 comments on commit 8fecfef

Please sign in to comment.