Enforce 12 hour maxJobDuration #275

Closed
wants to merge 5 commits into from
Changes from 3 commits
10 changes: 9 additions & 1 deletion docs/building-a-template.md
@@ -81,7 +81,7 @@ When creating your watchbot stacks with the `watchbot.template()` method, you no
**reservation.cpu** | The number of CPU units to reserve for your worker container. This will only impact the placement of your container on an EC2 with sufficient CPU capacity, but will not limit your container's utilization. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`. | Number/Ref | Yes | -
**privileged** | Give the container elevated privileges on the host container instance | Boolean | No | false
**messageRetention** | The number of seconds that a message will exist in SQS until it is deleted. The default value is the maximum time that SQS allows, 14 days. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`. | Number/Ref | No | 1209600 (14 days)
-**maxJobDuration** | The maximum number of seconds that a job is allowed to run. After this time period, the worker will be stopped and the job will be returned to the queue. | Number/Ref | No | No | -
+**maxJobDuration** | The maximum number of seconds that a job is allowed to run. After this time period, the worker will be stopped and the job will be returned to the queue. The maximum is 43020 seconds, just under twelve hours. | Number/Ref | No | 43020
**notificationEmail** | The email to send alarm notifications to | String/Ref | No. Must specify either a `notificationTopic` or `notificationEmail` | -
**notificationTopic** | An SNS topic to send alarms to | String/Ref | No. Must specify either a `notificationTopic` or `notificationEmail` | -
**alarmPeriods** | Use this parameter to control the duration that the SQS queue must be over the message threshold before triggering an alarm. You specify the number of 5-minute periods before an alarm is triggered. The default is 24 periods, or 2 hours. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`. | String/Ref | No | 24
@@ -103,6 +103,14 @@ In writableFilesystem mode, the whole file system is writable and containers are

writableFilesystem mode has no restrictions to the file system: workers can write anywhere and read from anywhere, their files being instantly deleted after the job finishes and the container dies.

+### maxJobDuration explained
+
+When a worker exceeds maxJobDuration, it stops all processes and returns the message it was processing to the queue. Typically, maxJobDuration should be set marginally above the maximum time your application takes to process and delete a message from the queue. This ensures all work can be completed while protecting against erroneous processing that would otherwise continue up to the twelve-hour maximum.

I'm likely confused, but it looks like if maxJobDuration is exceeded, the behavior is the same as when the SQS hard limit of 12 hours is exceeded, because in both cases the message is returned to the queue. Is that right?

Or will a worker keep working when the 12-hour visibility timeout is breached?

> Or will a worker keep working when the 12-hour visibility timeout is breached?

Yes, I believe the worker will keep working regardless of hitting the timeout. We could implement a hard cutoff, but that may or may not be desirable depending on the application context.

Ah makes sense, thanks @rclark!


+The default maxJobDuration in Watchbot is 43020 seconds, or roughly 12 hours. This parameter can be set lower than 43020 seconds, but not higher. If processing a single message takes longer than 12 hours, Watchbot is not a good fit for your workload.
+
+The maximum of 43020 is due to the [maximum visibility timeout](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html) on SQS messages. While a worker is processing a message, a heartbeat function keeps extending the message's visibility timeout. If the worker container is unexpectedly interrupted, the message returns to visibility within three minutes. Per AWS limits, the visibility timeout cannot be extended past twelve hours. Once it can no longer be extended, the message returns to the queue and the work would be duplicated. Watchbot enforces maxJobDuration to avoid this duplication.
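
A possible reading of the 43020 figure, sketched as arithmetic: 43020 is exactly three minutes short of the twelve-hour SQS limit (43200 seconds), which lines up with the three-minute window mentioned above. The constant and function names below are hypothetical, and the assumption that each heartbeat extends visibility by 180 seconds is an inference from the text, not something this PR confirms.

```javascript
// Hypothetical sketch: none of these names exist in Watchbot.
const SQS_MAX_VISIBILITY = 12 * 60 * 60; // 43200 seconds, the AWS hard limit
const HEARTBEAT_WINDOW = 180;            // ASSUMED extension per heartbeat, in seconds
const MAX_JOB_DURATION = SQS_MAX_VISIBILITY - HEARTBEAT_WINDOW; // 43020

// Returns true if a message that has been in flight for `elapsedSeconds`
// can still be extended by one more heartbeat window without passing the limit.
function canExtendVisibility(elapsedSeconds) {
  return elapsedSeconds + HEARTBEAT_WINDOW <= SQS_MAX_VISIBILITY;
}
```

Under these assumptions, a job stopped at 43020 seconds never needs an extension that SQS would refuse.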

### watchbot.template references

After building Watchbot resources using `watchbot.template()`, you may wish to reference some of those resources. The object returned from `watchbot.template()` provides references to a few of its resources through a `.ref` property:
9 changes: 7 additions & 2 deletions lib/template.js
@@ -63,9 +63,10 @@ const dashboard = require(path.resolve(__dirname, 'dashboard.js'));
* number or a reference, i.e. `{"Ref": "..."}`.
* @param {Boolean} [options.privileged=false] - give the container elevated
* privileges on the host container instance
- * @param {Number|ref} [options.maxJobDuration] - the maximum number of seconds
+ * @param {Number|ref} [options.maxJobDuration] - the number of seconds

Should we say that the default is 12 hours?

* before a worker will exit and SQS will once again make the message visible.
* This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
+ * The maximum value for this option is 43020 (roughly 12 hours).
* @param {Number|ref} [options.messageRetention=1209600] - the number of seconds
* that a message will exist in SQS until it is deleted. The default value is
* the maximum time that SQS allows, 14 days. This parameter can be provided as
@@ -107,13 +108,17 @@ module.exports = (options = {}) => {
if (!options[required]) throw new Error(`options.${required} is required`);
});

+if (options.maxJobDuration > 43020) throw new Error('options.maxJobDuration exceeds max value');
+if (options.maxJobDuration < 1) throw new Error('options.maxJobDuration must be > 0');


options = Object.assign(
{
prefix: 'Watchbot',
watchbotVersion: 'v' + pkg.version,
reservation: {},
env: {},
-maxJobDuration: 0,
+maxJobDuration: 43020,
messageRetention: 1209600,
maxSize: 1,
minSize: 0,
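
The bounds check and default in this hunk can be read together as one stand-alone function. The following is a minimal sketch, not the code in `lib/template.js`: `resolveMaxJobDuration` is a hypothetical helper, and the explicit pass-through for `{"Ref": "..."}` objects is an assumption about how references should be treated, since their value is not known when the template is built.

```javascript
// Hypothetical helper, not part of the Watchbot API.
const MAX_JOB_DURATION = 43020; // seconds, the limit enforced in this PR

function resolveMaxJobDuration(value) {
  // Fall back to the maximum when the option is omitted.
  if (value === undefined) return MAX_JOB_DURATION;

  // ASSUMPTION: CloudFormation references cannot be range-checked here,
  // so they are returned unchanged.
  if (typeof value === 'object' && value !== null) return value;

  if (typeof value !== 'number' || Number.isNaN(value)) {
    throw new TypeError('options.maxJobDuration must be a number or a reference');
  }
  if (value > MAX_JOB_DURATION) throw new Error('options.maxJobDuration exceeds max value');
  if (value < 1) throw new Error('options.maxJobDuration must be > 0');
  return value;
}
```

For example, `resolveMaxJobDuration(600)` returns `600`, while `resolveMaxJobDuration(50000)` throws.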
18 changes: 9 additions & 9 deletions lib/worker.js
@@ -17,18 +17,16 @@ const child = async (command, options, logger, maxJobDuration) =>
.on('error', (err) => reject(err))
.on('exit', (code, signal) => {
const duration = Date.now() - start;
-if (maxJobDuration > 0) clearTimeout(maxTimeout);
+clearTimeout(maxTimeout);
resolve({ code, signal, duration });
});

-if (maxJobDuration > 0) {
-  maxTimeout = setTimeout(() => {
-    logger.log(`[worker] running killAll. duration has exceeded maxJobDuration. duration: ${Date.now() - start}`);
-    killAll(child.pid,(err) => {
-      if (err) logger.log(`[worker] killAll Error: ${err}`);
-    });
-  }, maxJobDuration * 1000);
-}
+maxTimeout = setTimeout(() => {
+  logger.log(`[worker] running killAll. duration has exceeded maxJobDuration. duration: ${Date.now() - start}`);
+  killAll(child.pid, (err) => {
+    if (err) logger.log(`[worker] killAll Error: ${err}`);
+  });
+}, maxJobDuration * 1000);

child.stdout.pipe(logger.stream());
child.stderr.pipe(logger.stream());
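
For readers who want to try the now-unconditional timeout outside the worker, here is a reduced sketch of the same pattern. It is not the code in `lib/worker.js`: `runWithTimeout` is a hypothetical name, and it calls `child.kill('SIGTERM')` on the direct child only, whereas the worker uses `killAll` to stop the whole process tree.

```javascript
const { spawn } = require('child_process');

// Hypothetical helper: runs a command and stops it after maxJobDurationSeconds.
function runWithTimeout(command, args, maxJobDurationSeconds) {
  return new Promise((resolve, reject) => {
    const start = Date.now();
    const child = spawn(command, args, { stdio: 'ignore' });

    // Always arm the timer, mirroring the change in this hunk.
    const timer = setTimeout(() => {
      // Simplification: signals only the direct child, not its descendants.
      child.kill('SIGTERM');
    }, maxJobDurationSeconds * 1000);

    child.on('error', (err) => {
      clearTimeout(timer);
      reject(err);
    });
    child.on('exit', (code, signal) => {
      clearTimeout(timer);
      resolve({ code, signal, duration: Date.now() - start });
    });
  });
}
```

A call such as `runWithTimeout('sleep', ['60'], 5)` should resolve after about five seconds with a non-null `signal` on a POSIX system.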
@@ -40,6 +38,8 @@ class Worker {
throw new Error('Invalid Message object');

if (!options.command) throw new Error('Missing options: command');
+if (!options.maxJobDuration) throw new Error('Missing options: maxJobDuration');


this.command = options.command;
this.volumes = options.volumes;