Skip to content

Commit

Permalink
Reduce the probability of intermittent test failure
Browse files Browse the repository at this point in the history
  • Loading branch information
lasanthaS committed Oct 23, 2019
1 parent 8fecfef commit dc0145b
Showing 1 changed file with 100 additions and 97 deletions.
Expand Up @@ -42,13 +42,12 @@ public class TestCaseOfCDCPollingMode {
private Event currentEvent;
private AtomicInteger eventCount = new AtomicInteger(0);
private AtomicBoolean eventArrived = new AtomicBoolean(false);
private int waitTime = 50;
private int timeout = 10000;
private int waitTime = 5000;
private int timeout = 50000;
private String username;
private String password;
private String jdbcDriverName;
private String databaseURL;
private String pollingTableName = "login";
private String pollingColumn = "id";

@BeforeClass
Expand Down Expand Up @@ -106,6 +105,7 @@ public void testCDCPollingMode() throws InterruptedException {
log.info("CDC TestCase: Capturing change data with polling mode.");
log.info("------------------------------------------------------------------------------------------------");

String pollingTableName = "loginTable1";
SiddhiManager siddhiManager = new SiddhiManager();

int pollingInterval = 1;
Expand All @@ -123,11 +123,11 @@ public void testCDCPollingMode() throws InterruptedException {
"@Store(type='rdbms', jdbc.url='" + databaseURL + "'," +
" username='" + username + "', password='" + password + "' ," +
" jdbc.driver.name='" + jdbcDriverName + "')" +
" define table login (id string, name string);";
" define table loginTable1 (id string, name string);";

String rdbmsQuery = "@info(name='query2') " +
"from insertionStream " +
"insert into login;";
"insert into loginTable1;";

QueryCallback rdbmsQueryCallback = new QueryCallback() {
@Override
Expand Down Expand Up @@ -177,15 +177,104 @@ public void receive(Event[] events) {
siddhiManager.shutdown();
}

@Test(dependsOnMethods = {"testCDCPollingMode"})
public void testOutOfOrderRecords() throws InterruptedException {
log.info("------------------------------------------------------------------------------------------------");
log.info("CDC TestCase: Test missed/out-of-order events in polling mode.");
log.info("------------------------------------------------------------------------------------------------");

SiddhiManager siddhiManager = new SiddhiManager();

int pollingInterval = 1;
String cdcinStreamDefinition = "@source(type = 'cdc', " +
"mode='polling', " +
"polling.column='" + pollingColumn + "', " +
"jdbc.driver.name='" + jdbcDriverName + "', " +
"url = '" + databaseURL + "', " +
"username = '" + username + "', " +
"password = '" + password + "', " +
"table.name = 'students', " +
"polling.interval = '" + pollingInterval + "', " +
"operation = 'insert', " +
"wait.on.missed.record = 'true'," +
"missed.record.waiting.timeout = '10'," +
"@map(type='keyvalue'), " +
"@attributes(id = 'id', name = 'name'))" +
"define stream outputStream (id int, name string);\n";

String rdbmsStoreDefinition = "define stream inputStream (id int, name string);" +
"@Store(type='rdbms', " +
"jdbc.url='" + databaseURL + "', " +
"username='" + username + "', " +
"password='" + password + "' , " +
"jdbc.driver.name='" + jdbcDriverName + "')" +
"define table students (id int, name string);";

String rdbmsQuery = "@info(name='query2') " +
"from inputStream " +
"insert into students;";

QueryCallback rdbmsQueryCallback = new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
log.info("insert done: " + event);
}
}
};

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition +
rdbmsStoreDefinition + rdbmsQuery);
siddhiAppRuntime.addCallback("query2", rdbmsQueryCallback);

StreamCallback outputStreamCallback = new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
eventCount.getAndIncrement();
log.info(eventCount + ". " + event);
}
}
};

siddhiAppRuntime.addCallback("outputStream", outputStreamCallback);
siddhiAppRuntime.start();

// Wait till CDC poller initializes.
Thread.sleep(5000);

// Do inserts and wait CDC app to capture the events.
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("inputStream");
Object[] ann = new Object[]{1, "Ann"};
Object[] bob = new Object[]{2, "Bob"};
Object[] charles = new Object[]{3, "Charles"};
Object[] david = new Object[]{4, "David"};

inputHandler.send(ann);
inputHandler.send(bob);
inputHandler.send(david);
Thread.sleep(1000);
inputHandler.send(charles);

SiddhiTestHelper.waitForEvents(waitTime, 4, eventCount, timeout);

// Assert received event count.
Assert.assertEquals(eventCount.get(), 4);

siddhiAppRuntime.shutdown();
siddhiManager.shutdown();
}

/**
* Test case to test state persistence of polling mode.
*/
@Test(dependsOnMethods = {"testCDCPollingMode"})
@Test(dependsOnMethods = {"testOutOfOrderRecords"})
public void testCDCPollingModeStatePersistence() throws InterruptedException {
log.info("------------------------------------------------------------------------------------------------");
log.info("CDC TestCase: Testing state persistence of the polling mode.");
log.info("------------------------------------------------------------------------------------------------");

String pollingTableName = "loginTable2";
PersistenceStore persistenceStore = new InMemoryPersistenceStore();
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setPersistenceStore(persistenceStore);
Expand All @@ -206,11 +295,11 @@ public void testCDCPollingModeStatePersistence() throws InterruptedException {
"\n@Store(type='rdbms', jdbc.url='" + databaseURL + "'," +
" username='" + username + "', password='" + password + "' ," +
" jdbc.driver.name='" + jdbcDriverName + "')" +
"\ndefine table login (id string, name string);";
"\ndefine table loginTable2 (id string, name string);";

String rdbmsQuery = "@info(name='query2') " +
"from insertionStream " +
"insert into login;";
"insert into loginTable2;";

QueryCallback rdbmsQueryCallback = new QueryCallback() {
@Override
Expand Down Expand Up @@ -257,7 +346,7 @@ public void receive(Event[] events) {
Assert.assertEquals(insertingObject, currentEvent.getData());

//persisting
Thread.sleep(500);
Thread.sleep(5000);
siddhiAppRuntime.persist();

//stopping siddhi app
Expand All @@ -271,13 +360,15 @@ public void receive(Event[] events) {
inputHandler = siddhiAppRuntime.getInputHandler("insertionStream");
insertingObject = new Object[]{"e004", "new_employer"};
inputHandler.send(insertingObject);
Thread.sleep(5000);
siddhiAppRuntime.shutdown();

//start CDC siddhi app
init();
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition);
siddhiAppRuntime.addCallback("istm", insertionStreamCallback);
siddhiAppRuntime.start();
Thread.sleep(5000);

//loading
try {
Expand All @@ -300,92 +391,4 @@ public void receive(Event[] events) {
siddhiAppRuntime.shutdown();
siddhiManager.shutdown();
}

@Test(dependsOnMethods = {"testCDCPollingModeStatePersistence"})
public void testOutOfOrderRecords() throws InterruptedException {
log.info("------------------------------------------------------------------------------------------------");
log.info("CDC TestCase: Test missed/out-of-order events in polling mode.");
log.info("------------------------------------------------------------------------------------------------");

SiddhiManager siddhiManager = new SiddhiManager();

int pollingInterval = 1;
String cdcinStreamDefinition = "@source(type = 'cdc', " +
"mode='polling', " +
"polling.column='" + pollingColumn + "', " +
"jdbc.driver.name='" + jdbcDriverName + "', " +
"url = '" + databaseURL + "', " +
"username = '" + username + "', " +
"password = '" + password + "', " +
"table.name = 'students', " +
"polling.interval = '" + pollingInterval + "', " +
"operation = 'insert', " +
"wait.on.missed.record = 'true'," +
"missed.record.waiting.timeout = '10'," +
"@map(type='keyvalue'), " +
"@attributes(id = 'id', name = 'name'))" +
"define stream outputStream (id int, name string);\n";

String rdbmsStoreDefinition = "define stream inputStream (id int, name string);" +
"@Store(type='rdbms', " +
"jdbc.url='" + databaseURL + "', " +
"username='" + username + "', " +
"password='" + password + "' , " +
"jdbc.driver.name='" + jdbcDriverName + "')" +
"define table students (id int, name string);";

String rdbmsQuery = "@info(name='query2') " +
"from inputStream " +
"insert into students;";

QueryCallback rdbmsQueryCallback = new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
log.info("insert done: " + event);
}
}
};

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(cdcinStreamDefinition +
rdbmsStoreDefinition + rdbmsQuery);
siddhiAppRuntime.addCallback("query2", rdbmsQueryCallback);

StreamCallback outputStreamCallback = new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
eventCount.getAndIncrement();
log.info(eventCount + ". " + event);
}
}
};

siddhiAppRuntime.addCallback("outputStream", outputStreamCallback);
siddhiAppRuntime.start();

// Wait till CDC poller initializes.
Thread.sleep(5000);

// Do inserts and wait CDC app to capture the events.
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("inputStream");
Object[] ann = new Object[]{1, "Ann"};
Object[] bob = new Object[]{2, "Bob"};
Object[] charles = new Object[]{3, "Charles"};
Object[] david = new Object[]{4, "David"};

inputHandler.send(ann);
inputHandler.send(bob);
inputHandler.send(david);
Thread.sleep(1000);
inputHandler.send(charles);

SiddhiTestHelper.waitForEvents(waitTime, 4, eventCount, timeout);

// Assert received event count.
Assert.assertEquals(eventCount.get(), 4);

siddhiAppRuntime.shutdown();
siddhiManager.shutdown();
}
}

0 comments on commit dc0145b

Please sign in to comment.