Skip to content

Commit

Permalink
Allow users to write to any volume (#200)
Browse files Browse the repository at this point in the history
* Restrict writes to volumes and clean them after every job

* Try out the `ReadOnlyRootFilesystem` option

* Capitalization

* Add watchbot-log

* use strict

* No need to chmod now
  • Loading branch information
Jake Pruitt committed Jun 16, 2018
1 parent c6aa113 commit 1892852
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 89 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -1,3 +1,5 @@
language: node_js
node_js:
- '8'
services:
- docker
19 changes: 19 additions & 0 deletions bin/watchbot-log.js
@@ -0,0 +1,19 @@
#!/usr/bin/env node
'use strict';

/**
* watchbot-log "something that you want logged"
* - or -
* echo "somehing that you want logged" | watchbot-log
*/

const Logger = require('..').Logger;
const args = process.argv.slice(2);

const logger = new Logger('worker');

if (args[0]) {
return logger.log(args[0]);
}

process.stdin.pipe(logger.stream());
3 changes: 2 additions & 1 deletion bin/watchbot.js
Expand Up @@ -11,11 +11,12 @@ const main = async () => {

const logger = Logger.create('watcher');
const command = process.argv.slice(3).join(' ');
const volumes = process.env.Volumes.split(',');

const options = {
queueUrl: process.env.QueueUrl,
fresh: process.env.fresh === 'true' ? true : false,
workerOptions: { command }
workerOptions: { command, volumes }
};

const watcher = Watcher.create(options);
Expand Down
75 changes: 37 additions & 38 deletions lib/template.js
Expand Up @@ -15,7 +15,7 @@ const pkg = require(path.resolve(__dirname, '..', 'package.json'));
* repository where your images are stored.
* @param {String|ref} options.serviceVersion - the version of you service to
* deploy. This should reference a specific image in ECR.
* @param {String} options.command - the shell command that should be executed
* @param {String} [options.command] - the shell command that should be executed
* in order to process a single message.
* @param {Array} [options.permissions=[]] - permissions that your worker will
* need in order to complete tasks.
Expand All @@ -35,22 +35,17 @@ const pkg = require(path.resolve(__dirname, '..', 'package.json'));
* to specify this in order to differentiate the resources.
* @param {String} [options.family] - the name of the the task definition family
* that watchbot will create revisions of.
* @param {Number|ref} [options.maxSize=1] - the maximum size for the service to
* scale up to. This parameter can be provided as either a number or a reference,
* i.e. `{"Ref": "..."}`.
* @param {Number} [options.maxSize=1] - the maximum size for the service to
* scale up to. This parameter must be provided as a number.
* @param {Number|ref} [options.minSize=0] - the minimum size for the service to
* scale down to. This parameter can be provided as either a number or a reference,
* i.e. `{"Ref": "..."}`.
* @param {String} [options.mounts=''] - if your worker containers need to mount
* files or folders from the host EC2 file system, specify those mounts with this parameter.
* A single persistent mount point can be specified as `{host location}:{container location}`,
* e.g. /root:/mnt/root. A single ephemeral mount point can be specified as `{container location}`,
* e.g. /mnt/tmp. Separate multiple mount strings with commas if you need to mount
* more than one location. You can also specify a mount object with `container`
* and `host` property arrays, in which the indeces
* correspond: `{ container: [{container location}], host: [{host location}] }`,
* e.g. { container: [/mnt/root, /mnt/tmp], host: [/root, ''] }. A blank host
* entry will create an ephemeral mount point at the corresponding container filepath.
* @param {String} [options.mounts=''] - if your worker containers need to write
* files or folders inside its file system, specify those locations with this parameter.
* A single ephemeral mount point can be specified as `{container location}`, e.g. /mnt/tmp.
* Separate multiple mount strings with commas if you need to mount more than one location.
* You can also specify a mount object as an arrays of paths. Every mounted volume will be
* cleaned after each job.
* @param {Object} [options.reservation={}] - worker container resource reservations
* @param {Number|ref} [options.reservation.memory] - the number of MB of RAM
* to reserve as a hard limit. If your worker container tries to utilize more
Expand Down Expand Up @@ -135,7 +130,7 @@ module.exports = (options = {}) => {

const prefixed = (name) => `${options.prefix}${name}`;

const unpackEnv = (env) => {
const unpackEnv = (env, mountPoints) => {
return Object.keys(env).reduce(
(unpacked, key) => {
unpacked.push({ Name: key, Value: env[key] });
Expand All @@ -144,16 +139,17 @@ module.exports = (options = {}) => {
[
{ Name: 'WorkTopic', Value: cf.ref(prefixed('Topic')) },
{ Name: 'QueueUrl', Value: cf.ref(prefixed('Queue')) },
{ Name: 'fresh', Value: options.fresh }
{ Name: 'fresh', Value: options.fresh },
{ Name: 'Volumes', Value: mountPoints.map((m) => m.ContainerPath).join(',') }
]
);
};

const mount = (mountInputs) => {
let formatted = { container: [], host: [] };
let formatted = [];
const mounts = {
mountPoints: [],
volumes: []
mountPoints: [{ ContainerPath: '/tmp', SourceVolume: 'tmp' }],
volumes: [{ Name: 'tmp' }]
};

if (typeof mountInputs === 'object') formatted = mountInputs;
Expand All @@ -162,26 +158,19 @@ module.exports = (options = {}) => {
if (!mountStr.length) return;

const persistent = /:/.test(mountStr);
formatted.container.push(
formatted.push(
persistent ? mountStr.split(':')[1] : mountStr
);
formatted.host.push(persistent ? mountStr.split(':')[0] : '');
});
}

formatted.container.forEach((container, i) => {
formatted.forEach((container, i) => {
const name = 'mnt-' + i;
const host = formatted.host[i] ? { SourcePath: formatted.host[i] } : {};

mounts.mountPoints.push({ ContainerPath: container, SourceVolume: name });
mounts.volumes.push({ Name: name, Host: host });
mounts.volumes.push({ Name: name });
});

if (!mounts.mountPoints.length) {
mounts.mountPoints = undefined;
mounts.volumes = undefined;
}

return mounts;
};

Expand Down Expand Up @@ -264,7 +253,7 @@ module.exports = (options = {}) => {
}
};

Resources[prefixed('Logs')] = {
Resources[prefixed('LogGroup')] = {
Type: 'AWS::Logs::LogGroup',
Properties: {
LogGroupName: cf.join('-', [
Expand Down Expand Up @@ -314,7 +303,7 @@ module.exports = (options = {}) => {
'logs:PutLogEvents',
'logs:FilterLogEvents'
],
Resource: cf.getAtt(prefixed('Logs'), 'Arn')
Resource: cf.getAtt(prefixed('LogGroup'), 'Arn')
},
{
Effect: 'Allow',
Expand Down Expand Up @@ -353,7 +342,7 @@ module.exports = (options = {}) => {
]),
Cpu: options.reservation.cpu,
Privileged: options.privileged,
Environment: unpackEnv(options.env),
Environment: unpackEnv(options.env, mounts.mountPoints),
MountPoints: mounts.mountPoints,
Command: ['watchbot', 'listen', `${options.command}`],
Ulimits: [
Expand All @@ -363,10 +352,11 @@ module.exports = (options = {}) => {
HardLimit: 10240
}
],
ReadonlyRootFilesystem: true,
LogConfiguration: {
LogDriver: 'awslogs',
Options: {
'awslogs-group': cf.ref(prefixed('Logs')),
'awslogs-group': cf.ref(prefixed('LogGroup')),
'awslogs-region': cf.region,
'awslogs-stream-prefix': options.serviceVersion
}
Expand All @@ -391,7 +381,7 @@ module.exports = (options = {}) => {
Type: 'AWS::ECS::Service',
Properties: {
Cluster: options.cluster,
DesiredCount: 0,
DesiredCount: options.minSize,
TaskDefinition: cf.ref(prefixed('Task'))
}
};
Expand Down Expand Up @@ -481,7 +471,7 @@ module.exports = (options = {}) => {
Type: 'AWS::CloudWatch::Alarm',
Properties: {
AlarmName: cf.sub('${AWS::StackName}-scale-up'),
AlarmDescription: 'Scale up workers due to visible messages in queue',
AlarmDescription: 'Scale up due to visible messages in queue',
EvaluationPeriods: 1,
Statistic: 'Maximum',
Threshold: 0,
Expand Down Expand Up @@ -521,7 +511,7 @@ module.exports = (options = {}) => {
Properties: {
AlarmName: cf.sub('${AWS::StackName}-scale-down'),
AlarmDescription:
'Scale down workers due to lack of in-flight messages in queue',
'Scale down due to lack of in-flight messages in queue',
EvaluationPeriods: 1,
Statistic: 'Maximum',
Threshold: 1,
Expand Down Expand Up @@ -560,7 +550,7 @@ module.exports = (options = {}) => {
Type: 'AWS::Logs::MetricFilter',
Properties: {
FilterPattern: '"[failure]"',
LogGroupName: cf.ref(prefixed('Logs')),
LogGroupName: cf.ref(prefixed('LogGroup')),
MetricTransformations: [{
MetricName: cf.join([prefixed('WorkerErrors-'), cf.stackName]),
MetricNamespace: 'Mapbox/ecs-watchbot',
Expand Down Expand Up @@ -609,5 +599,14 @@ module.exports = (options = {}) => {
}
};

return cf.merge({ Resources });
const ref = {
logGroup: cf.ref(prefixed('LogGroup')),
topic: cf.ref(prefixed('Topic')),
queueUrl: cf.ref(prefixed('Queue')),
queueArn: cf.getAtt(prefixed('Queue'), 'Arn'),
queueName: cf.getAtt(prefixed('Queue'), 'QueueName'),
notificationTopic: cf.ref(prefixed('NotificationTopic'))
};

return { Resources, ref };
};
2 changes: 1 addition & 1 deletion lib/watcher.js
Expand Up @@ -22,7 +22,7 @@ class Watcher {
}

listen() {
return new Promise((resolve) => {
return new Promise(async (resolve) => {
const loop = async () => {
if (this.stop) return resolve();

Expand Down
9 changes: 9 additions & 0 deletions lib/worker.js
@@ -1,6 +1,7 @@
'use strict';

const child_process = require('child_process');
const fsExtra = require('fs-extra');
const Message = require('./message');
const Logger = require('./logger');

Expand Down Expand Up @@ -28,6 +29,7 @@ class Worker {
if (!options.command) throw new Error('Missing options: command');

this.command = options.command;
this.volumes = options.volumes;
this.message = message;
this.logger = Logger.create('watcher', message);
}
Expand All @@ -52,6 +54,10 @@ class Worker {
return await this.message.retry();
}

async clean(volumes) {
return Promise.all(volumes.map((volume) => fsExtra.emptyDir(volume)));
}

async waitFor() {
const options = {
shell: true,
Expand All @@ -61,6 +67,9 @@ class Worker {

try {
const results = await child(this.command, options, this.logger);

await this.clean(this.volumes);

if (results.code === 0) return await this.success(results);
if (results.code === 3) return await this.ignore(results);
if (results.code === 4) return await this.noop(results);
Expand Down
26 changes: 24 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions package.json
Expand Up @@ -7,12 +7,14 @@
"node": ">=8"
},
"bin": {
"watchbot": "./bin/watchbot.js"
"watchbot": "./bin/watchbot.js",
"watchbot-log": "./bin/watchbot-log.js"
},
"scripts": {
"pretest": "npm run lint && npm run validate-templates",
"lint": "eslint bin lib test index.js",
"test": "tape test/*.test.js | tap-spec && jest test/*.jest.js",
"test": "docker build -q -t ecs-watchbot -f test/Dockerfile ./ && docker run -t ecs-watchbot npm run test-container",
"test-container": "tape test/*.test.js | tap-spec && jest test/*.jest.js",
"validate-templates": "tape test/template.validation.js | tap-spec",
"update-jest-snapshots": "jest -u test/*.jest.js",
"coverage": "nyc --reporter html tape test/*.test.js && opener coverage/index.html"
Expand Down Expand Up @@ -42,6 +44,7 @@
"@mapbox/cloudfriend": "^1.9.0",
"aws-sdk": "^2.188.0",
"binary-split": "^1.0.3",
"fs-extra": "^6.0.1",
"stream-combiner2": "^1.1.1"
}
}
8 changes: 8 additions & 0 deletions test/Dockerfile
@@ -0,0 +1,8 @@
FROM node:8.10

COPY ./package.json ./package.json
COPY ./package-lock.json ./package-lock.json

RUN npm install

COPY ./ ./

0 comments on commit 1892852

Please sign in to comment.