♻ that container #184

Merged
merged 33 commits into from Jun 16, 2018

Conversation

@rclark (Contributor) commented Feb 8, 2018

This is a work-in-progress PR that demonstrates a watchbot system that, instead of launching a new ECS task for each SQS message, reuses individual containers to process multiple messages.

Primary benefits:

  • Higher work throughput: you don't have to pay the cost of container startup for each message.
  • Stable reservation loads: ECS clusters can scale to meet processing demands more effectively.
  • Uses an ECS service: developers get CPU and memory utilization metrics in CloudWatch.
  • No RunTask API calls: avoids the risk of rate-limiting and of race conditions in the ECS scheduler.
  • Easy ramp-up: the ECS service can scale up gradually in response to a massive spike of work in the queue.

The biggest caveats are:

  • Developers must install watchbot as part of the Dockerfiles that describe their workers.
  • Watchbot code runs as the container's main process; workers run in child processes.
  • Scaling down the ECS service is tricky.

How it works

The developer writes a Dockerfile that describes the environment needed to process an SQS message. This includes the code that does the processing and must also include code for watchbot itself. The processing code must be encapsulated in a shell command that expects certain environment variables to convey the details of the message to be processed.
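For illustration only, a worker entry point might look like the sketch below. The `Message` environment variable name and the overall shape are assumptions, not the PR's actual contract:

```js
#!/usr/bin/env node
'use strict';

// Hypothetical worker script. Watchbot runs this as a child process and
// conveys message details via environment variables; the variable name
// `Message` is an assumption for illustration.
async function processMessage(payload) {
  // ... the developer's actual processing logic goes here ...
  console.log(`processing: ${payload}`);
}

processMessage(process.env.Message)
  .then(() => process.exit(0)) // exit 0: watchbot deletes the message
  .catch((err) => {
    console.error(err);
    process.exit(1); // non-zero: watchbot returns the message to the queue
  });
```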

The developer constructs a CloudFormation template. Just like in previous versions, they use watchbot.template() to create a set of CloudFormation resources to include in the template (SQS queue, roles, task definition, service, scaling policies, and alarms).
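For example, a template module might look roughly like this. The option names passed to watchbot.template() are illustrative assumptions, not the finalized interface (cloudfriend's merge() is real, though):

```js
'use strict';

const watchbot = require('@mapbox/watchbot'); // package name assumed
const cf = require('@mapbox/cloudfriend');

// Option names below are assumptions for illustration.
const watcher = watchbot.template({
  cluster: 'processing',
  service: 'example',
  command: 'process-message.sh',
  maxSize: 50
});

// Merge the watchbot resources with the rest of the application's template.
module.exports = cf.merge(watcher, {
  Resources: {
    /* ... other application resources ... */
  }
});
```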

Launching this stack sets up an ECS service which will launch containers based on the developer's Dockerfile. Scaling policies will increase the service's desired number of containers when there are visible messages in the SQS queue. The maximum number of containers in the ECS service defines the maximum number of SQS messages that can be processed concurrently.

When a container launches, its main command is to run watchbot code. This code is responsible for polling the SQS queue, receiving a message, then processing the message by running the developer's shell command as a child process. Once the child process exits, watchbot code determines whether to delete the message or return it to the queue based on the process exit code, then polls the SQS queue again.
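In rough strokes, the main loop looks like the sketch below. The waitFor() and retry() names appear in the review comments further down; complete(), the body property, and the env var name are assumptions:

```js
'use strict';

const { spawn } = require('child_process');

// Sketch of the watcher's main loop: poll, work, delete or retry, repeat.
// `messages` is a queue poller; `command` is the developer's shell command.
const listen = (messages, command) => {
  const loop = async () => {
    const [message] = await messages.waitFor(); // long-poll SQS for a message

    // Run the developer's shell command, conveying message details via env vars.
    const code = await new Promise((resolve) => {
      const child = spawn(command, {
        env: Object.assign({}, process.env, { Message: message.body }),
        shell: true
      });
      child.on('close', resolve);
    });

    if (code === 0) await message.complete(); // exit 0: delete the message
    else await message.retry(); // non-zero: return it to the queue

    setImmediate(loop); // poll again without growing the call stack
  };

  loop();
};
```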

When there are no messages in flight, meaning nothing is being processed, the ECS service scales itself down to 0 containers. The next time messages arrive in the SQS queue, the service will begin scaling itself up again.

Proof-of-concept

The container-recycling branch of ecs-telephone uses this ♻️ version of watchbot. The PR provides a good overview of what a developer needs to change about their existing watchbot system to work with this new version.

It looked good when I ran it. A single container got to the point where it was processing almost 2 SQS messages per second. Granted, ecs-telephone "processing" takes approximately no time. We should run more proof-of-concept stacks that do more realistic work and face more realistic SQS queue loads.

To-Do

A lot.

This was a rewrite, not a refactor, and the current state of the rewrite is intentionally very, very minimalistic. Some things that are missing that I know need to be added:

  • Alarms for when things go severely wrong
  • Logs. It's always been important that the "watcher" code give some insight into what's going on, and that "worker" logs are appropriately structured for searching after the fact. Neither of these is implemented yet.
  • Track and log worker durations
  • Reduce-mode
  • CLI tools for managing dead-letter queue
  • Notifications. This PR includes no code to send notifications in the event of worker processing failure. We introduced dead-letter queues in v2, and that really altered the way most developers are notified of worker failures. This PR currently includes a dead-letter queue, but no mechanism for alarms or emails on any kind of failure. We should rebuild the notification settings in a way that makes sense in the new dead-letter world.

Other "interesting" things

  • I wanted to learn to write code using async/await so here we are in node.js v8. It won't work in v4 or v6.
  • Since developers are going to need to "install" watchbot in their Dockerfiles, we should make that trivial, and it should bundle node v8 in a way that will allow watchbot to run independently of whatever node version the actual processing code requires.
  • I tried using Jest as a way to do fixture-based testing of the watchbot.template() command. I think this is wise for what .template() is doing, but it is weird to have tape & jest tests.
  • I went heavily object-oriented in the code. I have this dream where that means someday we could introduce derivative "worker" classes that run work on other computing platforms. Maybe Lambda? Maybe straight on ECS with RunTask? The real hope is that this is making the system more maintainable / extensible and future changes maybe don't have to be rewrites.

cc @mapbox/platform

@rclark requested a review from @arunasank February 8, 2018 18:18
@jakepruitt left a comment

SO EXCITED! Tried to give feedback about clarity wherever I could, but I also came in with bias from knowing a bit about watchbot beforehand. Would love to hear feedback from other folks.

lib/watcher.js Outdated
listen() {
  return new Promise((resolve) => {
    const loop = async () => {
      if (this.stop) return resolve();


Could you explain this line a bit more? Maybe add a comment about the usage of this.stop?

@rclark (Author):

Yeah, this is a bummer of a testing caveat. In real life, .stop will always be undefined, and this promise will never resolve.

Tests can set .stop so that the intended infinite recursion will end. See the test here and a similar pattern in the messages.js polling loop. ... come to think of it I may be able to avoid it over there.
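Roughly, the test pattern is (a sketch assuming a stubbed Watcher and options):

```js
// In a test: start listening, then flag the watcher to stop so the
// otherwise-infinite loop resolves its promise.
const watcher = Watcher.create(options);
const listening = watcher.listen();
watcher.stop = true; // next iteration sees .stop and calls resolve()
await listening;
```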

lib/watcher.js Outdated
const loop = async () => {
  if (this.stop) return resolve();

  const messages = await this.messages.waitFor();


How many messages get pulled at once?


I ask here rather than in lib/messages.js because I assumed there would only ever be one message you're waiting for. With the plural messages I feel like there's a few things that could use some explaining at the watcher level - like how many messages get pulled in at a time? Do the workers run concurrently or serially?

@rclark (Author) Feb 8, 2018:

.waitFor(num) is actually the function signature, with a default value of 1. I'm hedging a little bit here between concurrent vs serial processing.

It is so easy to write a Messages class that accommodates receiving 1-10 messages per API call that it almost feels like an oversight not to allow for it.

But for this watchbot iteration, we're looking at running 1 worker at a time per watcher, one after the other. Since this is .waitFor(1), that's what will happen.

What I guess I have done is put in some of the structure that would allow for concurrent workers per watcher, but not all of it. Maybe that's a bad idea.
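For reference, a Messages class that accommodates 1-10 messages per call might be shaped like this (a sketch, not the PR's actual code):

```js
'use strict';

const AWS = require('aws-sdk');

class Messages {
  constructor(options = {}) {
    // Bind the queue URL so each API call doesn't have to repeat it.
    this.sqs = new AWS.SQS({ params: { QueueUrl: options.queueUrl } });
  }

  // Long-poll until at least one message arrives, receiving up to `num`
  // messages per API call (SQS allows 1-10).
  async waitFor(num = 1) {
    let messages = [];
    while (!messages.length) {
      const data = await this.sqs.receiveMessage({
        MaxNumberOfMessages: Math.min(num, 10),
        WaitTimeSeconds: 20,
        AttributeNames: ['All']
      }).promise();
      messages = data.Messages || [];
    }
    return messages;
  }
}

module.exports = Messages;
```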

lib/watcher.js Outdated
const worker = Worker.create(message, this.workerOptions);

worker.on('error', (err) => this.emit('error', err));
message.on('error', (err) => this.emit('error', err));


Does the retry error handling happen deeper in the workers/messages? What distinguishes these errors from retried errors?

@rclark (Author) Feb 8, 2018:

I think of these emitted errors as "watcher-level" errors that don't have anything to do with the developer's processing code encountering a failure. These errors are chained through event emitters, rather than via rejected promises, because the watcher is supposed to keep on chugging along if it...

  • fails to return a message to the queue because its receipt handle isn't valid anymore
  • fails an SQS receiveMessage API call due to an InternalServerError
  • can't launch a child process because of xyz

When a child process exits non-zero, those "errors" are all translated into returning the message to the queue. Since there's currently no logging or notifications or alarms incorporated in this PR, processing errors just mean retry the job and nothing else. Finding the right way to surface those failures is probably the biggest TODO at the moment.

@rclark (Author):

Note that PR #185 added logging, which removed the need to bubble errors up through event emitters. Instead, when an error is encountered it just gets logged.

lib/watcher.js Outdated
    this.emit('error', err);
  }

  setImmediate(loop);


Question about await - does it make this file hang at line 44 until all of the workers complete? Or does it ride through that line and call setImmediate again immediately? Mostly just asking for my own curiosity.

@rclark (Author):

The code hangs on L44 until the promise resolves. If it rejects, the error is thrown, hence the try/catch.
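A minimal demonstration of that behavior:

```js
// A rejected promise is re-thrown at the `await` site, so try/catch
// catches it; otherwise the async function simply pauses until resolution.
const work = () => Promise.reject(new Error('boom'));

(async () => {
  try {
    await work();
  } catch (err) {
    console.log(err.message); // prints 'boom'
  }
})();
```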


static create(options) {
  return new Watcher(options);
}


I appreciate isolating the usage of new, or giving folks a way around it, just because I personally don't like new. Did you do this for the sake of tests?

@rclark (Author):

Yes, did it for tests. There's no easy way to mock constructors with sinon, and I spent way too long looking at other frameworks that would allow for it. Way too long.
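A minimal illustration of why the factory helps (a sketch):

```js
'use strict';

const sinon = require('sinon');

class Worker {
  static create(options) {
    return new Worker(options);
  }
}

// sinon can replace a static factory method, but it cannot easily
// intercept a bare `new Worker()` expression inside the code under test.
const fake = { fail: sinon.stub().resolves() };
sinon.stub(Worker, 'create').returns(fake);

console.log(Worker.create() === fake); // true: callers now get the fake
Worker.create.restore();
```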

Contributor:

I actually really like the usage of the static method to create an instance of the object as opposed to new. It makes the code way more readable, even though it might seem redundant.

lib/worker.js Outdated

async fail(err) {
  if (err) this.emit('error', err);
  return await this.message.retry();


So... will the worker both emit an error and retry the message? Will it only retry if an err is passed?

@rclark (Author):

Another logging TBD here.

  assert.end();
});

test('[bin.watchbot] error handling', async (assert) => {


Does the watcher continue to work if the watcher.listen() errors out? Does the container die and another one start up? Could you add a line confirming the exit code of the watchbot() process in either case?

@rclark (Author):

Note that there's no reject handler on watcher.listen()'s promise. The only way this fails is through JS syntax errors in watchbot code itself, or if the OS decides to shut it down for some reason. Either way would result in an error getting printed and the process exiting. I can't actually simulate this in a test because it is not supposed to happen.

  assert.end();
});

test('[message] retry, too many receieves', async (assert) => {


Is this where the dead letter queue lived before?

@rclark (Author):

The default is for messages to go dead-letter after the 11th receive. That's still set up, but in the past this limit has been configurable (or at least configurability has been considered). So this check is about making sure we don't send an API call with a visibility timeout that's too long, in the event that we decide to make the dead-letter limit configurable.

@@ -0,0 +1,24 @@
'use strict';

const sinon = require('sinon');


Why did you go the route of a local helper instead of the "repeat yourself in tests" philosophy?

@rclark (Author):

Oh the stubber? Because

a) it's a little messy to stub the .create() function AND end up with access to a stub instance of the class so you can spy on instance method calls.

b) all these things are event emitters, and I wanted those emitters to actually work on the stub instances.

c) I thought about this waaaaaaaaaaay too long. At one point this stubber was trying to get things done without having to use .create() factories, but I eventually got to the point where I'd lost most of a day and gained maybe 50 lines of code and a bad headache. This is what I fell back on.

  cluster: 'processing'
});

expect(builtWithDefaults).toMatchSnapshot('defaults');


😱 oh wow, so this is part of jest? Why did you want to go with this over tape?

@rclark (Author):

Pretty much all I know about Jest is that people use this to manage fixture-based tests for another type of complex object: the HTML on a frontend app. I thought I would see how it feels for cloudformation JSON. I could do fixture-based tests in tape for sure, but this was easier to set up.

I would encourage you to kick the tires a little and see if you think it's worth keeping, or if I should write a little fixture-checker in tape (think: readable diffs when tests fail).
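For comparison, a minimal tape fixture-checker might look like this sketch (the paths and the UPDATE environment variable are assumptions):

```js
'use strict';

const test = require('tape');
const fs = require('fs');
const path = require('path');
const watchbot = require('..'); // module path assumed

test('[template] defaults', (assert) => {
  const built = watchbot.template({ cluster: 'processing' });
  const fixturePath = path.join(__dirname, 'fixtures', 'defaults.json');

  // Regenerate the fixture on demand, like jest's --updateSnapshot.
  if (process.env.UPDATE) {
    fs.writeFileSync(fixturePath, JSON.stringify(built, null, 2));
  }

  const expected = JSON.parse(fs.readFileSync(fixturePath, 'utf8'));
  assert.deepEqual(built, expected, 'template matches fixture');
  assert.end();
});
```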

@arunasank (Contributor) commented Feb 9, 2018

Hi @rclark, this is a bad accountabilibuddy week. I spent all my day pushing on https://github.com/mapbox/mbxcli/pull/783, and am a little too exhausted to review this PR in a nice way. Super excited to read and understand this though, and I hope to have lots of questions for you to answer on Monday!

@rclark mentioned this pull request Feb 9, 2018
lib/message.js Outdated
constructor(sqsMessage = {}, options = {}) {
  let valid = ['Body', 'MessageId', 'ReceiptHandle', 'Attributes'].reduce(
    (valid, key) => {
      if (!valid) return valid;
Contributor:

naive q: Why not just return false instead of valid? This wouldn't preserve the truthy value of valid from a previous call, and returning false would be simpler to read?

@rclark (Author):

👍 will do
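For the record, the suggested simplification would read (the per-key test is an assumption, since the original callback body isn't quoted here):

```js
const valid = ['Body', 'MessageId', 'ReceiptHandle', 'Attributes'].reduce(
  (valid, key) => {
    if (!valid) return false; // short-circuit with a literal, as suggested
    return sqsMessage[key] !== undefined; // assumed per-key validity check
  },
  true
);
```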

  params: { QueueUrl: options.queueUrl }
});

this.logger = Logger.create('watcher', this);
Contributor:

naive q 2: How does the value of the message instance percolate down to the logger class, considering we only have the options variable in https://github.com/mapbox/ecs-watchbot/blob/container-recycling/lib/logger.js#L65?

@rclark (Author):

That's a bug, thanks, will test and fix.

Contributor:

Ah, ok. Thought there was magic happening. 🙂 @rclark follow-up: in both cases, we only have the queueUrl property in the options object (both in this class and in the Messages class). Is there a reason why you haven't just used a string? Is it for extensibility, where you see this object containing other kinds of metadata in the future?

@rclark (Author):

Yes, just for extensibility.


const params = {
  ReceiptHandle: this.handle,
  VisibilityTimeout: Math.pow(2, receives)
Contributor:

Why are we setting the VisibilityTimeout using a power of 2?

@rclark (Author):

We want there to be an exponential increase in the number of seconds that a message "waits" before it gets retried again. This is an attempt to protect any underlying resources (maybe s3, dynamo, ecs) that may get throttled by too-rapid retry.
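Concretely, the backoff plus the guard discussed above might be (a sketch; 43200 seconds, 12 hours, is SQS's documented maximum visibility timeout):

```js
// Exponential backoff: 2s, 4s, 8s, ... per receive. SQS rejects
// VisibilityTimeout values above 43200 seconds, so cap the result in
// case the dead-letter threshold ever becomes configurable.
const MAX_VISIBILITY = 43200;

const params = {
  ReceiptHandle: this.handle,
  VisibilityTimeout: Math.min(Math.pow(2, receives), MAX_VISIBILITY)
};

await this.sqs.changeMessageVisibility(params).promise();
```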

Contributor:

Related: #184 (comment)

  data = await this.sqs.receiveMessage(params).promise();
} catch (err) {
  this.logger.queueError(err);
  return setImmediate(poll);
Contributor:

@rclark is there a reason you prefer setImmediate over process.nextTick?

@rclark (Author):

There is some subtle difference between the two that I never remember. setImmediate is my habit.
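For the record, the general Node.js difference: process.nextTick callbacks run before the event loop continues, so a recursive nextTick loop can starve pending I/O, while setImmediate yields to I/O between iterations, which is the friendlier choice for a polling loop.

```js
// nextTick fires before the event loop proceeds; setImmediate fires in
// the "check" phase of the next loop iteration, after pending I/O.
setImmediate(() => console.log('setImmediate'));
process.nextTick(() => console.log('nextTick'));
// prints: nextTick, then setImmediate
```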

@arunasank (Contributor)

Hey @rclark, loved shadowing you on this PR and learned a ton. Wanted to list a bunch of stuff I saw in code for the first time:

  • setImmediate 🤔
  • async, await 😍 loving this - so easy to combine async/sync.

Also, this is the first time I am reviewing code that contains a lot of object-oriented JavaScript, and it makes the various pieces of the code so much clearer. I especially love how the various pieces come together in the watcher and the worker.

rclark and others added 9 commits June 15, 2018 17:48
* adds logging of watcher-level errors, worker receives, and completion status

* prefixed logs from child processes
* change scale-down MetricIntervalLowerBound to MetricIntervalUpperBound
* Add alarms and alarm docs

* Add failedPlacementAlarmPeriods

* Add CloudWatch Alarms snapshots

* Update template jest snapshots

* Add CloudWatch Alarms snapshots

* Add failedworker and failedworkerplacement metric

* Typo r/LogGroup/Logs

* Change metric name

* Metric Filter of worker errors to "[failure]"

* Have current published version instead of undefined

* Jake's Review

* uh update-jest

* Update alarms.md
* Add travis user

* Ensure this fails

* Add validation for notificationEmail or notificationTopic
tapaswenipathak and others added 24 commits June 15, 2018 17:48
…queue threshold, info to doc (#211)

* Closes #208, #207, #206, #182, #149, #72, #15

(cherry picked from commit 8de328df79ccf52b8d612c625891555808c2fa0e)

* Add minSize as option

* update jest tests

* Change MinSize to 0

* update jest

* identation and minSize to 0

* Add deadletterThreshold info in Worker-retry-cycle
* Restrict writes to volumes and clean them after every job

* Try out the `ReadOnlyRootFilesystem` option

* Capitalization

* Add watchbot-log

* use strict

* No need to chmod now
@jakepruitt merged commit 7fc31c2 into master Jun 16, 2018
@arunasank deleted the container-recycling branch August 9, 2018 06:34