Skip to content

Commit

Permalink
[DROOLS-1285] avoid forced flushing of unlinked paths in stream mode (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Sep 15, 2016
1 parent 463112e commit e6da071
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 21 deletions.
@@ -0,0 +1,65 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.compiler.integrationtests;

import org.junit.Test;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.definition.type.Role;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
import org.kie.internal.utils.KieHelper;

public class SubnetworkTest {

@Test
public void testNPEOnFlushingOfUnlinkedPmem() {
// DROOLS-1285
String drl =
"import " + A.class.getCanonicalName() + "\n" +
"import " + B.class.getCanonicalName() + "\n" +
"import " + C.class.getCanonicalName() + "\n" +
"rule R1 when\n" +
" A()\n" +
" B()\n" +
" not( B() and C() )\n" +
"then end\n";

KieSession kSession = new KieHelper().addContent( drl, ResourceType.DRL )
.build( EventProcessingOption.STREAM )
.newKieSession();

FactHandle fhA = kSession.insert( new A() );
kSession.insert(new C());
kSession.fireAllRules();

kSession.delete( fhA );

kSession.insert(new A());
kSession.insert(new B());
kSession.fireAllRules();
}

@Role(Role.Type.EVENT)
public static class A { }

@Role(Role.Type.EVENT)
public static class B { }

@Role(Role.Type.EVENT)
public static class C { }
}
Expand Up @@ -74,6 +74,8 @@ public class PhreakPropagationContext
// the deserialization of a session
private transient MarshallerReaderContext readerContext;

private transient boolean marshalling;

public PhreakPropagationContext() {

}
Expand Down Expand Up @@ -373,8 +375,15 @@ public MarshallerReaderContext getReaderContext() {
return this.readerContext;
}

public boolean isMarshalling() {
return marshalling;
}

public void setMarshalling( boolean marshalling ) {
this.marshalling = marshalling;
}

public static String intEnumToString(PropagationContext pctx) {
public static String intEnumToString( PropagationContext pctx ) {
String pctxType = null;
switch( pctx.getType() ) {
case PropagationContext.INSERTION:
Expand Down
43 changes: 26 additions & 17 deletions drools-core/src/main/java/org/drools/core/phreak/AddRemoveRule.java
Expand Up @@ -614,28 +614,13 @@ private static void flushStagedTuples(LeftTupleNode splitStartNode, PathMemory p

public static boolean flushLeftTupleIfNecessary(InternalWorkingMemory wm, SegmentMemory sm, LeftTuple leftTuple, boolean streamMode) {
PathMemory pmem = streamMode ?
sm.getPathMemories().get(0) :
sm.getFirstDataDrivenPathMemory();
getPathMemoryToFlushInStreamMode( sm, leftTuple ) :
getPathMemoryToFlushForEagerEvaluation( sm, leftTuple );

if ( pmem == null ) {
return false;
}

if ( !streamMode && leftTuple == null && !pmem.isRuleLinked() ) {
pmem = null;
List<PathMemory> dataDrivenPmems = sm.getDataDrivenPathMemories();
// skip the first, we already know it isn't linked
for (int i = 1; i < dataDrivenPmems.size(); i++) {
if (dataDrivenPmems.get(i).isRuleLinked()) {
pmem = dataDrivenPmems.get(i);
break;
}
}
if ( pmem == null ) {
return false;
}
}

TupleSets<LeftTuple> leftTupleSets = new TupleSetsImpl<LeftTuple>();
if (leftTuple != null) {
leftTupleSets.addInsert(leftTuple);
Expand All @@ -660,6 +645,30 @@ public static boolean flushLeftTupleIfNecessary(InternalWorkingMemory wm, Segmen
return true;
}

private static PathMemory getPathMemoryToFlushForEagerEvaluation( SegmentMemory sm, LeftTuple leftTuple ) {
PathMemory pmem = sm.getFirstDataDrivenPathMemory();
if ( leftTuple == null && pmem != null && !pmem.isRuleLinked() ) {
// skip the first, we already know it isn't linked
pmem = getFirstLinkedPathMemory( sm, sm.getDataDrivenPathMemories(), 1 );
}
return pmem;
}

private static PathMemory getPathMemoryToFlushInStreamMode( SegmentMemory sm, LeftTuple leftTuple ) {
return leftTuple != null && leftTuple.getPropagationContext().isMarshalling() ?
sm.getPathMemories().get(0) : // marshalling requires flushing even if the pmem is unlinked
getFirstLinkedPathMemory( sm, sm.getPathMemories(), 0 );
}

private static PathMemory getFirstLinkedPathMemory( SegmentMemory sm, List<PathMemory> pmems, int start ) {
for (int i = start; i < pmems.size(); i++) {
if (pmems.get(i).isRuleLinked()) {
return pmems.get(i);
}
}
return null;
}

public static void forceFlushLeftTuple(PathMemory pmem, SegmentMemory sm, InternalWorkingMemory wm, TupleSets<LeftTuple> leftTupleSets) {
SegmentMemory[] smems = pmem.getSegmentMemories();

Expand Down
Expand Up @@ -35,6 +35,7 @@
public interface PropagationEntry {

void execute(InternalWorkingMemory wm);
void executeForMarshalling(InternalWorkingMemory wm);
void execute(InternalKnowledgeRuntime kruntime);

PropagationEntry getNext();
Expand Down Expand Up @@ -76,6 +77,11 @@ public boolean isCalledFromRHS() {
public void execute(InternalKnowledgeRuntime kruntime) {
execute( ((InternalWorkingMemoryEntryPoint) kruntime).getInternalWorkingMemory() );
}

@Override
public void executeForMarshalling(InternalWorkingMemory wm) {
execute( wm );
}
}

abstract class PropagationEntryWithResult<T> extends PropagationEntry.AbstractPropagationEntry {
Expand Down Expand Up @@ -122,7 +128,13 @@ public Insert( InternalFactHandle handle, PropagationContext context, InternalWo
this.insertionTime = isEvent ? workingMemory.getTimerService().getCurrentTime() : 0L;
}

public void execute(InternalWorkingMemory wm) {
@Override
public void executeForMarshalling( InternalWorkingMemory wm ) {
context.setMarshalling( true );
execute(wm);
}

public void execute( InternalWorkingMemory wm ) {
for ( ObjectTypeNode otn : objectTypeConf.getObjectTypeNodes() ) {
otn.propagateAssert( handle, context, wm );
if (isEvent) {
Expand Down
Expand Up @@ -108,7 +108,7 @@ public synchronized void flushNonMarshallable() {
}
newTail = entry;
} else {
entry.execute(workingMemory);
entry.executeForMarshalling(workingMemory);
}
}
head = newHead;
Expand Down
Expand Up @@ -84,4 +84,7 @@ public interface PropagationContext
void cleanReaderContext();

void setEntryPoint(EntryPointId entryPoint);

boolean isMarshalling();
void setMarshalling( boolean marshalling );
}
Expand Up @@ -77,10 +77,12 @@ public class RetePropagationContext

private ObjectType objectType;

// this field is only set for propagations happening during
// this field is only set for propagations happening during
// the deserialization of a session
private transient MarshallerReaderContext readerContext;

private transient boolean marshalling;

public RetePropagationContext() {

}
Expand Down Expand Up @@ -429,6 +431,13 @@ public MarshallerReaderContext getReaderContext() {
return this.readerContext;
}

public boolean isMarshalling() {
return marshalling;
}

public void setMarshalling( boolean marshalling ) {
this.marshalling = marshalling;
}

public static String intEnumToString(PropagationContext pctx) {
String pctxType = null;
Expand Down

0 comments on commit e6da071

Please sign in to comment.