Skip to content

Commit

Permalink
Merge pull request #485 from slgobinath/fix-same-stream-not
Browse files Browse the repository at this point in the history
Fix 'not A and B' not sending output
  • Loading branch information
suhothayan authored Aug 14, 2017
2 parents 4dd96ec + c36fd0b commit 58db072
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
if (!stateChanged) {
switch (stateType) {
case PATTERN:
stateEvent.setEvent(stateId, null);
stateEvent.setEvent(stateId, currentStreamEvent);
break;
case SEQUENCE:
stateEvent.setEvent(stateId, null);
stateEvent.setEvent(stateId, currentStreamEvent);
iterator.remove();
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2964,6 +2964,79 @@ public void testQueryAbsent65() throws InterruptedException {
siddhiAppRuntime.shutdown();
}

@Test
public void testQueryAbsent66() throws InterruptedException {
log.info("Test the query not e1 and e2 -> e3 with e2 only");

SiddhiManager siddhiManager = new SiddhiManager();

String streams = "" +
"define stream Stream1 (symbol string, price float, volume int); " +
"define stream Stream2 (symbol string, price float, volume int); ";
String query = "" +
"@info(name = 'query1') " +
"from not Stream1[price>50] and e2=Stream2[price>20] " +
"select e2.symbol as symbol2 " +
"insert into OutputStream ;";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);

addCallback(siddhiAppRuntime, "query1", new Object[]{"IBM"});

InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2");

siddhiAppRuntime.start();

stream2.send(new Object[]{"IBM", 25.0f, 100});
Thread.sleep(100);

for (AssertionError e : this.assertionErrors) {
throw e;
}
Assert.assertEquals("Number of success events", 1, inEventCount);
Assert.assertEquals("Number of remove events", 0, removeEventCount);
Assert.assertTrue("Event arrived", eventArrived);

siddhiAppRuntime.shutdown();
}

@Test
public void testQueryAbsent67() throws InterruptedException {
log.info("Test the query not e1 and e2 -> e3 with e1 and e2");

SiddhiManager siddhiManager = new SiddhiManager();

String streams = "" +
"define stream Stream1 (symbol string, price float, volume int); ";
String query = "" +
"@info(name = 'query1') " +
"from not Stream1[price==50.0f] and e2=Stream1[price==20.0f] " +
"select e2.symbol as symbol2 " +
"insert into OutputStream ;";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);

addCallback(siddhiAppRuntime, "query1");

InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1");

siddhiAppRuntime.start();

stream1.send(new Object[]{"WSO2", 50.0f, 100});
Thread.sleep(100);
stream1.send(new Object[]{"IBM", 20.0f, 100});
Thread.sleep(100);

for (AssertionError e : this.assertionErrors) {
throw e;
}
Assert.assertEquals("Number of success events", 0, inEventCount);
Assert.assertEquals("Number of remove events", 0, removeEventCount);
Assert.assertFalse("Event arrived", eventArrived);

siddhiAppRuntime.shutdown();
}

private void addCallback(SiddhiAppRuntime siddhiAppRuntime, String queryName, Object[]... expected) {
final int noOfExpectedEvents = expected.length;
siddhiAppRuntime.addCallback(queryName, new QueryCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ public static AbsentStreamStateElement logicalNot(StreamStateElement streamState
return new AbsentStreamStateElement(streamStateElement.getBasicSingleInputStream(), time);
}

public static StateElement logicalNotAnd(StreamStateElement streamStateElement1,
AbsentStreamStateElement streamStateElement2) {
public static StateElement logicalNotAnd(AbsentStreamStateElement streamStateElement1,
StreamStateElement streamStateElement2) {

if (streamStateElement1 instanceof AbsentStreamStateElement) {
if (streamStateElement2 instanceof AbsentStreamStateElement) {
// not A for 1 sec and not B for 1 sec
return new LogicalStateElement(streamStateElement1, LogicalStateElement.Type.AND,
streamStateElement2);
}
if (streamStateElement2.getWaitingTime() == null) {
if (streamStateElement1.getWaitingTime() == null) {
// not A and B
return new LogicalStateElement(streamStateElement1, LogicalStateElement.Type.AND,
streamStateElement2);
} else {
// not A for 1 sec and B
return new LogicalStateElement(streamStateElement2, LogicalStateElement.Type.AND,
streamStateElement1);
return new LogicalStateElement(streamStateElement1, LogicalStateElement.Type.AND,
streamStateElement2);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ public Object visitLogical_absent_stateful_source(SiddhiQLParser.Logical_absent_
absentStreamState = State.logicalNot(new StreamStateElement((BasicSingleInputStream) visit(ctx
.basic_source())));
}
return State.logicalNotAnd(presentStreamState, absentStreamState);
return State.logicalNotAnd(absentStreamState, presentStreamState);
}
} else if (ctx.OR() != null) {
if (ctx.basic_absent_pattern_source().size() == 2) {
Expand Down

0 comments on commit 58db072

Please sign in to comment.