diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPostStateProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPostStateProcessor.java index 695b730518..1f5bb0874e 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPostStateProcessor.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPostStateProcessor.java @@ -67,6 +67,11 @@ protected void process(StateEvent stateEvent, ComplexEventChunk complexEventChun break; case OR: super.process(stateEvent, complexEventChunk); + if (partnerPostStateProcessor.nextProcessor != null && thisStatePreProcessor.thisLastProcessor == + partnerPostStateProcessor) { + // 'from A or B select' scenario require this + partnerPostStateProcessor.isEventReturned = true; + } break; case NOT: break; diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPreStateProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPreStateProcessor.java index 64d9e58606..10ff663397 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPreStateProcessor.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/LogicalPreStateProcessor.java @@ -60,17 +60,30 @@ public PreStateProcessor cloneProcessor(String key) { @Override public void addState(StateEvent stateEvent) { - if (newAndEveryStateEventList.isEmpty()) { + if (isStartState || stateType == StateInputStream.Type.SEQUENCE) { + if (newAndEveryStateEventList.isEmpty()) { + newAndEveryStateEventList.add(stateEvent); + } + if (partnerStatePreProcessor != null && partnerStatePreProcessor.newAndEveryStateEventList.isEmpty()) { + partnerStatePreProcessor.newAndEveryStateEventList.add(stateEvent); + } + } else { newAndEveryStateEventList.add(stateEvent); + if (partnerStatePreProcessor != null) { + partnerStatePreProcessor.newAndEveryStateEventList.add(stateEvent); + } } - if (partnerStatePreProcessor != null && partnerStatePreProcessor.newAndEveryStateEventList.isEmpty()) { - partnerStatePreProcessor.newAndEveryStateEventList.add(stateEvent); - } + } @Override public void addEveryState(StateEvent stateEvent) { - newAndEveryStateEventList.add(stateEventCloner.copyStateEvent(stateEvent)); + StateEvent clonedEvent = stateEventCloner.copyStateEvent(stateEvent); + clonedEvent.setEvent(stateId, null); + newAndEveryStateEventList.add(clonedEvent); + if (partnerStatePreProcessor != null) { + partnerStatePreProcessor.newAndEveryStateEventList.add(clonedEvent); + } } public void setStartState(boolean isStartState) { @@ -82,13 +95,21 @@ public void setStartState(boolean isStartState) { @Override public void resetState() { - pendingStateEventList.clear(); - partnerStatePreProcessor.pendingStateEventList.clear(); - - if (isStartState && newAndEveryStateEventList.isEmpty()) { - // if (isStartState && stateType == StateInputStream.Type.SEQUENCE && newAndEveryStateEventList - // .isEmpty()) { - init(); + if (logicalType == LogicalStateElement.Type.OR || pendingStateEventList.size() == + partnerStatePreProcessor.pendingStateEventList.size()) { + pendingStateEventList.clear(); + partnerStatePreProcessor.pendingStateEventList.clear(); + + if (isStartState && newAndEveryStateEventList.isEmpty()) { + // if (isStartState && stateType == StateInputStream.Type.SEQUENCE && newAndEveryStateEventList + // .isEmpty()) { + if (stateType == StateInputStream.Type.SEQUENCE && thisStatePostProcessor.nextEveryStatePerProcessor == + null && !((StreamPreStateProcessor) thisStatePostProcessor.nextStatePerProcessor) + .pendingStateEventList.isEmpty()) { + return; + } + init(); + } } } @@ -108,6 +129,12 @@ public ComplexEventChunk processAndReturn(ComplexEventChunk complexE StreamEvent streamEvent = (StreamEvent) complexEventChunk.next(); //Sure only one will be sent for (Iterator iterator = pendingStateEventList.iterator(); iterator.hasNext(); ) { StateEvent stateEvent = iterator.next(); + if (withinStates.size() > 0) { + if (isExpired(stateEvent, streamEvent)) { + iterator.remove(); + continue; + } + } if (logicalType == LogicalStateElement.Type.OR && stateEvent.getStreamEvent(partnerStatePreProcessor .getStateId()) != null) { iterator.remove(); diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/StreamPreStateProcessor.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/StreamPreStateProcessor.java index 93b8499f26..f2f7f20d8a 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/StreamPreStateProcessor.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/query/input/stream/state/StreamPreStateProcessor.java @@ -64,6 +64,7 @@ public class StreamPreStateProcessor implements PreStateProcessor, Snapshotable protected StateEventCloner stateEventCloner; protected StreamEventPool streamEventPool; protected String queryName; + private boolean initialized; public StreamPreStateProcessor(StateInputStream.Type stateType, List>> withinStates) { this.stateType = stateType; @@ -98,7 +99,7 @@ public void process(ComplexEventChunk complexEventChunk) { "processAndReturn method is used for handling event chunks."); } - private boolean isExpired(StateEvent pendingStateEvent, StreamEvent incomingStreamEvent) { + protected boolean isExpired(StateEvent pendingStateEvent, StreamEvent incomingStreamEvent) { for (Map.Entry> withinEntry : withinStates) { for (Integer withinStateId : withinEntry.getValue()) { if (withinStateId == SiddhiConstants.ANY) { @@ -162,9 +163,10 @@ public void setToLast(Processor processor) { } public void init() { - if (isStartState) { + if (isStartState && (!initialized || this.thisStatePostProcessor.nextEveryStatePerProcessor != null)) { StateEvent stateEvent = stateEventPool.borrowEvent(); addState(stateEvent); + initialized = true; } } @@ -250,6 +252,11 @@ public void resetState() { if (isStartState && newAndEveryStateEventList.isEmpty()) { // if (isStartState && stateType == StateInputStream.Type.SEQUENCE && newAndEveryStateEventList // .isEmpty()) { + if (stateType == StateInputStream.Type.SEQUENCE && thisStatePostProcessor.nextEveryStatePerProcessor == + null && !((StreamPreStateProcessor) thisStatePostProcessor.nextStatePerProcessor) + .pendingStateEventList.isEmpty()) { + return; + } init(); } } diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/StateInputStreamParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/StateInputStreamParser.java index 02ff75f40a..8692333e82 100644 --- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/StateInputStreamParser.java +++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/StateInputStreamParser.java @@ -244,10 +244,8 @@ private static InnerStateRuntime parse(StateElement stateElement, Map20] -> e2=Stream2['IBM' == symbol] and " + + "e3=Stream3['WSO2' == symbol]" + + "select e1.price as price1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{25.5f, 45.5f, 46.56f}, inEvents[0].getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{59.65f, 45.5f, 46.56f}, inEvents[0].getData()); + break; + default: + Assert.assertEquals("Number of success events", 2, inEventCount); + } + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + InputHandler stream3 = siddhiAppRuntime.getInputHandler("Stream3"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"IBM", 25.5f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"IBM", 59.65f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 45.5f, 100}); + Thread.sleep(100); + stream3.send(new Object[]{"WSO2", 46.56f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 2, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery12() throws InterruptedException { + log.info("testPatternLogical12 - OUT 2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); " + + "define stream Stream3 (symbol string, price float, volume int); "; + + String query = "" + + "@info(name = 'query1') " + + "from every e1=Stream1[price >20] -> e2=Stream2['IBM' == symbol] or " + + "e3=Stream3['WSO2' == symbol]" + + "select e1.price as price1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{25.5f, 45.5f, null}, inEvents[0].getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{59.65f, 45.5f, null}, inEvents[0].getData()); + break; + default: + Assert.assertEquals("Number of success events", 2, inEventCount); + } + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + InputHandler stream3 = siddhiAppRuntime.getInputHandler("Stream3"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"IBM", 25.5f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"IBM", 59.65f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 45.5f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 2, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery13() throws InterruptedException { + log.info("testQuery13 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price > 20] and e2=Stream2[price >30] " + + "select e1.symbol as symbol1, e2.price as price2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + Assert.assertArrayEquals(new Object[]{"WSO2", 35.0f}, inEvents[0].getData()); + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 25.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 35.0f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"GOOGLE", 45.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"ORACLE", 55.0f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery14() throws InterruptedException { + log.info("testQuery14 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price > 20] or e2=Stream2[price >30] " + + "select e1.symbol as symbol1, e2.price as price2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + Assert.assertArrayEquals(new Object[]{"WSO2", null}, inEvents[0].getData()); + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 25.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 35.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"ORACLE", 45.0f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery15() throws InterruptedException { + log.info("testQuery15 - OUT 2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from every (e1=Stream1[price > 20] and e2=Stream2[price >30]) " + + "select e1.symbol as symbol1, e2.price as price2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{"WSO2", 35.0f}, inEvents[0].getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{"GOOGLE", 55.0f}, inEvents[0].getData()); + } + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 25.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 35.0f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"GOOGLE", 45.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"ORACLE", 55.0f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 2, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery16() throws InterruptedException { + log.info("testQuery16 - OUT 3"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from every (e1=Stream1[price > 20] or e2=Stream2[price >30]) " + + "select e1.symbol as symbol1, e2.price as price2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{"WSO2", null}, inEvents[0].getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{null, 35.0f}, inEvents[0].getData()); + break; + case 3: + Assert.assertArrayEquals(new Object[]{null, 45.0f}, inEvents[0].getData()); + } + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 25.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 35.0f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"ORACLE", 45.0f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 3, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery17() throws InterruptedException { + log.info("testQuery17 - OUT 0"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price > 20] -> e2=Stream2[price > e1.price] or e3=Stream2['IBM' == symbol] " + + " within 1 sec " + + "select e1.symbol as symbol1, e2.symbol as symbol2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 55.6f, 100}); + Thread.sleep(1100); + stream2.send(new Object[]{"GOOG", 59.6f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 0, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", false, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery18() throws InterruptedException { + log.info("testQuery18 - OUT 0"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price > 20] -> e2=Stream2[price > e1.price] and e3=Stream2['IBM' == symbol] " + + " within 1 sec " + + "select e1.symbol as symbol1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 55.6f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"GOOG", 72.7f, 100}); + Thread.sleep(1100); + stream2.send(new Object[]{"IBM", 4.7f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 0, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", false, eventArrived); + + siddhiAppRuntime.shutdown(); + } } diff --git a/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/query/sequence/SequenceTestCase.java b/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/query/sequence/SequenceTestCase.java index 2c58ad8dd6..e3021ba5ab 100644 --- a/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/query/sequence/SequenceTestCase.java +++ b/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/query/sequence/SequenceTestCase.java @@ -1605,6 +1605,420 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { siddhiAppRuntime.shutdown(); } + @Test + public void testQuery25() throws InterruptedException { + log.info("testQuery25 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); " + + "define stream Stream3 (symbol string, price float, volume int); "; + + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price >20], e2=Stream2['IBM' == symbol] and " + + "e3=Stream3['WSO2' == symbol]" + + "select e1.price as price1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{25.5f, 45.5f, 46.56f}, inEvents[0].getData()); + break; + default: + Assert.assertEquals("Number of success events", 1, inEventCount); + } + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + InputHandler stream3 = siddhiAppRuntime.getInputHandler("Stream3"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"IBM", 25.5f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 45.5f, 100}); + Thread.sleep(100); + stream3.send(new Object[]{"WSO2", 46.56f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery26() throws InterruptedException { + log.info("testQuery26 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); " + + "define stream Stream3 (symbol string, price float, volume int); "; + + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price >20], e2=Stream2['IBM' == symbol] and " + + "e3=Stream3['WSO2' == symbol]" + + "select e1.price as price1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{25.5f, 45.5f, 46.56f}, inEvents[0].getData()); + break; + default: + Assert.assertEquals("Number of success events", 1, inEventCount); + } + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + InputHandler stream3 = siddhiAppRuntime.getInputHandler("Stream3"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"IBM", 25.5f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 45.5f, 100}); + Thread.sleep(100); + stream3.send(new Object[]{"WSO2", 46.56f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery27() throws InterruptedException { + log.info("testQuery27 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); " + + "define stream Stream3 (symbol string, price float, volume int); "; + + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price >20], e2=Stream2['IBM' == symbol] or " + + "e3=Stream3['WSO2' == symbol]" + + "select e1.price as price1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{59.65f, 45.5f, null}, inEvents[0].getData()); + break; + default: + Assert.assertEquals("Number of success events", 1, inEventCount); + } + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"IBM", 59.65f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 45.5f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery28() throws InterruptedException { + log.info("testQuery28 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); " + + "define stream Stream3 (symbol string, price float, volume int); "; + + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price >20], e2=Stream2['IBM' == symbol] and " + + "e3=Stream3['WSO2' == symbol]" + + "select e1.price as price1, e2.price as price2, e3.price as price3 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + InputHandler stream3 = siddhiAppRuntime.getInputHandler("Stream3"); + + siddhiAppRuntime.start(); + + + stream1.send(new Object[]{"IBM", 59.65f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 45.5f, 100}); + Thread.sleep(100); + stream3.send(new Object[]{"WSO2", 46.56f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery29() throws InterruptedException { + log.info("testSequence29 - OUT 1"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price>20],e2=Stream2[price>e1.price] " + + "select e1.symbol as symbol1, e2.symbol as symbol2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + Assert.assertArrayEquals(new Object[]{"WSO2", "IBM"}, inEvents[0].getData()); + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 55.6f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 55.7f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"ORACLE", 55.6f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"GOOGLE", 55.7f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 1, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery30() throws InterruptedException { + log.info("testSequence30 - OUT 2"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from every e1=Stream1[price>20],e2=Stream2[price>e1.price] " + + "select e1.symbol as symbol1, e2.symbol as symbol2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + switch (inEventCount) { + case 1: + Assert.assertArrayEquals(new Object[]{"WSO2", "IBM"}, inEvents[0].getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{"MICROSOFT", "GOOGLE"}, inEvents[0].getData()); + break; + } + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 55.6f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 55.7f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"ORACLE", 55.6f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"MICROSOFT", 55.8f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"GOOGLE", 55.9f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 2, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", true, eventArrived); + + siddhiAppRuntime.shutdown(); + } + + @Test + public void testQuery31() throws InterruptedException { + log.info("testSequence31 - OUT 0"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream Stream1 (symbol string, price float, volume int); " + + "define stream Stream2 (symbol string, price float, volume int); "; + String query = "" + + "@info(name = 'query1') " + + "from e1=Stream1[price>20], e2=Stream2[price>e1.price] " + + "select e1.symbol as symbol1, e2.symbol as symbol2 " + + "insert into OutputStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + if (inEvents != null) { + inEventCount = inEventCount + inEvents.length; + eventArrived = true; + } + if (removeEvents != null) { + removeEventCount = removeEventCount + removeEvents.length; + } + eventArrived = true; + } + + }); + + InputHandler stream1 = siddhiAppRuntime.getInputHandler("Stream1"); + InputHandler stream2 = siddhiAppRuntime.getInputHandler("Stream2"); + + siddhiAppRuntime.start(); + + stream1.send(new Object[]{"WSO2", 55.6f, 100}); + Thread.sleep(100); + stream1.send(new Object[]{"GOOG", 57.6f, 100}); + Thread.sleep(100); + stream2.send(new Object[]{"IBM", 65.7f, 100}); + Thread.sleep(100); + + Assert.assertEquals("Number of success events", 0, inEventCount); + Assert.assertEquals("Number of remove events", 0, removeEventCount); + Assert.assertEquals("Event arrived", false, eventArrived); + + siddhiAppRuntime.shutdown(); + } + @Test public void testTimeBatchAndSequence() throws Exception { log.info("testTimeBatchAndSequence OUT 1");