Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sequence and pattern issue with every and within #436

Merged
merged 1 commit into from
Jul 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ protected void process(StateEvent stateEvent, ComplexEventChunk complexEventChun
break;
case OR:
super.process(stateEvent, complexEventChunk);
if (partnerPostStateProcessor.nextProcessor != null && thisStatePreProcessor.thisLastProcessor ==
partnerPostStateProcessor) {
// 'from A or B select' scenario require this
partnerPostStateProcessor.isEventReturned = true;
}
break;
case NOT:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,30 @@ public PreStateProcessor cloneProcessor(String key) {

@Override
public void addState(StateEvent stateEvent) {
if (newAndEveryStateEventList.isEmpty()) {
if (isStartState || stateType == StateInputStream.Type.SEQUENCE) {
if (newAndEveryStateEventList.isEmpty()) {
newAndEveryStateEventList.add(stateEvent);
}
if (partnerStatePreProcessor != null && partnerStatePreProcessor.newAndEveryStateEventList.isEmpty()) {
partnerStatePreProcessor.newAndEveryStateEventList.add(stateEvent);
}
} else {
newAndEveryStateEventList.add(stateEvent);
if (partnerStatePreProcessor != null) {
partnerStatePreProcessor.newAndEveryStateEventList.add(stateEvent);
}
}
if (partnerStatePreProcessor != null && partnerStatePreProcessor.newAndEveryStateEventList.isEmpty()) {
partnerStatePreProcessor.newAndEveryStateEventList.add(stateEvent);
}

}

@Override
public void addEveryState(StateEvent stateEvent) {
newAndEveryStateEventList.add(stateEventCloner.copyStateEvent(stateEvent));
StateEvent clonedEvent = stateEventCloner.copyStateEvent(stateEvent);
clonedEvent.setEvent(stateId, null);
newAndEveryStateEventList.add(clonedEvent);
if (partnerStatePreProcessor != null) {
partnerStatePreProcessor.newAndEveryStateEventList.add(clonedEvent);
}
}

public void setStartState(boolean isStartState) {
Expand All @@ -82,13 +95,21 @@ public void setStartState(boolean isStartState) {

@Override
public void resetState() {
pendingStateEventList.clear();
partnerStatePreProcessor.pendingStateEventList.clear();

if (isStartState && newAndEveryStateEventList.isEmpty()) {
// if (isStartState && stateType == StateInputStream.Type.SEQUENCE && newAndEveryStateEventList
// .isEmpty()) {
init();
if (logicalType == LogicalStateElement.Type.OR || pendingStateEventList.size() ==
partnerStatePreProcessor.pendingStateEventList.size()) {
pendingStateEventList.clear();
partnerStatePreProcessor.pendingStateEventList.clear();

if (isStartState && newAndEveryStateEventList.isEmpty()) {
// if (isStartState && stateType == StateInputStream.Type.SEQUENCE && newAndEveryStateEventList
Copy link

@ruwanta ruwanta Sep 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better remove this commented line.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
Thanks for pointing this. Please merger the PR #566

Thanks

// .isEmpty()) {
if (stateType == StateInputStream.Type.SEQUENCE && thisStatePostProcessor.nextEveryStatePerProcessor ==
null && !((StreamPreStateProcessor) thisStatePostProcessor.nextStatePerProcessor)
.pendingStateEventList.isEmpty()) {
return;
}
init();
}
}
}

Expand All @@ -108,6 +129,12 @@ public ComplexEventChunk<StateEvent> processAndReturn(ComplexEventChunk complexE
StreamEvent streamEvent = (StreamEvent) complexEventChunk.next(); //Sure only one will be sent
for (Iterator<StateEvent> iterator = pendingStateEventList.iterator(); iterator.hasNext(); ) {
StateEvent stateEvent = iterator.next();
if (withinStates.size() > 0) {
if (isExpired(stateEvent, streamEvent)) {
iterator.remove();
continue;
}
}
if (logicalType == LogicalStateElement.Type.OR && stateEvent.getStreamEvent(partnerStatePreProcessor
.getStateId()) != null) {
iterator.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class StreamPreStateProcessor implements PreStateProcessor, Snapshotable
protected StateEventCloner stateEventCloner;
protected StreamEventPool streamEventPool;
protected String queryName;
private boolean initialized;

public StreamPreStateProcessor(StateInputStream.Type stateType, List<Map.Entry<Long, Set<Integer>>> withinStates) {
this.stateType = stateType;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void process(ComplexEventChunk complexEventChunk) {
"processAndReturn method is used for handling event chunks.");
}

private boolean isExpired(StateEvent pendingStateEvent, StreamEvent incomingStreamEvent) {
protected boolean isExpired(StateEvent pendingStateEvent, StreamEvent incomingStreamEvent) {
for (Map.Entry<Long, Set<Integer>> withinEntry : withinStates) {
for (Integer withinStateId : withinEntry.getValue()) {
if (withinStateId == SiddhiConstants.ANY) {
Expand Down Expand Up @@ -162,9 +163,10 @@ public void setToLast(Processor processor) {
}

public void init() {
if (isStartState) {
if (isStartState && (!initialized || this.thisStatePostProcessor.nextEveryStatePerProcessor != null)) {
StateEvent stateEvent = stateEventPool.borrowEvent();
addState(stateEvent);
initialized = true;
}
}

Expand Down Expand Up @@ -250,6 +252,11 @@ public void resetState() {
if (isStartState && newAndEveryStateEventList.isEmpty()) {
// if (isStartState && stateType == StateInputStream.Type.SEQUENCE && newAndEveryStateEventList
// .isEmpty()) {
if (stateType == StateInputStream.Type.SEQUENCE && thisStatePostProcessor.nextEveryStatePerProcessor ==
null && !((StreamPreStateProcessor) thisStatePostProcessor.nextStatePerProcessor)
.pendingStateEventList.isEmpty()) {
return;
}
init();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,8 @@ private static InnerStateRuntime parse(StateElement stateElement, Map<String, Ab
for (SingleStreamRuntime singleStreamRuntime : innerStateRuntime.getSingleStreamRuntimeList()) {
everyInnerStateRuntime.addStreamRuntime(singleStreamRuntime);
}
if (stateType == StateInputStream.Type.PATTERN) {
everyInnerStateRuntime.getLastProcessor().setNextEveryStatePerProcessor(everyInnerStateRuntime
everyInnerStateRuntime.getLastProcessor().setNextEveryStatePerProcessor(everyInnerStateRuntime
.getFirstProcessor());
}
return everyInnerStateRuntime;

} else if (stateElement instanceof LogicalStateElement) {
Expand All @@ -262,12 +260,12 @@ private static InnerStateRuntime parse(StateElement stateElement, Map<String, Ab
}

LogicalPreStateProcessor logicalPreStateProcessor1 = new LogicalPreStateProcessor(type, stateType,
withinStates);
clonewithinStates(withinStates));
logicalPreStateProcessor1.init(siddhiAppContext, queryName);
LogicalPostStateProcessor logicalPostStateProcessor1 = new LogicalPostStateProcessor(type);

LogicalPreStateProcessor logicalPreStateProcessor2 = new LogicalPreStateProcessor(type, stateType,
withinStates);
clonewithinStates(withinStates));
logicalPreStateProcessor2.init(siddhiAppContext, queryName);
LogicalPostStateProcessor logicalPostStateProcessor2 = new LogicalPostStateProcessor(type);

Expand Down
Loading