Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #5023 race condition bug in wf engine #5029

Merged
merged 6 commits into from Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,51 @@
/*
* Copyright 2019 Rundeck, Inc. (http://rundeck.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtolabs.rundeck.core.rules;

/**
* Uses a mutable state and rule engine
*/
public interface StateWorkflowSystem
extends WorkflowSystem
{
/**
* state object
*/
MutableStateObj getState();

/**
* Rule engine
*/
RuleEngine getRuleEngine();

/**
* @return true if the state indicates the workflow should end
*/
boolean isWorkflowEndState();

/**
* listener
*/
WorkflowSystemEventListener getListener();

/**
* set listener
*
* @param listener
*/
void setListener(WorkflowSystemEventListener listener);
}
Expand Up @@ -17,7 +17,9 @@
* state, and any that can are queued to be executed. Workflow processing stops when no operations
* are currently running, no new state changes are available, and no pending operations can be run.
*/
public class WorkflowEngine implements WorkflowSystem {
public class WorkflowEngine
implements StateWorkflowSystem, WorkflowSystemEventHandler
{
static Logger logger = Logger.getLogger(WorkflowEngine.class.getName());
private final MutableStateObj state;
private final RuleEngine engine;
Expand Down Expand Up @@ -47,11 +49,13 @@ public WorkflowEngine(
manager = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
}

MutableStateObj getState() {
@Override
public MutableStateObj getState() {
return state;
}

RuleEngine getRuleEngine() {
@Override
public RuleEngine getRuleEngine() {
return engine;
}

Expand Down Expand Up @@ -86,6 +90,7 @@ TimeUnit unit() {
Set<OperationResult<DAT, RES, OP>> processOperations(final Set<OP> operations, final SharedData<DAT> sharedData) {

WorkflowEngineOperationsProcessor<DAT, RES, OP> processor = new WorkflowEngineOperationsProcessor<>(
this,
this,
operations,
sharedData,
Expand Down Expand Up @@ -130,25 +135,6 @@ Set<OperationResult<DAT, RES, OP>> processOperations(final Set<OP> operations, f
return results;
}

void eventLoopProgress(
final int origPendingCount,
final int skipcount,
final int toruncount,
final int pendingcount
)
{
event(
WorkflowSystemEventType.LoopProgress,
String.format(
"Pending(%d) => run(%d), skip(%d), remain(%d)",
origPendingCount,
toruncount - skipcount,
skipcount,
pendingcount
)
);
}

static class Event implements WorkflowSystemEvent {
private WorkflowSystemEventType eventType;
private String message;
Expand Down Expand Up @@ -211,40 +197,43 @@ public T getResult() {
};
}

void event(final WorkflowSystemEventType endOfChanges, final String message) {
event(endOfChanges, message, null);
@Override
public void event(final WorkflowSystemEventType eventType, final String message) {
event(eventType, message, null);
}

void event(final WorkflowSystemEventType eventType, final String message, final Object data) {
@Override
public void event(final WorkflowSystemEventType eventType, final String message, final Object data) {
event(Event.with(eventType, message, data));
}

private void event(final WorkflowSystemEvent event) {
@Override
public void event(final WorkflowSystemEvent event) {
if (null != listener) {
listener.onEvent(event);
}
}

protected boolean isWorkflowEndState(final MutableStateObj state) {
return state.hasState(Workflows.getWorkflowEndState());
@Override
public boolean isWorkflowEndState() {
return getState().hasState(Workflows.getWorkflowEndState());
}

@Override
public WorkflowSystemEventListener getListener() {
return listener;
}

@Override
public void setListener(WorkflowSystemEventListener listener) {
this.listener = listener;
}

@Override
public boolean isInterrupted() {
return interrupted;
}

public void setInterrupted(boolean interrupted) {
this.interrupted = interrupted;
}


static class WResult<D, T extends OperationCompleted<D>, X extends Operation<D, T>> implements
OperationResult<D, T, X>
Expand Down