From 98ba6949ff3b5aad04941b86613843d062f21bdc Mon Sep 17 00:00:00 2001 From: Janne Valkealahti Date: Fri, 3 May 2019 14:14:14 +0100 Subject: [PATCH] Fix concurrency in DefaultStateMachineExecutor - Change deferList from LinkedList to ConcurrentLinkedQueue to fix ConcurrentModificationException with parallel events. - Fixes #736 --- .../support/DefaultStateMachineExecutor.java | 9 +- .../statemachine/EventDeferTests.java | 99 ++++++++++++++++--- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/DefaultStateMachineExecutor.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/DefaultStateMachineExecutor.java index d6bb055e5..d0ab835e0 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/DefaultStateMachineExecutor.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/DefaultStateMachineExecutor.java @@ -20,9 +20,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; @@ -72,7 +71,7 @@ public class DefaultStateMachineExecutor extends LifecycleObjectSupport im private final Queue> eventQueue = new ConcurrentLinkedQueue>(); - private final LinkedList> deferList = new LinkedList>(); + private final Queue> deferList = new ConcurrentLinkedQueue>(); private final Queue triggerQueue = new ConcurrentLinkedQueue(); @@ -155,7 +154,7 @@ public void queueDeferredEvent(Message message) { if (log.isDebugEnabled()) { log.debug("Deferring message " + message); } - deferList.addLast(message); + deferList.add(message); } @Override @@ -480,7 +479,7 @@ private synchronized boolean processDeferList() { if (log.isDebugEnabled()) { log.debug("Process defer list, size=" + deferList.size()); } - ListIterator> iterator = deferList.listIterator(); + Iterator> iterator = deferList.iterator(); State currentState = stateMachine.getState(); while (iterator.hasNext()) { Message event = iterator.next(); diff --git a/spring-statemachine-core/src/test/java/org/springframework/statemachine/EventDeferTests.java b/spring-statemachine-core/src/test/java/org/springframework/statemachine/EventDeferTests.java index 94e25a7c6..89988bd8e 100644 --- a/spring-statemachine-core/src/test/java/org/springframework/statemachine/EventDeferTests.java +++ b/spring-statemachine-core/src/test/java/org/springframework/statemachine/EventDeferTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,14 +15,17 @@ */ package org.springframework.statemachine; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import java.util.List; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -56,7 +59,7 @@ public void testDeferWithFlat() throws Exception { machine.sendEvent("E3"); machine.sendEvent("E1"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(1)); machine.sendEvent("E2"); assertThat(readField.size(), is(2)); @@ -81,12 +84,60 @@ public void testDeferWithFlatThreadExecutor() throws Exception { machine.sendEvent("E1"); machine.sendEvent("E1"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(2)); machine.sendEvent("E2"); assertThat(readField.size(), is(3)); } + @Test + public void testDeferSmokeExecutorConcurrentModification() throws Exception { + context.register(Config5.class); + context.refresh(); + + @SuppressWarnings("unchecked") + StateMachine machine = context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, StateMachine.class); + machine.start(); + assertThat(machine.getState().getIds(), contains("S1")); + + machine.sendEvent("E2"); + + Object executor = TestUtils.readField("stateMachineExecutor", machine); + Collection readField = TestUtils.readField("deferList", executor); + assertThat(readField.size(), is(1)); + + AtomicReference error = new AtomicReference<>(); + AtomicInteger i1 = new AtomicInteger(); + Thread t1 = new Thread(() -> { + while(i1.incrementAndGet() < 1000) { + try { + machine.sendEvent("E1"); + machine.sendEvent("E2"); + } catch (Exception e) { + error.set(e); + break; + } + } + }); + AtomicInteger i2 = new AtomicInteger(); + Thread t2 = new Thread(() -> { + while(i2.incrementAndGet() < 1000) { + try { + machine.sendEvent("E1"); + machine.sendEvent("E2"); + } catch (Exception e) { + error.set(e); + break; + } + } + }); + t1.start(); + t2.start(); + t1.join(); + t2.join(); + assertThat(error.get(), nullValue()); + } + @Test public void testDeferWithSubsSyncExecutor() throws Exception { context.register(Config1.class); @@ -108,7 +159,7 @@ public void testDeferWithSubsSyncExecutor() throws Exception { machine.sendEvent("E1"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(1)); listener.reset(0, 0, 2, 0); @@ -142,7 +193,7 @@ public void testDeferWithSubsThreadExecutor() throws Exception { machine.sendEvent("E1"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(2)); listener.reset(0, 0, 3, 0); @@ -203,7 +254,7 @@ public void testSubNotDeferOverrideSuperTransition() throws Exception { // sub doesn't defer machine.sendEvent("E15"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(0)); assertThat(machine.getState().getIds(), contains("SUB5")); @@ -229,7 +280,7 @@ public void testSubDeferOverrideSuperTransition() throws Exception { // sub defers machine.sendEvent("E15"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(1)); assertThat(machine.getState().getIds(), contains("SUB1", "SUB12")); @@ -260,7 +311,7 @@ public void testRegionOneDeferTransition() throws Exception { // regions defers machine.sendEvent("E3"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(0)); } @@ -287,7 +338,7 @@ public void testRegionAllDeferTransition() throws Exception { // regions defers machine.sendEvent("E3"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(1)); } @@ -308,7 +359,7 @@ public void testRegionNotDeferTransition() throws Exception { // regions doesn't defer machine.sendEvent("E3"); Object executor = TestUtils.readField("stateMachineExecutor", machine); - List readField = TestUtils.readField("deferList", executor); + Collection readField = TestUtils.readField("deferList", executor); assertThat(readField.size(), is(0)); assertThat(machine.getState().getIds(), contains("SUB2")); @@ -542,6 +593,32 @@ public void configure(StateMachineTransitionConfigurer transitio } + @Configuration + @EnableStateMachine + static class Config5 extends StateMachineConfigurerAdapter { + + @Override + public void configure(StateMachineStateConfigurer states) throws Exception { + states + .withStates() + .initial("S1") + .state("S1", "E2") + .state("S2"); + } + + @Override + public void configure(StateMachineTransitionConfigurer transitions) throws Exception { + transitions + .withExternal() + .source("S1").target("S2") + .event("E1") + .and() + .withExternal() + .source("S2").target("S1") + .event("E2"); + } + } + @Configuration static class ExecutorConfig {