diff --git a/ProcessQueue.mpr b/ProcessQueue.mpr index 5a05b39..861562c 100644 Binary files a/ProcessQueue.mpr and b/ProcessQueue.mpr differ diff --git a/README.md b/README.md index fcd6641..0cdc383 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # ProcessQueue This module enables you to control the amount of these microflows that are executed at once by assigning them to queues. Each of these queues can be configured to handle a subset of these microflows and you can also set a limit to the number of microflows each queue can execute at once. This allows you to control the maximum load put on your application during peak usage by these microflows while still ensuring all microflows will be executed eventually. The queues use a FIFO approach (first-in, first-out) and will automatically restart themselves (and any microflows still left to execute) after a server restart. - ## Description This module enables you to control the load on your application by configuring different Queues. The amount of parallel processes and the number of queues can be controlled from the runtime and you’ll be able to see the progress real-time in your application. Typical usage scenario @@ -15,7 +14,6 @@ Typical usage scenario ## Dependencies - Mx Reflection Model module - ## Configuration After importing the module, you should connect the “QueueOverview” form to your application. This is the starting place for defining the different queues and processes. Add the microflow "ASu_InitialiseQueue" as a startup event to instantiate the queue. Before configuring the queue you need to synchronize the Mx Model Reflection module, make sure you sync the "ProcessQueue" as well. @@ -30,7 +28,9 @@ In the folder: “Example / Test” you will found an example how to queue an ac ## Other The constant: "FinishedQueuedActionsCleanupInDays" can be used to automatically clean up finished queued actions (through the scheduled event SE_CleanupFinishedQueuedActions): -Negative value = disabled.0 = clear all finished actions1 or more = clear all finished actions that completed [1 or more] days ago. +- Negative value = disabled. +- 0 = clear all finished actions +- 1 or more = clear all finished actions that completed [1 or more] days ago. Please note that (when dealing a large amount of actions in a short period of time): -> Create QueuedAction (no commit) -> add to list -> append to queue -> commit list of queued actions @@ -38,7 +38,3 @@ is inferior to: -> Create QueuedAction (commit) -> append to queue. However both constructions should work now (tested batch sizes up to 10000 objects in size). - -### Automatic retry behavior: - -The module will keep retrying exponentially for up to 11 retries. Initial retry will have a delay of 1 second (2^0), the second retry will take (2^1) seconds and the nth retry (2^n-1) seconds for a maximum of 2^10 = 1024 seconds for a combined total of 2047 seconds (=34 minutes) which is excessive but finite on purpose. In tests even adding 10000 actions at once (batch commit) will only take 5 retries (and only the first action is affected). This is basically the time it takes the microflow doing the batch commit (of queued actions) to complete. diff --git a/Releases/ProcessQueueMX601-V3.2.mpk b/Releases/ProcessQueueMX601-V3.2.mpk new file mode 100644 index 0000000..7922c92 Binary files /dev/null and b/Releases/ProcessQueueMX601-V3.2.mpk differ diff --git a/Releases/ProcessQueueMX710-V4.1.mpk b/Releases/ProcessQueueMX710-V4.1.mpk new file mode 100644 index 0000000..6a0d51d Binary files /dev/null and b/Releases/ProcessQueueMX710-V4.1.mpk differ diff --git a/Releases/ProcessQueueMx710-V4.1.1.mpk b/Releases/ProcessQueueMx710-V4.1.1.mpk new file mode 100644 index 0000000..2256ea7 Binary files /dev/null and b/Releases/ProcessQueueMx710-V4.1.1.mpk differ diff --git a/javasource/processqueue/actions/AppendNewActionToQueue.java b/javasource/processqueue/actions/AppendNewActionToQueue.java index ad69bd5..1ff95c4 100644 --- a/javasource/processqueue/actions/AppendNewActionToQueue.java +++ b/javasource/processqueue/actions/AppendNewActionToQueue.java @@ -1,97 +1,87 @@ -// This file was generated by Mendix Modeler. -// -// WARNING: Only the following code will be retained when actions are regenerated: -// - the import list -// - the code between BEGIN USER CODE and END USER CODE -// - the code between BEGIN EXTRA CODE and END EXTRA CODE -// Other code you write will be lost the next time you deploy the project. -// Special characters, e.g., é, ö, à, etc. are supported in comments. - -package processqueue.actions; - -import com.mendix.core.Core; -import com.mendix.core.CoreException; -import com.mendix.systemwideinterfaces.core.IContext; -import com.mendix.systemwideinterfaces.core.IMendixIdentifier; -import com.mendix.systemwideinterfaces.core.IMendixObject; -import com.mendix.webui.CustomJavaAction; -import processqueue.proxies.QueuedAction; -import processqueue.queuehandler.QueueHandler; - -/** - * Append the new action to the Queue, based on the configured process the action will be appended to the correct Queue. - * The action should be initialized with its default values, you should only change the RefrenceText for your own reference. Optionally you can set the association to the Process entity. - * - * Either the association between action and Process must be filled or you should provide an input parameter. - * Of course the input parameter is much faster since the Java doesn't need to retrieve anything from the database. - */ -public class AppendNewActionToQueue extends CustomJavaAction -{ - private IMendixObject __ActionToQueue; - private processqueue.proxies.QueuedAction ActionToQueue; - private IMendixObject __AddActionToProcess; - private processqueue.proxies.Process AddActionToProcess; - - public AppendNewActionToQueue(IContext context, IMendixObject ActionToQueue, IMendixObject AddActionToProcess) - { - super(context); - this.__ActionToQueue = ActionToQueue; - this.__AddActionToProcess = AddActionToProcess; - } - - @Override - public java.lang.Boolean executeAction() throws Exception - { - this.ActionToQueue = __ActionToQueue == null ? null : processqueue.proxies.QueuedAction.initialize(getContext(), __ActionToQueue); - - this.AddActionToProcess = __AddActionToProcess == null ? null : processqueue.proxies.Process.initialize(getContext(), __AddActionToProcess); - - // BEGIN USER CODE - IContext context = this.getContext().getSession().createContext().createSudoClone(); - - - String calling_microflow_name = ""; - try { - if( this.getContext().getActionStack() != null && this.getContext().getActionStack().size() > 0 ) - calling_microflow_name = this.getContext().getActionStack().get(0).getActionName(); - } - catch ( Exception e ) { - Core.getLogger(this.toString()).debug("Unable to get action stack, continueing"); - } - - IMendixObject process = this.__AddActionToProcess; - IMendixIdentifier processId = this.__ActionToQueue.getValue(context, QueuedAction.MemberNames.QueuedAction_Process.toString()); - - if( process == null ) { - if( processId != null ) { - process = Core.retrieveId(context, processId); - } - else - throw new CoreException("No process specified for Queued object: " + this.ActionToQueue.getActionNumber(context) ); - } - else if( processId == null ) { - this.__ActionToQueue.setValue(context, QueuedAction.MemberNames.QueuedAction_Process.toString(), this.__AddActionToProcess.getId()); - } - - //Make sure we commit the latest info about the action before passing it along - if( this.__ActionToQueue.isNew() || this.__ActionToQueue.isChanged() ) - Core.commit(getContext(), this.__ActionToQueue); - - QueueHandler.getQueueHandler().addActionToQueue(context, this.__ActionToQueue, process, false, calling_microflow_name); - - return true; - // END USER CODE - } - - /** - * Returns a string representation of this action - */ - @Override - public java.lang.String toString() - { - return "AppendNewActionToQueue"; - } - - // BEGIN EXTRA CODE - // END EXTRA CODE -} +// This file was generated by Mendix Modeler. +// +// WARNING: Only the following code will be retained when actions are regenerated: +// - the import list +// - the code between BEGIN USER CODE and END USER CODE +// - the code between BEGIN EXTRA CODE and END EXTRA CODE +// Other code you write will be lost the next time you deploy the project. +// Special characters, e.g., é, ö, à, etc. are supported in comments. + +package processqueue.actions; + +import com.mendix.core.Core; +import com.mendix.core.CoreException; +import com.mendix.systemwideinterfaces.core.IContext; +import com.mendix.systemwideinterfaces.core.IMendixIdentifier; +import com.mendix.systemwideinterfaces.core.IMendixObject; +import com.mendix.webui.CustomJavaAction; +import processqueue.proxies.QueuedAction; +import processqueue.queuehandler.QueueHandler; + +/** + * Append the new action to the Queue, based on the configured process the action will be appended to the correct Queue. + * The action should be initialized with its default values, you should only change the RefrenceText for your own reference. Optionally you can set the association to the Process entity. + * + * Either the association between action and Process must be filled or you should provide an input parameter. + * Of course the input parameter is much faster since the Java doesn't need to retrieve anything from the database. + */ +public class AppendNewActionToQueue extends CustomJavaAction +{ + private IMendixObject __ActionToQueue; + private processqueue.proxies.QueuedAction ActionToQueue; + private IMendixObject __AddActionToProcess; + private processqueue.proxies.Process AddActionToProcess; + + public AppendNewActionToQueue(IContext context, IMendixObject ActionToQueue, IMendixObject AddActionToProcess) + { + super(context); + this.__ActionToQueue = ActionToQueue; + this.__AddActionToProcess = AddActionToProcess; + } + + @Override + public java.lang.Boolean executeAction() throws Exception + { + this.ActionToQueue = __ActionToQueue == null ? null : processqueue.proxies.QueuedAction.initialize(getContext(), __ActionToQueue); + + this.AddActionToProcess = __AddActionToProcess == null ? null : processqueue.proxies.Process.initialize(getContext(), __AddActionToProcess); + + // BEGIN USER CODE + IContext context = Core.createSystemContext(); + + IMendixObject process = this.__AddActionToProcess; + IMendixIdentifier processId = this.__ActionToQueue.getValue(context, QueuedAction.MemberNames.QueuedAction_Process.toString()); + + if( process == null ) { + if( processId != null ) { + process = Core.retrieveId(context, processId); + } + else + throw new CoreException("No process specified for Queued object: " + this.ActionToQueue.getActionNumber(context) ); + } + else if( processId == null ) { + this.__ActionToQueue.setValue(context, QueuedAction.MemberNames.QueuedAction_Process.toString(), this.__AddActionToProcess.getId()); + } + + //Make sure we commit the latest info about the action before passing it along + if( this.__ActionToQueue.isNew() || this.__ActionToQueue.isChanged() ) + Core.commit(getContext(), this.__ActionToQueue); + + QueueHandler.getQueueHandler().appendActionForProcessing(context, this.__ActionToQueue, process, false); + + return true; + // END USER CODE + } + + /** + * Returns a string representation of this action + */ + @Override + public java.lang.String toString() + { + return "AppendNewActionToQueue"; + } + + // BEGIN EXTRA CODE + // END EXTRA CODE +} diff --git a/javasource/processqueue/actions/InitializeQueue.java b/javasource/processqueue/actions/InitializeQueue.java index 1efb152..87bf98d 100644 --- a/javasource/processqueue/actions/InitializeQueue.java +++ b/javasource/processqueue/actions/InitializeQueue.java @@ -1,51 +1,51 @@ -// This file was generated by Mendix Modeler. -// -// WARNING: Only the following code will be retained when actions are regenerated: -// - the import list -// - the code between BEGIN USER CODE and END USER CODE -// - the code between BEGIN EXTRA CODE and END EXTRA CODE -// Other code you write will be lost the next time you deploy the project. -// Special characters, e.g., é, ö, à, etc. are supported in comments. - -package processqueue.actions; - -import com.mendix.systemwideinterfaces.core.IMendixObject; -import processqueue.queuehandler.QueueHandler; -import com.mendix.systemwideinterfaces.core.IContext; -import com.mendix.webui.CustomJavaAction; - -/** - * Initialize the Queue using all configuration options specified in the entity. - * This prepares the Queue so actions can be added. - */ -public class InitializeQueue extends CustomJavaAction -{ - private IMendixObject QueConfig; - - public InitializeQueue(IContext context, IMendixObject QueConfig) - { - super(context); - this.QueConfig = QueConfig; - } - - @Override - public java.lang.Boolean executeAction() throws Exception - { - // BEGIN USER CODE - QueueHandler.getQueueHandler().initializeQueue(getContext(), this.QueConfig); - return true; - // END USER CODE - } - - /** - * Returns a string representation of this action - */ - @Override - public java.lang.String toString() - { - return "InitializeQueue"; - } - - // BEGIN EXTRA CODE - // END EXTRA CODE -} +// This file was generated by Mendix Modeler. +// +// WARNING: Only the following code will be retained when actions are regenerated: +// - the import list +// - the code between BEGIN USER CODE and END USER CODE +// - the code between BEGIN EXTRA CODE and END EXTRA CODE +// Other code you write will be lost the next time you deploy the project. +// Special characters, e.g., é, ö, à, etc. are supported in comments. + +package processqueue.actions; + +import com.mendix.systemwideinterfaces.core.IMendixObject; +import processqueue.queuehandler.QueueHandler; +import com.mendix.systemwideinterfaces.core.IContext; +import com.mendix.webui.CustomJavaAction; + +/** + * Initialize the Queue using all configuration options specified in the entity. + * This prepares the Queue so actions can be added. + */ +public class InitializeQueue extends CustomJavaAction +{ + private IMendixObject QueConfig; + + public InitializeQueue(IContext context, IMendixObject QueConfig) + { + super(context); + this.QueConfig = QueConfig; + } + + @Override + public java.lang.Boolean executeAction() throws Exception + { + // BEGIN USER CODE + QueueHandler.getQueueHandler().initializeQueue(getContext(), this.QueConfig); + return true; + // END USER CODE + } + + /** + * Returns a string representation of this action + */ + @Override + public java.lang.String toString() + { + return "InitializeQueue"; + } + + // BEGIN EXTRA CODE + // END EXTRA CODE +} diff --git a/javasource/processqueue/actions/ShutdownAllQueues.java b/javasource/processqueue/actions/ShutdownAllQueues.java index 878e66c..ee564bf 100644 --- a/javasource/processqueue/actions/ShutdownAllQueues.java +++ b/javasource/processqueue/actions/ShutdownAllQueues.java @@ -1,52 +1,52 @@ -// This file was generated by Mendix Modeler. -// -// WARNING: Only the following code will be retained when actions are regenerated: -// - the import list -// - the code between BEGIN USER CODE and END USER CODE -// - the code between BEGIN EXTRA CODE and END EXTRA CODE -// Other code you write will be lost the next time you deploy the project. -// Special characters, e.g., é, ö, à, etc. are supported in comments. - -package processqueue.actions; - -import processqueue.queuehandler.QueueHandler; -import com.mendix.systemwideinterfaces.core.IContext; -import com.mendix.webui.CustomJavaAction; - -/** - * Stop all running and scheduled Queued actions. Depending on the boolean parameter it throws an exception in the executing microflow or waits for it to finish without starting new actions. - * - * When shutting down gracefully is true the Queue waits for the Actions to finish. - */ -public class ShutdownAllQueues extends CustomJavaAction -{ - private java.lang.Boolean Gracefully; - - public ShutdownAllQueues(IContext context, java.lang.Boolean Gracefully) - { - super(context); - this.Gracefully = Gracefully; - } - - @Override - public java.lang.Boolean executeAction() throws Exception - { - // BEGIN USER CODE - QueueHandler.getQueueHandler().stopProcess(this.Gracefully); - - return true; - // END USER CODE - } - - /** - * Returns a string representation of this action - */ - @Override - public java.lang.String toString() - { - return "ShutdownAllQueues"; - } - - // BEGIN EXTRA CODE - // END EXTRA CODE -} +// This file was generated by Mendix Modeler. +// +// WARNING: Only the following code will be retained when actions are regenerated: +// - the import list +// - the code between BEGIN USER CODE and END USER CODE +// - the code between BEGIN EXTRA CODE and END EXTRA CODE +// Other code you write will be lost the next time you deploy the project. +// Special characters, e.g., é, ö, à, etc. are supported in comments. + +package processqueue.actions; + +import processqueue.queuehandler.QueueHandler; +import com.mendix.systemwideinterfaces.core.IContext; +import com.mendix.webui.CustomJavaAction; + +/** + * Stop all running and scheduled Queued actions. Depending on the boolean parameter it throws an exception in the executing microflow or waits for it to finish without starting new actions. + * + * When shutting down gracefully is true the Queue waits for the Actions to finish. + */ +public class ShutdownAllQueues extends CustomJavaAction +{ + private java.lang.Boolean Gracefully; + + public ShutdownAllQueues(IContext context, java.lang.Boolean Gracefully) + { + super(context); + this.Gracefully = Gracefully; + } + + @Override + public java.lang.Boolean executeAction() throws Exception + { + // BEGIN USER CODE + QueueHandler.getQueueHandler().stopProcess(this.Gracefully); + + return true; + // END USER CODE + } + + /** + * Returns a string representation of this action + */ + @Override + public java.lang.String toString() + { + return "ShutdownAllQueues"; + } + + // BEGIN EXTRA CODE + // END EXTRA CODE +} diff --git a/javasource/processqueue/actions/ShutdownQueue.java b/javasource/processqueue/actions/ShutdownQueue.java index 1975228..350737f 100644 --- a/javasource/processqueue/actions/ShutdownQueue.java +++ b/javasource/processqueue/actions/ShutdownQueue.java @@ -1,55 +1,55 @@ -// This file was generated by Mendix Modeler. -// -// WARNING: Only the following code will be retained when actions are regenerated: -// - the import list -// - the code between BEGIN USER CODE and END USER CODE -// - the code between BEGIN EXTRA CODE and END EXTRA CODE -// Other code you write will be lost the next time you deploy the project. -// Special characters, e.g., é, ö, à, etc. are supported in comments. - -package processqueue.actions; - -import processqueue.queuehandler.QueueHandler; -import com.mendix.systemwideinterfaces.core.IMendixObject; -import com.mendix.systemwideinterfaces.core.IContext; -import com.mendix.webui.CustomJavaAction; - -/** - * Stop the selected running and scheduled Queued actions. Depending on the boolean parameter it throws an exception in the executing microflow or waits for it to finish without starting new actions. - * - * When shutting down gracefully is true the Queue waits for the Actions to finish. - */ -public class ShutdownQueue extends CustomJavaAction -{ - private java.lang.Boolean Gracefully; - private IMendixObject QueueConfiguration; - - public ShutdownQueue(IContext context, java.lang.Boolean Gracefully, IMendixObject QueueConfiguration) - { - super(context); - this.Gracefully = Gracefully; - this.QueueConfiguration = QueueConfiguration; - } - - @Override - public java.lang.Boolean executeAction() throws Exception - { - // BEGIN USER CODE - IContext context = this.getContext().getSession().createContext().createSudoClone(); - QueueHandler.getQueueHandler().stopProcess(context, this.QueueConfiguration, this.Gracefully); - return true; - // END USER CODE - } - - /** - * Returns a string representation of this action - */ - @Override - public java.lang.String toString() - { - return "ShutdownQueue"; - } - - // BEGIN EXTRA CODE - // END EXTRA CODE -} +// This file was generated by Mendix Modeler. +// +// WARNING: Only the following code will be retained when actions are regenerated: +// - the import list +// - the code between BEGIN USER CODE and END USER CODE +// - the code between BEGIN EXTRA CODE and END EXTRA CODE +// Other code you write will be lost the next time you deploy the project. +// Special characters, e.g., é, ö, à, etc. are supported in comments. + +package processqueue.actions; + +import processqueue.queuehandler.QueueHandler; +import com.mendix.systemwideinterfaces.core.IMendixObject; +import com.mendix.systemwideinterfaces.core.IContext; +import com.mendix.webui.CustomJavaAction; + +/** + * Stop the selected running and scheduled Queued actions. Depending on the boolean parameter it throws an exception in the executing microflow or waits for it to finish without starting new actions. + * + * When shutting down gracefully is true the Queue waits for the Actions to finish. + */ +public class ShutdownQueue extends CustomJavaAction +{ + private java.lang.Boolean Gracefully; + private IMendixObject QueueConfiguration; + + public ShutdownQueue(IContext context, java.lang.Boolean Gracefully, IMendixObject QueueConfiguration) + { + super(context); + this.Gracefully = Gracefully; + this.QueueConfiguration = QueueConfiguration; + } + + @Override + public java.lang.Boolean executeAction() throws Exception + { + // BEGIN USER CODE + IContext context = this.getContext().getSession().createContext().createSudoClone(); + QueueHandler.getQueueHandler().stopProcess(context, this.QueueConfiguration, this.Gracefully); + return true; + // END USER CODE + } + + /** + * Returns a string representation of this action + */ + @Override + public java.lang.String toString() + { + return "ShutdownQueue"; + } + + // BEGIN EXTRA CODE + // END EXTRA CODE +} diff --git a/javasource/processqueue/queuehandler/ObjectQueueExecutor.java b/javasource/processqueue/queuehandler/ObjectQueueExecutor.java index ca4f26a..84b2453 100644 --- a/javasource/processqueue/queuehandler/ObjectQueueExecutor.java +++ b/javasource/processqueue/queuehandler/ObjectQueueExecutor.java @@ -22,25 +22,18 @@ /** * This class is responsible for executing the configured Microflow and updating the QueuedAction object afterwards with the correct status. - * @author JvdH + * @authors JvdH and KJP * */ public class ObjectQueueExecutor implements Runnable { private static final ILogNode _logNode = Core.getLogger("QueueExecutor"); - private final IContext context; + private IContext context; private String microflowName; private IMendixObject action; private long QAGuid; private long actionNr; - private String callingMicroflowName; - private String referenceText; private State _state = State.initiated; - private final int max_retries = processqueue.proxies.constants.Constants.getProcessQueueMaxRetries() != null - ? processqueue.proxies.constants.Constants.getProcessQueueMaxRetries().intValue() - : 11; - private int retryTimeMs = 1000; - public enum State { initiated, @@ -56,27 +49,19 @@ public enum State { threadFinished; } - public ObjectQueueExecutor( IContext context, IMendixObject action, IMendixObject process, String calling_microflow_name ) + public ObjectQueueExecutor( IContext context, IMendixObject action, IMendixObject process ) throws CoreException { this.context = context; this.QAGuid = action.getId().toLong(); - this.callingMicroflowName = calling_microflow_name; this.actionNr = action.getValue(this.context, QueuedAction.MemberNames.ActionNumber.toString()); - this.referenceText = action.getValue(this.context, QueuedAction.MemberNames.ReferenceText.toString()); this.microflowName = (String) process.getValue(this.context, Process.MemberNames.MicroflowFullname.toString()); this.action = action; this.action.setValue(this.context, QueuedAction.MemberNames.Phase.toString(), ActionStatus.Queued.toString()); - //Make sure we commit the latest info so status changes always get updated in the client as soon as possible. + // Based on ticket #56473: so status changes always get updated in the client as soon as possible. // E.g. actions being set to "Queued". - if( this.action.isNew() || this.action.isChanged() ) { - try { - Core.commit( this.context, this.action ); - } catch (CoreException e) { - _logNode.error("Error while trying to commit QueuedAction " + this.action.getValue(this.context, QueuedAction.MemberNames.ActionNumber.toString()) + " from queue", e); - } - } + Core.commit(this.context, this.action); } public void initializeAction(ActionStatus phase, LogExecutionStatus status ) { @@ -86,7 +71,7 @@ public void initializeAction(ActionStatus phase, LogExecutionStatus status ) { try { Core.commit( this.context, this.action ); - } catch (CoreException e) { + } catch (Exception e) { _logNode.error("Error while trying to commit QueuedAction " + this.action.getValue(this.context, QueuedAction.MemberNames.ActionNumber.toString()) + " from queue", e); } } @@ -103,25 +88,8 @@ public synchronized void run() this._state = State.preparingData; - int retries = 0; List qaResult = Core.retrieveXPathQuery(this.context, "//" + QueuedAction.getType() + "[ID=" + this.QAGuid + "]"); - /* sometimes it takes a few milliseconds for the record to end up in the database. Rescheduling leads to awkward behavior as - * the action numbers no longer follow the FIFO principle of the queue, so this is very much undesired. - * retries == 0 for always min. 1 retry is on purpose as 0 ms delay is not even enough when doing a simple 3 entities - * in a single loop commit. - * default 1000ms & 11 retries: 1 -> 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512 -> 1024 seconds (sum 2047 seconds is 34 minutes, which is excessive but finite on purpose). - * An alternative queue method should be added for those that do not wish to rely on the FIFO order but also don't want - * don't want to skip any actions. - * - JPU (Dec 05, 2016) - */ - while (qaResult.size() == 0 && (retries == 0 || retries < this.max_retries)) { - _logNode.debug("QueuedAction: [" + this.QAGuid + "] is not available in the database yet so trying again... retries left: " + (this.max_retries - retries) + "."); - qaResult = Core.retrieveXPathQuery(this.context, "//" + QueuedAction.getType() + "[ID=" + this.QAGuid + "]"); - Thread.sleep(Math.round(this.retryTimeMs*Math.pow(2, retries))); - retries++; - } - if( qaResult.size() == 0 ) { /* this means that either the action is not available in the database yet due to a high application load or it means the * microflow creating this action has not successfully completed causing a rollback and the queued action is not committed @@ -140,9 +108,7 @@ public synchronized void run() * - JPU (Dec 05, 2016) */ String errorMessage = "QueuedAction: [" + this.QAGuid + "] is not available in the database " - + "(caused by high application load or rollback) so the QueuedAction is being skipped. " - + "Reference text: "+this.referenceText+" ; " - + "Calling microflow: "+this.callingMicroflowName; + + "(caused by high application load or rollback) so the QueuedAction is being skipped."; this._state = State.failed; @@ -186,15 +152,13 @@ public synchronized void run() _logNode.error("Error while executing: " + this.microflowName + " from the queue", e); setErrormessageAndCommit(this.context, this.action, "Error occured while executing the process, error:" + e.getMessage(), e, LogExecutionStatus.FailedExecuted, ( microflowResult != null && microflowResult ? ActionStatus.Finished : ActionStatus.Cancelled) ); } finally { - // while should not be necessary but sometimes is, unclear as to why... - // possibly a Runtime issue - found by Bart Luijten & Danny Roest - JUL 16 + this.context.endTransaction(); - while (this.context.isInTransaction()) { - this.context.endTransaction(); - } + // should not be possible, but there are some concerns this happens on occasion. + if (this.context.isInTransaction()) + _logNode.error("Error while closing transaction for action id: " + this.actionNr); } - setQueueNumber(0); if( microflowResult != null ) { if ( microflowResult == true ) { setExecutionLog(LogExecutionStatus.SuccesExecuted, ActionStatus.Finished); @@ -215,14 +179,14 @@ public synchronized void run() IMendixIdentifier processId = followUpAction.getValue(this.context, QueuedAction.MemberNames.QueuedAction_Process.toString()); if( processId != null ) { IMendixObject processObj = Core.retrieveId(this.context, processId); - QueueHandler.getQueueHandler().addActionToQueue(this.context, followUpAction, processObj, true, ""); + QueueHandler.getQueueHandler().appendActionForProcessing(this.context, followUpAction, processObj, true); } else { setErrormessageAndCommit(this.context, followUpAction, "No process found for the action", null, LogExecutionStatus.FailedExecuted, ActionStatus.Cancelled); } this._state = State.finishedFollowup; } - } catch (CoreException e) { + } catch (Exception e) { _logNode.info("Error during commit from queue", e); setErrormessageAndCommit(this.context, this.action, "An unknown error occured. Please contact your system administrator.", e, LogExecutionStatus.FailedExecuted, ActionStatus.Cancelled); } @@ -236,6 +200,9 @@ public synchronized void run() finally { this._state = State.threadFinished; } + + this.context = null; // to improve memory usage in Mendix 7 + } @@ -253,7 +220,7 @@ private static void setErrormessageAndCommit(IContext context, IMendixObject que paramMap.put("StackTrace", stackTraceToString(stacktrace)); paramMap.put("Phase", phase.toString()); Core.execute(context, "ProcessQueue.SF_WriteExecutionLog", paramMap); - } catch (CoreException e) { + } catch (Exception e) { _logNode.error("Error while setting log message with stacktrace and error message", e); } } @@ -271,7 +238,7 @@ private void setExecutionLog(LogExecutionStatus status, ActionStatus phase) paramMap.put("Phase", phase.toString()); Core.execute(this.context, "ProcessQueue.SF_WriteExecutionLog", paramMap); - } catch (CoreException e) { + } catch (Exception e) { _logNode.error("Error while setting execution log status: " + status , e); } } diff --git a/javasource/processqueue/queuehandler/QueueAction.java b/javasource/processqueue/queuehandler/QueueAction.java new file mode 100644 index 0000000..407369c --- /dev/null +++ b/javasource/processqueue/queuehandler/QueueAction.java @@ -0,0 +1,55 @@ +package processqueue.queuehandler; + +import com.mendix.systemwideinterfaces.core.IContext; +import com.mendix.systemwideinterfaces.core.IMendixObject; + +/** + * This class is responsible for ... + * @author KJP + * + */ + +public class QueueAction { + IContext context; + IMendixObject actionObject; + IMendixObject process; + Long actionNr; + boolean overrideFollowUp; + boolean existsInDB = false; + + public QueueAction(IContext context, Long actionNr, IMendixObject actionObject, IMendixObject process, boolean overrideFollowUp) { + this.context = context; + this.actionNr = actionNr; + this.actionObject = actionObject; + this.process = process; + this.overrideFollowUp = overrideFollowUp; + } + + public IContext getContext() { + return context; + } + + public IMendixObject getActionObject() { + return actionObject; + } + + public IMendixObject getProcess() { + return process; + } + + public Long getActionNr() { + return actionNr; + } + + public boolean isOverrideFollowUp() { + return overrideFollowUp; + } + + public boolean isExistsInDB() { + return existsInDB; + } + + public void setExistsInDB(boolean existsInDB) { + this.existsInDB = existsInDB; + } +} diff --git a/javasource/processqueue/queuehandler/QueueHandler.java b/javasource/processqueue/queuehandler/QueueHandler.java index 7a0868f..b1151e3 100644 --- a/javasource/processqueue/queuehandler/QueueHandler.java +++ b/javasource/processqueue/queuehandler/QueueHandler.java @@ -1,9 +1,14 @@ package processqueue.queuehandler; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; +import java.util.TreeMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -23,7 +28,7 @@ /** * This class manages all the different Queue's. * There should only exist one single instance of this class, that class keeps track of all Queues and appends new actions to it. - * @author JvdH + * @authors JvdH and KJP * */ public class QueueHandler { @@ -32,21 +37,32 @@ public class QueueHandler { * The map containing all running Queues, the key of the map is the unique Queue number. Which is the ReferenceNumber attribute of the QueueConfig * These ThreadPoolExecutors keep track of all Queued actions and start executing them. */ - private HashMap queueMap = new HashMap(); + private static HashMap queueMap = new HashMap(); /** To prevent unnecessary retrieves to acquire the correct Queue number for the process this map keeps track of the different processes and Queues they belong to. * The key of the map is the Guid for the process and its value is the Queue reference number */ - private HashMap processQueueConfig = new HashMap(); + private static HashMap processQueueConfig = new HashMap(); - private boolean running = false; + private static List queueActionList = Collections.synchronizedList(new ArrayList()); + private static Map queueActionMap = new TreeMap(); + +// private boolean running = false; private static ILogNode _node = Core.getLogger("QueueHandler"); private static QueueHandler _handler; + private static QueueAppender _queueAppender; + /** * @return The instance of the QueueHandler */ public static QueueHandler getQueueHandler() { + if (_queueAppender == null || _queueAppender.isAlive()==false) { + _queueAppender = new QueueAppender(); + _queueAppender.setDaemon(true); + _queueAppender.start(); + } + if ( _handler == null) { _handler = new QueueHandler(); @@ -55,9 +71,9 @@ public static QueueHandler getQueueHandler() return _handler; } - public boolean isRunning(){ - return this.running; - } +// public boolean isRunning(){ +// return this.running; +// } /** * Stop all running and scheduled Queued actions. @@ -67,22 +83,22 @@ public boolean isRunning(){ * @param gracefully */ public void stopProcess( boolean gracefully ) { - _node.info("Stopping running process"); - this.running = false; - for( Entry entry : this.queueMap.entrySet() ) { + _node.info("Shutting down all Queues " + (gracefully ? "gracefully": "")); +// this.running = false; + for( Entry entry : queueMap.entrySet() ) { if( gracefully ) entry.getValue().shutdown(); else entry.getValue().shutdownNow(); } - _node.debug("All pools are stopped"); + _node.debug("All Queues are stopped " + (gracefully ? "gracefully": "")); } public void stopProcess(IContext context, IMendixObject queueConfiguration, Boolean gracefully) { Long queueNr = queueConfiguration.getValue(context, SharedQueueConfiguration.MemberNames.QueueRefNr.toString()); _node.trace("Shutting down queue: " + queueNr); - ThreadPoolExecutor queue = this.queueMap.remove(queueNr); + ThreadPoolExecutor queue = queueMap.remove(queueNr); if( queue == null ) _node.error("Unable to locate queue: " + queueNr + ". Is this queue running?"); else { @@ -110,24 +126,30 @@ public void initializeQueue( IContext context, IMendixObject queueConfiguration _node.trace("Starting with values queuenr: " + queueNr + " nr of threads "+ nrOfThreads); - ThreadFactory customThreadFactory = new ThreadFactoryBuilder() - .setNamePrefix(queueName).setDaemon(false) - .setPriority(threadAffinity) - .build(); - - _node.info("(Re)setting pool with number: "+ queueNr); - /* - * Create a FixedThreadPool, this ThreadPool limits the number of threads. - * Unless specified there should be no limit on the Queue and neither will keep it processes waiting until a spot in the Queue opens op. - * - * Most other configuration options will keep the appending processes waiting in case the Queue becomes fuller - */ - ThreadPoolExecutor tPool = new ThreadPoolExecutor(nrOfThreads, nrOfThreads, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), - customThreadFactory); + ThreadFactory customThreadFactory = new ThreadFactoryBuilder() + .setNamePrefix(queueName).setDaemon(false) + .setPriority(threadAffinity) + .build(); + + _node.info("(Re)setting pool with number: "+ queueNr); + /* + * Create a FixedThreadPool, this ThreadPool limits the number of threads. + * Unless specified there should be no limit on the Queue and neither will keep it processes waiting until a spot in the Queue opens op. + * + * Most other configuration options will keep the appending processes waiting in case the Queue becomes fuller + */ + ThreadPoolExecutor tPool = new ThreadPoolExecutor(nrOfThreads, nrOfThreads, + Long.MAX_VALUE, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + customThreadFactory); + + ThreadPoolExecutor existingTpool = queueMap.get(queueNr); - this.queueMap.put(queueNr, tPool); + queueMap.put(queueNr, tPool); // replace existing tPool + + if (existingTpool != null) { + existingTpool.shutdown(); // clean up previous tPool + } } /** @@ -139,9 +161,30 @@ public void initializeQueue( IContext context, IMendixObject queueConfiguration * @param overrideFollowUp : if this parameter is true the action will be added directly to the regardless of its dependencies * @throws CoreException */ - public synchronized void addActionToQueue( IContext context, IMendixObject actionObject, IMendixObject process, boolean overrideFollowUp, String calling_microflow_name ) throws CoreException { + public synchronized void appendActionForProcessing( IContext context, IMendixObject actionObject, IMendixObject process, boolean overrideFollowUp ) throws CoreException { + QueueAction action = new QueueAction( context, + actionObject.getValue(context, QueuedAction.MemberNames.ActionNumber.toString()), + actionObject, + process, + overrideFollowUp + ); + synchronized(queueActionList) { + queueActionList.add(action); + } + } + + /** + * Append the new action to the Queue, based on the configured process the action will be appended to the correct Queue. + * When an action is a follow-up action the Queue will make sure it only starts scheduling the actions without dependencies, once finished all dependent actions will be added to the Queue as well. + * + * @param actionObject + * @param process + * @param overrideFollowUp : if this parameter is true the action will be added directly to the regardless of its dependencies + * @throws CoreException + */ + public synchronized static void addActionToQueue( IContext context, IMendixObject actionObject, IMendixObject process, boolean overrideFollowUp ) throws CoreException { - Long queueNr = this.processQueueConfig.get( process.getId().toLong()); + Long queueNr = processQueueConfig.get( process.getId().toLong()); //In case the queue number isn't cached yet, just retrieve the associated QueueConfiguration to acquire the correct queue number if( queueNr == null ) { IMendixIdentifier queueId = process.getValue(context, Process.MemberNames.Process_QueueConfiguration.toString()); @@ -153,10 +196,12 @@ public synchronized void addActionToQueue( IContext context, IMendixObject actio throw new CoreException("Unable to schedule queued action: " + actionObject.getValue(context, QueuedAction.MemberNames.ActionNumber.toString()) + " / " + actionObject.getValue(context, QueuedAction.MemberNames.ReferenceText.toString()) + " the configured microflow: " + microflowName + " does not exist."); queueNr = queue.getValue(context, SharedQueueConfiguration.MemberNames.QueueRefNr.toString()); - this.processQueueConfig.put(process.getId().toLong(), queueNr); + processQueueConfig.put(process.getId().toLong(), queueNr); _node.debug("Adding queue to the pool: " + queueNr ); } + actionObject.setValue(context, QueuedAction.MemberNames.QueueNumber.toString(), queueNr); + /* * We don't want to process follow up actions immediately, just skip them and wait until the're passed again * once the override boolean is set to true we know that we are processing a follow up action for the second time @@ -183,10 +228,10 @@ else if (prevActionId == null ) _node.debug("Adding action to queue: " + queueNr ); - ThreadPoolExecutor tPool = this.queueMap.get(queueNr); + ThreadPoolExecutor tPool = queueMap.get(queueNr); if(tPool != null) { - ObjectQueueExecutor thread = new ObjectQueueExecutor(context, actionObject, process, calling_microflow_name); + ObjectQueueExecutor thread = new ObjectQueueExecutor(context, actionObject, process); tPool.execute(thread); } else { @@ -198,43 +243,65 @@ else if (prevActionId == null ) } } - /** - * @return All the relevant information about each Queue. It shows for each Queue, its name, possible queue size and the currently running/waiting actions - */ - public String monitor( boolean showDebug ) - { - try - { - String message = ""; - for( Entry entry : this.queueMap.entrySet() ) { - ThreadPoolExecutor te = entry.getValue(); - BlockingQueue queue = te.getQueue(); - - message += String.format( "Queue: %d, Total active: %d, Total number in waiting queue: %d, Max pool size: %d | ", - entry.getKey(), - te.getActiveCount(), - queue.size(), - te.getMaximumPoolSize() - ); - - if( showDebug ) { - message += "\r\n
ACTIVE: \r\n
"; - for( Runnable r : te.getActiveThreads() ) { - ObjectQueueExecutor qe = (ObjectQueueExecutor) r; - message += String.format( " - Action: %d, MF: %s, State: %s \r\n
", - qe.getActionNr(), - qe.getMicroflowName(), - qe.getState().toString()); - - } - } - } - return message; - } - catch (Exception e) - { - return "Unknown error occured in ThreadPoolManager. Please contact your system administrator!"; - } - - } + private static class QueueAppender extends Thread { + public void run() { + while (true) { + Boolean append = true; + + synchronized(queueActionList) { + int size = queueActionList.size(); + if (size > 0) { + int index = 0; + while (index < size) { + QueueAction action = queueActionList.get(index); + queueActionMap.put(action.actionNr, action); + index++; + } + queueActionList.subList(0, queueActionList.size()).clear(); + } + } + + Iterator it = queueActionMap.keySet().iterator(); + + while (it.hasNext()) { + Long actionNr = it.next(); + QueueAction action = queueActionMap.get(actionNr); + + if (action.existsInDB) { + try { + addActionToQueue(action.context, action.actionObject, action.process, action.overrideFollowUp); + it.remove(); + } catch (CoreException e) { + _node.error("Error occurred while trying to add action to the queue in the QueueAppender.", e); + } + + } + else { + Long qaGUID = action.actionObject.getId().toLong(); + try { + List qaResult = Core.retrieveXPathQuery(action.context, "//" + QueuedAction.getType() + "[ID=" + qaGUID + "]"); + if (qaResult.size() > 0) { + action.setExistsInDB(true); + if (append) { + addActionToQueue(action.context, action.actionObject, action.process, action.overrideFollowUp); + it.remove(); + } + } + else { + append = false; + } + } catch (CoreException e) { + _node.error("Error occurred while trying to add action to the queue in the QueueAppender.", e); + } + } + } + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + _node.error("Queue Appender thread got interrupted while trying to pause.", e); + } + } + } + } } diff --git a/javasource/processqueue/queuehandler/ThreadFactoryBuilder.java b/javasource/processqueue/queuehandler/ThreadFactoryBuilder.java index 25cd9b9..aabb0c6 100644 --- a/javasource/processqueue/queuehandler/ThreadFactoryBuilder.java +++ b/javasource/processqueue/queuehandler/ThreadFactoryBuilder.java @@ -54,7 +54,7 @@ public ThreadFactoryBuilder setUncaughtExceptionHandler( public ThreadFactoryBuilder setThreadFactory( ThreadFactory backingThreadFactory) { - if (null == this.uncaughtExceptionHandler) { + if (null == backingThreadFactory) { throw new NullPointerException( "BackingThreadFactory cannot be null"); } diff --git a/javasource/processqueue/queuehandler/ThreadPoolExecutor.java b/javasource/processqueue/queuehandler/ThreadPoolExecutor.java index 8ab88f1..fc7a0e9 100644 --- a/javasource/processqueue/queuehandler/ThreadPoolExecutor.java +++ b/javasource/processqueue/queuehandler/ThreadPoolExecutor.java @@ -3,7 +3,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -11,7 +11,7 @@ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor private List activeThreads; - public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, LinkedBlockingQueue workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.activeThreads = Collections.synchronizedList( new ArrayList(maximumPoolSize) );