From 02e0e2d2f74e9c1f35c31905a87b8b9f31b1d812 Mon Sep 17 00:00:00 2001 From: lasanthaS Date: Sun, 20 Oct 2019 02:35:24 +0530 Subject: [PATCH] Fix wait timeout issue --- .../strategies/WaitOnMissingRecordPollingStrategy.java | 9 +++------ .../siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java | 8 ++++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java index 28722a9..d45982c 100644 --- a/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java +++ b/component/src/main/java/org/wso2/extension/siddhi/io/cdc/source/polling/strategies/WaitOnMissingRecordPollingStrategy.java @@ -66,7 +66,6 @@ public void poll() { Connection connection = getConnection(); PreparedStatement statement = null; ResultSet resultSet = null; - boolean breakOnMissingRecord = false; int waitingFor = -1; long waitingFrom = -1; try { @@ -115,12 +114,13 @@ public void poll() { waitingFrom = System.currentTimeMillis(); } - isTimedout = waitTimeout > -1 && waitingFrom + waitTimeout < System.currentTimeMillis(); + long waitEndTimestamp = waitTimeout > -1 ? + waitingFrom + (waitTimeout * 1000L) : Long.MAX_VALUE; + isTimedout = waitEndTimestamp < System.currentTimeMillis(); if (!isTimedout) { log.debug("Missed record found at " + waitingFor + " in table " + tableName + ". Hence pausing the process and " + "retry in " + pollingInterval + " seconds."); - breakOnMissingRecord = true; break; } } @@ -150,9 +150,6 @@ public void poll() { CDCPollingUtil.cleanupConnection(resultSet, null, null); } try { - if (breakOnMissingRecord) { - breakOnMissingRecord = false; - } Thread.sleep((long) pollingInterval * 1000); } catch (InterruptedException e) { log.error(buildError("Error while polling the table %s.", tableName), e); diff --git a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java index ce523cb..37f6be0 100644 --- a/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java +++ b/component/src/test/java/org/wso2/extension/siddhi/io/cdc/source/TestCaseOfCDCPollingMode.java @@ -369,10 +369,10 @@ public void receive(Event[] events) { // Do inserts and wait CDC app to capture the events. InputHandler inputHandler = siddhiAppRuntime.getInputHandler("inputStream"); - Object[] ann = new Object[]{11, "Ann"}; - Object[] bob = new Object[]{12, "Bob"}; - Object[] charles = new Object[]{13, "Charles"}; - Object[] david = new Object[]{14, "David"}; + Object[] ann = new Object[]{1, "Ann"}; + Object[] bob = new Object[]{2, "Bob"}; + Object[] charles = new Object[]{3, "Charles"}; + Object[] david = new Object[]{4, "David"}; inputHandler.send(ann); inputHandler.send(bob);