Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Action is not parallel #336

Open
924060929 opened this issue Mar 23, 2017 · 4 comments
Open

Action is not parallel #336

924060929 opened this issue Mar 23, 2017 · 4 comments

Comments

@924060929
Copy link

924060929 commented Mar 23, 2017

Version: 1.2.3.RELEASE

The test code:

public static void main(String[] args) throws Throwable {
        StateMachineBuilder.Builder<String, String> builder = StateMachineBuilder.builder();

        StaticListableBeanFactory beanFactory = new StaticListableBeanFactory();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(30);
        executor.setKeepAliveSeconds(30000);
        executor.setMaxPoolSize(30);
        executor.setQueueCapacity(3000);
        executor.initialize();

        builder.configureConfiguration()
            .withConfiguration()
                .beanFactory(beanFactory)
                .taskExecutor(executor)
                .taskScheduler(new ConcurrentTaskScheduler())
                .listener(new StateMachineListenerAdapter<String, String>() {
                    @Override
                    public void stateEntered(State<String, String> state) {
                        String id = state == null ? null : state.getId();
                        System.out.println("entered " + id);
                    }

                    @Override
                    public void stateExited(State<String, String> state) {
                        String id = state == null ? null : state.getId();
                        System.out.println("exited " + id);
                    }
                });

        StateMachineStateConfigurer<String, String> smsConfigurer = builder.configureStates();
        smsConfigurer.withStates()
            .initial("READY")
            .fork("FORK")
            .state("TASKS")
            .join("JOIN")
            .choice("CHOICE")
            .state("ERROR")
            .and()
            .withStates()
            .parent("TASKS")
            .initial("T1")
            .end("T1E")
            .and()
            .withStates()
            .parent("TASKS")
            .initial("T2")
            .end("T2E");


        StateMachineTransitionConfigurer<String, String> smtConfigurer = builder.configureTransitions();
        smtConfigurer.withExternal()
            .source("READY").target("FORK")
            .and()
            .withFork()
            .source("FORK").target("TASKS")
            .and()
            .withJoin()
            .source("TASKS").target("JOIN")
            .and()
            .withExternal()
            .source("T1").target("T1E")
            .action(getAction())
            .and()
            .withExternal()
            .source("T2").target("T2E")
            .action(getAction())
            .and()
            .withExternal()
            .source("JOIN").target("CHOICE")
            .and()
            .withChoice()
            .source("CHOICE")
            .first("ERROR", c -> true)
            .last("READY");

        StateMachine<String, String> stateMachine = builder.build();

        stateMachine.start();
    }

    public static Action<String, String> getAction() {
        return c -> {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>..");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                ;
            }
            System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<..");
        };
    }

The Console log:

entered READY
exited READY
entered TASKS
entered T1
>>>>>>>>>>>>>>>>>>>>>>>>>>..
entered T2
<<<<<<<<<<<<<<<<<<<<<<<<<<..
exited T1
entered T1E
>>>>>>>>>>>>>>>>>>>>>>>>>>..
<<<<<<<<<<<<<<<<<<<<<<<<<<..
exited T2
entered T2E
exited TASKS
entered ERROR

If parallel, the log would like this:

>>>>>>>>>>>>>>>>>>>>>>>>>>..
>>>>>>>>>>>>>>>>>>>>>>>>>>..
<<<<<<<<<<<<<<<<<<<<<<<<<<..
<<<<<<<<<<<<<<<<<<<<<<<<<<..
@jvalkeal
Copy link
Contributor

Give me some time to play with this sample. Clearly something is happening parallel as otherwise T2 would not be entered before action with T1 is fully executed. But true, I'd also expect a little different logging from actions.

@924060929
Copy link
Author

924060929 commented Mar 26, 2017

@jvalkeal
Today, I debug the source code and found this:
org.springframework.statemachine.config.AbstractStateMachineFactory

private Collection<TransitionData<S, E>> resolveTransitionData(Collection<TransitionData<S, E>> in, Collection<StateData<S, E>> stateDatas) {
    ArrayList<TransitionData<S, E>> out = new ArrayList<TransitionData<S,E>>();

    Collection<Object> states = new ArrayList<Object>();
    for (StateData<S, E> stateData : stateDatas) {
        states.add(stateData.getParent());
    }

    for (TransitionData<S, E> transitionData : in) {
        S state = transitionData.getState(); // core code
        if (state != null && states.contains(state)) { // core code
            out.add(transitionData);
        }
    }

    return out;
}

So I added state() after withExternal(), now code as follow:

smtConfigurer
    .withExternal()
        .source("READY").target("FORK")
        .and()
    .withFork()
        .source("FORK").target("TASKS")
        .and()
    .withJoin()
        .source("TASKS").target("JOIN")
        .and()
    .withExternal()
        .state("TASKS") // add this line to set parent state
        .source("T1").target("T1E")
        .action(getAction())
        .and()
    .withExternal()
        .state("TASKS") // add this line to set parent state
        .source("T2").target("T2E")
        .action(getAction())
        .and()
    .withExternal()
        .source("JOIN").target("CHOICE")
        .and()
    .withChoice()
        .source("CHOICE")
        .first("ERROR", c -> true)
        .last("READY");

Finally, console log changed:

entered READY
exited READY
entered TASKS
entered T2
entered T1
>>>>>>>>>>>>>>>>>>>>>>>>>>..
>>>>>>>>>>>>>>>>>>>>>>>>>>..
<<<<<<<<<<<<<<<<<<<<<<<<<<..
<<<<<<<<<<<<<<<<<<<<<<<<<<..
exited T2
exited T1
entered T2E
entered T1E
exited TASKS
entered SUCCESS

Cheers, Action become pallallel!
I suggest you add notice to reference otherwise somebody would meet trouble like me~~

@jvalkeal
Copy link
Contributor

I ran the code and indeed it's just like you mentioned. I need to add few tests to better understand this specific behaviour. Thing is that you can add transitions which are potentially triggered by same event in every machine in a s state hierarchy. If deepest submachine state don't accept event then it's checked from parent, etc.

Things here still goes via separate executors in a different submachines but things seem to work differently because you machine is not driven by events and everything happens automatically via anonymous transitions initiated from initial states. All this start to happen when you start a root machine.

I still don't how this behaves so I'll try to something about it.

@924060929
Copy link
Author

924060929 commented Mar 28, 2017

According to resolveTransitionData method, if not invoke state("TASKS") to set parent state is TASKS, the transition will mark as root statechine's transition, after child statemachine start executor and enter initial state, it can not find this statemachine's transition, so this child executor will do nothing and return.

But Root statemachine have all transition, after root executor start child executor, it continue to next loop as follow code, and get next transition(should be child transition), so the action will be executed by root statemachine synchronous.
org.springframework.statemachine.support.DefaultStateMachineExecutor.processTriggerQueue()

if (stateMachine.getState() != null) {
	// loop triggerless transitions here so that
	// all "chained" transitions will get queue message
	boolean transit = false;
	do {
		transit = handleTriggerTrans(triggerlessTransitions, queuedMessage);
	} while (transit);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants