Skip to content

Commit

Permalink
Fix concurrency in DefaultStateMachineExecutor
Browse files Browse the repository at this point in the history
- Change deferList from LinkedList to ConcurrentLinkedQueue
  to fix ConcurrentModificationException with parallel events.
- Fixes #736
  • Loading branch information
jvalkeal committed May 3, 2019
1 parent 55b7ff0 commit 98ba694
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
Expand Down Expand Up @@ -72,7 +71,7 @@ public class DefaultStateMachineExecutor<S, E> extends LifecycleObjectSupport im

private final Queue<Message<E>> eventQueue = new ConcurrentLinkedQueue<Message<E>>();

private final LinkedList<Message<E>> deferList = new LinkedList<Message<E>>();
private final Queue<Message<E>> deferList = new ConcurrentLinkedQueue<Message<E>>();

private final Queue<TriggerQueueItem> triggerQueue = new ConcurrentLinkedQueue<TriggerQueueItem>();

Expand Down Expand Up @@ -155,7 +154,7 @@ public void queueDeferredEvent(Message<E> message) {
if (log.isDebugEnabled()) {
log.debug("Deferring message " + message);
}
deferList.addLast(message);
deferList.add(message);
}

@Override
Expand Down Expand Up @@ -480,7 +479,7 @@ private synchronized boolean processDeferList() {
if (log.isDebugEnabled()) {
log.debug("Process defer list, size=" + deferList.size());
}
ListIterator<Message<E>> iterator = deferList.listIterator();
Iterator<Message<E>> iterator = deferList.iterator();
State<S,E> currentState = stateMachine.getState();
while (iterator.hasNext()) {
Message<E> event = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,14 +15,17 @@
*/
package org.springframework.statemachine;

import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import java.util.List;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
Expand Down Expand Up @@ -56,7 +59,7 @@ public void testDeferWithFlat() throws Exception {
machine.sendEvent("E3");
machine.sendEvent("E1");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(1));
machine.sendEvent("E2");
assertThat(readField.size(), is(2));
Expand All @@ -81,12 +84,60 @@ public void testDeferWithFlatThreadExecutor() throws Exception {
machine.sendEvent("E1");
machine.sendEvent("E1");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(2));
machine.sendEvent("E2");
assertThat(readField.size(), is(3));
}

@Test
public void testDeferSmokeExecutorConcurrentModification() throws Exception {
context.register(Config5.class);
context.refresh();

@SuppressWarnings("unchecked")
StateMachine<String, String> machine = context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, StateMachine.class);
machine.start();
assertThat(machine.getState().getIds(), contains("S1"));

machine.sendEvent("E2");

Object executor = TestUtils.readField("stateMachineExecutor", machine);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(1));

AtomicReference<Exception> error = new AtomicReference<>();
AtomicInteger i1 = new AtomicInteger();
Thread t1 = new Thread(() -> {
while(i1.incrementAndGet() < 1000) {
try {
machine.sendEvent("E1");
machine.sendEvent("E2");
} catch (Exception e) {
error.set(e);
break;
}
}
});
AtomicInteger i2 = new AtomicInteger();
Thread t2 = new Thread(() -> {
while(i2.incrementAndGet() < 1000) {
try {
machine.sendEvent("E1");
machine.sendEvent("E2");
} catch (Exception e) {
error.set(e);
break;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
assertThat(error.get(), nullValue());
}

@Test
public void testDeferWithSubsSyncExecutor() throws Exception {
context.register(Config1.class);
Expand All @@ -108,7 +159,7 @@ public void testDeferWithSubsSyncExecutor() throws Exception {
machine.sendEvent("E1");

Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(1));

listener.reset(0, 0, 2, 0);
Expand Down Expand Up @@ -142,7 +193,7 @@ public void testDeferWithSubsThreadExecutor() throws Exception {
machine.sendEvent("E1");

Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(2));

listener.reset(0, 0, 3, 0);
Expand Down Expand Up @@ -203,7 +254,7 @@ public void testSubNotDeferOverrideSuperTransition() throws Exception {
// sub doesn't defer
machine.sendEvent("E15");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(0));

assertThat(machine.getState().getIds(), contains("SUB5"));
Expand All @@ -229,7 +280,7 @@ public void testSubDeferOverrideSuperTransition() throws Exception {
// sub defers
machine.sendEvent("E15");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(1));

assertThat(machine.getState().getIds(), contains("SUB1", "SUB12"));
Expand Down Expand Up @@ -260,7 +311,7 @@ public void testRegionOneDeferTransition() throws Exception {
// regions defers
machine.sendEvent("E3");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(0));
}

Expand All @@ -287,7 +338,7 @@ public void testRegionAllDeferTransition() throws Exception {
// regions defers
machine.sendEvent("E3");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(1));
}

Expand All @@ -308,7 +359,7 @@ public void testRegionNotDeferTransition() throws Exception {
// regions doesn't defer
machine.sendEvent("E3");
Object executor = TestUtils.readField("stateMachineExecutor", machine);
List<?> readField = TestUtils.readField("deferList", executor);
Collection<?> readField = TestUtils.readField("deferList", executor);
assertThat(readField.size(), is(0));

assertThat(machine.getState().getIds(), contains("SUB2"));
Expand Down Expand Up @@ -542,6 +593,32 @@ public void configure(StateMachineTransitionConfigurer<String, String> transitio

}

@Configuration
@EnableStateMachine
static class Config5 extends StateMachineConfigurerAdapter<String, String> {

@Override
public void configure(StateMachineStateConfigurer<String, String> states) throws Exception {
states
.withStates()
.initial("S1")
.state("S1", "E2")
.state("S2");
}

@Override
public void configure(StateMachineTransitionConfigurer<String, String> transitions) throws Exception {
transitions
.withExternal()
.source("S1").target("S2")
.event("E1")
.and()
.withExternal()
.source("S2").target("S1")
.event("E2");
}
}

@Configuration
static class ExecutorConfig {

Expand Down

0 comments on commit 98ba694

Please sign in to comment.