Skip to content

Commit

Permalink
[DROOLS-416] Queries do not work with events
Browse files Browse the repository at this point in the history
  • Loading branch information
sotty committed Feb 7, 2014
1 parent d32e3d1 commit 8eb71d6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3205,4 +3205,45 @@ public void testQueryWithEvalAndTypeBoxingUnboxing() {
assertEquals( Arrays.asList( 178, 178, 178 ), list );

}


@Test
public void testQueryWithEvents() {
String drl = "global java.util.List list; " +
"" +
"declare Inner\n" +
" @role(event)\n" +
"end\n" +

"rule \"Input\"\n" +
"when\n" +
"then\n" +
" insert( \"X\" );\n" +
" insert( new Inner( ) );\n" +
"end\n" +
"\n" +
"query myAgg( )\n" +
" Inner( )\n" +
"end\n" +
"\n" +
"rule \"React\"\n" +
"when\n" +
" String()\n" +
" myAgg( )\n" +
"then\n" +
" list.add( 42 );\n" +
"end";

KnowledgeBase knowledgeBase = loadKnowledgeBaseFromString( drl );
StatefulKnowledgeSession knowledgeSession = knowledgeBase.newStatefulKnowledgeSession();
ArrayList list = new ArrayList();
knowledgeSession.setGlobal( "list", list );

knowledgeSession.fireAllRules();

assertEquals( Arrays.asList( 42 ), list );

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.drools.core.marshalling.impl.ProtobufMessages.ActionQueue.Action;
import org.drools.core.marshalling.impl.ProtobufMessages.ActionQueue.Assert;
import org.drools.core.phreak.RuleAgendaItem;
import org.drools.core.phreak.RuleExecutor;
import org.drools.core.phreak.SegmentUtilities;
import org.drools.core.phreak.StackEntry;
import org.drools.core.reteoo.EntryPointNode;
Expand Down Expand Up @@ -538,20 +539,13 @@ protected BaseNode[] evalQuery(String queryName, DroolsQuery queryObject, Intern
lsmem = SegmentUtilities.createSegmentMemory(lts, this);
}

// TODO this is OTT, it shouldn't need to do this for ALL rules, just those rules that event stream inputs (mdp)
LeftInputAdapterNode.doInsertObject( handle, pCtx, lian, this, lmem, false, queryObject.isOpen() );

RuleBaseConfiguration conf = this.ruleBase.getConfiguration();
if( conf.isPhreakEnabled() && conf.getEventProcessingMode().equals(EventProcessingOption.STREAM) ) {
lmem.linkNode(this);
List<PathMemory> pmems = lmem.getSegmentMemory().getPathMemories();
PathMemory pmm = pmems!=null && !pmems.isEmpty() ? pmems.get(0) : null;
if( pmm != null && pmm.getRuleAgendaItem() != null ) {
RuleAgendaItem item = pmm.getRuleAgendaItem();
item.getRuleExecutor().reEvaluateNetwork( this, new org.drools.core.util.LinkedList<StackEntry>(), false);
}
if( conf.isPhreakEnabled() && lmem.getSegmentMemory().getTupleQueue() != null ) {
RuleExecutor.flushTupleQueue( lmem.getSegmentMemory().getTupleQueue() );
}

LeftInputAdapterNode.doInsertObject( handle, pCtx, lian, this, lmem, false, queryObject.isOpen() );


List<PathMemory> pmems = lmem.getSegmentMemory().getPathMemories();
for ( int i = 0, length = pmems.size(); i < length; i++ ) {
PathMemory rm = pmems.get( i );
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.drools.core.phreak;

import org.drools.core.RuleBaseConfiguration;
import org.drools.core.base.DroolsQuery;
import org.drools.core.base.extractors.ArrayElementReader;
import org.drools.core.common.InternalFactHandle;
Expand All @@ -9,14 +10,14 @@
import org.drools.core.reteoo.LeftInputAdapterNode.LiaNodeMemory;
import org.drools.core.reteoo.LeftTuple;
import org.drools.core.reteoo.LeftTupleSink;
import org.drools.core.reteoo.ObjectTypeConf;
import org.drools.core.reteoo.ObjectTypeNode;
import org.drools.core.reteoo.QueryElementNode;
import org.drools.core.reteoo.QueryElementNode.QueryElementNodeMemory;
import org.drools.core.reteoo.QueryElementNode.UnificationNodeViewChangedEventListener;
import org.drools.core.reteoo.ReteooRuleBase;
import org.drools.core.reteoo.SegmentMemory;
import org.drools.core.rule.Declaration;
import org.drools.core.spi.PropagationContext;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.runtime.rule.Variable;

public class PhreakQueryNode {
Expand Down Expand Up @@ -68,6 +69,8 @@ public void doLeftInserts(QueryElementNode queryNode,
LiaNodeMemory lm = (LiaNodeMemory) qmem.getQuerySegmentMemory().getNodeMemories().get(0);
LeftInputAdapterNode.doInsertObject(handle, pCtx, lian, wm, lm, false, dquery.isOpen());

flushTupleQuery( lm, wm );

leftTuple.clearStaged();
leftTuple = next;
}
Expand Down Expand Up @@ -137,12 +140,14 @@ public void doLeftUpdates(QueryElementNode queryNode,
if (dquery.isOpen()) {
LeftTuple childLeftTuple = fh.getFirstLeftTuple(); // there is only one, all other LTs are peers
LeftInputAdapterNode.doUpdateObject(childLeftTuple, childLeftTuple.getPropagationContext(), wm, lian, false, lmem, qmem.getQuerySegmentMemory());
flushTupleQuery( lmem, wm );
} else {
if (fh.getFirstLeftTuple() != null) {
throw new RuntimeException("defensive programming while testing"); // @TODO remove later (mdp)
}
LiaNodeMemory lm = (LiaNodeMemory) qmem.getQuerySegmentMemory().getNodeMemories().get(0);
LeftInputAdapterNode.doInsertObject(fh, leftTuple.getPropagationContext(), lian, wm, lm, false, dquery.isOpen());
flushTupleQuery( lm, wm );
}


Expand All @@ -166,15 +171,25 @@ public void doLeftDeletes(QueryElementNodeMemory qmem,
LiaNodeMemory lm = (LiaNodeMemory) qmem.getQuerySegmentMemory().getNodeMemories().get(0);
LeftTuple childLeftTuple = fh.getFirstLeftTuple(); // there is only one, all other LTs are peers
LeftInputAdapterNode.doDeleteObject(childLeftTuple, childLeftTuple.getPropagationContext(), qmem.getQuerySegmentMemory(), wm, lian, false, lm);
flushTupleQuery( lm, wm );
} else {
LeftTuple childLeftTuple = leftTuple.getFirstChild();
while (childLeftTuple != null) {
childLeftTuple = RuleNetworkEvaluator.deleteLeftChild(childLeftTuple, trgLeftTuples, stagedLeftTuples);
LiaNodeMemory lm = (LiaNodeMemory) qmem.getQuerySegmentMemory().getNodeMemories().get(0);
flushTupleQuery( lm, wm );
}
}

leftTuple.clearStaged();
leftTuple = next;
}
}

public void flushTupleQuery( LiaNodeMemory lm, InternalWorkingMemory wm ) {
if ( lm.getSegmentMemory().getTupleQueue() != null ) {
RuleExecutor.flushTupleQueue( lm.getSegmentMemory().getTupleQueue() );
}
}

}
23 changes: 16 additions & 7 deletions drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.drools.core.phreak;

import java.util.Comparator;
import java.util.Queue;

import org.drools.core.base.SalienceInteger;
import org.drools.core.common.AgendaItem;
Expand Down Expand Up @@ -190,7 +191,7 @@ public synchronized void reEvaluateNetwork(InternalWorkingMemory wm, LinkedList<
boolean evaled = false;
if (pmem.getTupleQueue() != null) {
while (!pmem.getTupleQueue().isEmpty()) {
removeQueuedTupleEntry();
removeQueuedTupleEntry( pmem.getTupleQueue() );
NETWORK_EVALUATOR.evaluateNetwork(pmem, outerStack, this, wm);
evaled = true;
}
Expand All @@ -202,14 +203,22 @@ public synchronized void reEvaluateNetwork(InternalWorkingMemory wm, LinkedList<
}
}

private void removeQueuedTupleEntry() {
TupleEntry tupleEntry = pmem.getTupleQueue().remove();
public static void flushTupleQueue( Queue<TupleEntry> tupleQueue ) {
if ( tupleQueue != null ) {
while ( ! tupleQueue.isEmpty() ) {
removeQueuedTupleEntry( tupleQueue );
}
}
}

public static void removeQueuedTupleEntry( Queue<TupleEntry> tupleQueue ) {
TupleEntry tupleEntry = tupleQueue.remove();
PropagationContext originalPctx = tupleEntry.getPropagationContext();

boolean repeat = true;
while (repeat) {
if (log.isTraceEnabled()) {
log.trace("Stream removed entry {} {} size {}", System.identityHashCode(pmem.getTupleQueue()), tupleEntry, pmem.getTupleQueue().size());
log.trace("Stream removed entry {} {} size {}", System.identityHashCode(tupleQueue), tupleEntry, tupleQueue.size());
}
if (tupleEntry.getLeftTuple() != null) {
SegmentMemory sm = tupleEntry.getNodeMemory().getSegmentMemory();
Expand Down Expand Up @@ -247,8 +256,8 @@ private void removeQueuedTupleEntry() {
break;
}
}
if (!pmem.getTupleQueue().isEmpty()) {
tupleEntry = pmem.getTupleQueue().peek();
if (!tupleQueue.isEmpty()) {
tupleEntry = tupleQueue.peek();
PropagationContext pctx = tupleEntry.getPropagationContext();

// repeat if either the pctx number is the same, or the event time is the same or before
Expand All @@ -273,7 +282,7 @@ private void removeQueuedTupleEntry() {
repeat = false;
}
if (repeat) {
tupleEntry = pmem.getTupleQueue().remove();
tupleEntry = tupleQueue.remove();
}
}
}
Expand Down

0 comments on commit 8eb71d6

Please sign in to comment.