From 1892852c3d16e51cae21d1ead15d3a4600b2200e Mon Sep 17 00:00:00 2001 From: Jake Pruitt Date: Fri, 15 Jun 2018 17:44:11 -0700 Subject: [PATCH] Allow users to write to any volume (#200) * Restrict writes to volumes and clean them after every job * Try out the `ReadOnlyRootFilesystem` option * Capitalization * Add watchbot-log * use strict * No need to chmod now --- .travis.yml | 2 + bin/watchbot-log.js | 19 +++++ bin/watchbot.js | 3 +- lib/template.js | 75 +++++++++--------- lib/watcher.js | 2 +- lib/worker.js | 9 +++ package-lock.json | 26 ++++++- package.json | 7 +- test/Dockerfile | 8 ++ test/__snapshots__/template.jest.js.snap | 58 +++++++++----- test/bin.watchbot.test.js | 13 +++- test/template.jest.js | 11 +-- test/template.validation.js | 8 +- test/watcher.test.js | 22 ++++-- test/worker.test.js | 97 +++++++++++++++++++++--- 15 files changed, 271 insertions(+), 89 deletions(-) create mode 100644 bin/watchbot-log.js create mode 100644 test/Dockerfile diff --git a/.travis.yml b/.travis.yml index a7b9ca08..4d451c1a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ language: node_js node_js: - '8' +services: +- docker diff --git a/bin/watchbot-log.js b/bin/watchbot-log.js new file mode 100644 index 00000000..f4697ba7 --- /dev/null +++ b/bin/watchbot-log.js @@ -0,0 +1,19 @@ +#!/usr/bin/env node +'use strict'; + +/** + * watchbot-log "something that you want logged" + * - or - + * echo "somehing that you want logged" | watchbot-log + */ + +const Logger = require('..').Logger; +const args = process.argv.slice(2); + +const logger = new Logger('worker'); + +if (args[0]) { + return logger.log(args[0]); +} + +process.stdin.pipe(logger.stream()); diff --git a/bin/watchbot.js b/bin/watchbot.js index e5cf7ab7..f3dd3240 100755 --- a/bin/watchbot.js +++ b/bin/watchbot.js @@ -11,11 +11,12 @@ const main = async () => { const logger = Logger.create('watcher'); const command = process.argv.slice(3).join(' '); + const volumes = process.env.Volumes.split(','); const options = { queueUrl: process.env.QueueUrl, fresh: process.env.fresh === 'true' ? true : false, - workerOptions: { command } + workerOptions: { command, volumes } }; const watcher = Watcher.create(options); diff --git a/lib/template.js b/lib/template.js index 7cc5992f..15b20640 100644 --- a/lib/template.js +++ b/lib/template.js @@ -15,7 +15,7 @@ const pkg = require(path.resolve(__dirname, '..', 'package.json')); * repository where your images are stored. * @param {String|ref} options.serviceVersion - the version of you service to * deploy. This should reference a specific image in ECR. - * @param {String} options.command - the shell command that should be executed + * @param {String} [options.command] - the shell command that should be executed * in order to process a single message. * @param {Array} [options.permissions=[]] - permissions that your worker will * need in order to complete tasks. @@ -35,22 +35,17 @@ const pkg = require(path.resolve(__dirname, '..', 'package.json')); * to specify this in order to differentiate the resources. * @param {String} [options.family] - the name of the the task definition family * that watchbot will create revisions of. - * @param {Number|ref} [options.maxSize=1] - the maximum size for the service to - * scale up to. This parameter can be provided as either a number or a reference, - * i.e. `{"Ref": "..."}`. + * @param {Number} [options.maxSize=1] - the maximum size for the service to + * scale up to. This parameter must be provided as a number. * @param {Number|ref} [options.minSize=0] - the minimum size for the service to * scale down to. This parameter can be provided as either a number or a reference, * i.e. `{"Ref": "..."}`. - * @param {String} [options.mounts=''] - if your worker containers need to mount - * files or folders from the host EC2 file system, specify those mounts with this parameter. - * A single persistent mount point can be specified as `{host location}:{container location}`, - * e.g. /root:/mnt/root. A single ephemeral mount point can be specified as `{container location}`, - * e.g. /mnt/tmp. Separate multiple mount strings with commas if you need to mount - * more than one location. You can also specify a mount object with `container` - * and `host` property arrays, in which the indeces - * correspond: `{ container: [{container location}], host: [{host location}] }`, - * e.g. { container: [/mnt/root, /mnt/tmp], host: [/root, ''] }. A blank host - * entry will create an ephemeral mount point at the corresponding container filepath. + * @param {String} [options.mounts=''] - if your worker containers need to write + * files or folders inside its file system, specify those locations with this parameter. + * A single ephemeral mount point can be specified as `{container location}`, e.g. /mnt/tmp. + * Separate multiple mount strings with commas if you need to mount more than one location. + * You can also specify a mount object as an arrays of paths. Every mounted volume will be + * cleaned after each job. * @param {Object} [options.reservation={}] - worker container resource reservations * @param {Number|ref} [options.reservation.memory] - the number of MB of RAM * to reserve as a hard limit. If your worker container tries to utilize more @@ -135,7 +130,7 @@ module.exports = (options = {}) => { const prefixed = (name) => `${options.prefix}${name}`; - const unpackEnv = (env) => { + const unpackEnv = (env, mountPoints) => { return Object.keys(env).reduce( (unpacked, key) => { unpacked.push({ Name: key, Value: env[key] }); @@ -144,16 +139,17 @@ module.exports = (options = {}) => { [ { Name: 'WorkTopic', Value: cf.ref(prefixed('Topic')) }, { Name: 'QueueUrl', Value: cf.ref(prefixed('Queue')) }, - { Name: 'fresh', Value: options.fresh } + { Name: 'fresh', Value: options.fresh }, + { Name: 'Volumes', Value: mountPoints.map((m) => m.ContainerPath).join(',') } ] ); }; const mount = (mountInputs) => { - let formatted = { container: [], host: [] }; + let formatted = []; const mounts = { - mountPoints: [], - volumes: [] + mountPoints: [{ ContainerPath: '/tmp', SourceVolume: 'tmp' }], + volumes: [{ Name: 'tmp' }] }; if (typeof mountInputs === 'object') formatted = mountInputs; @@ -162,26 +158,19 @@ module.exports = (options = {}) => { if (!mountStr.length) return; const persistent = /:/.test(mountStr); - formatted.container.push( + formatted.push( persistent ? mountStr.split(':')[1] : mountStr ); - formatted.host.push(persistent ? mountStr.split(':')[0] : ''); }); } - formatted.container.forEach((container, i) => { + formatted.forEach((container, i) => { const name = 'mnt-' + i; - const host = formatted.host[i] ? { SourcePath: formatted.host[i] } : {}; mounts.mountPoints.push({ ContainerPath: container, SourceVolume: name }); - mounts.volumes.push({ Name: name, Host: host }); + mounts.volumes.push({ Name: name }); }); - if (!mounts.mountPoints.length) { - mounts.mountPoints = undefined; - mounts.volumes = undefined; - } - return mounts; }; @@ -264,7 +253,7 @@ module.exports = (options = {}) => { } }; - Resources[prefixed('Logs')] = { + Resources[prefixed('LogGroup')] = { Type: 'AWS::Logs::LogGroup', Properties: { LogGroupName: cf.join('-', [ @@ -314,7 +303,7 @@ module.exports = (options = {}) => { 'logs:PutLogEvents', 'logs:FilterLogEvents' ], - Resource: cf.getAtt(prefixed('Logs'), 'Arn') + Resource: cf.getAtt(prefixed('LogGroup'), 'Arn') }, { Effect: 'Allow', @@ -353,7 +342,7 @@ module.exports = (options = {}) => { ]), Cpu: options.reservation.cpu, Privileged: options.privileged, - Environment: unpackEnv(options.env), + Environment: unpackEnv(options.env, mounts.mountPoints), MountPoints: mounts.mountPoints, Command: ['watchbot', 'listen', `${options.command}`], Ulimits: [ @@ -363,10 +352,11 @@ module.exports = (options = {}) => { HardLimit: 10240 } ], + ReadonlyRootFilesystem: true, LogConfiguration: { LogDriver: 'awslogs', Options: { - 'awslogs-group': cf.ref(prefixed('Logs')), + 'awslogs-group': cf.ref(prefixed('LogGroup')), 'awslogs-region': cf.region, 'awslogs-stream-prefix': options.serviceVersion } @@ -391,7 +381,7 @@ module.exports = (options = {}) => { Type: 'AWS::ECS::Service', Properties: { Cluster: options.cluster, - DesiredCount: 0, + DesiredCount: options.minSize, TaskDefinition: cf.ref(prefixed('Task')) } }; @@ -481,7 +471,7 @@ module.exports = (options = {}) => { Type: 'AWS::CloudWatch::Alarm', Properties: { AlarmName: cf.sub('${AWS::StackName}-scale-up'), - AlarmDescription: 'Scale up workers due to visible messages in queue', + AlarmDescription: 'Scale up due to visible messages in queue', EvaluationPeriods: 1, Statistic: 'Maximum', Threshold: 0, @@ -521,7 +511,7 @@ module.exports = (options = {}) => { Properties: { AlarmName: cf.sub('${AWS::StackName}-scale-down'), AlarmDescription: - 'Scale down workers due to lack of in-flight messages in queue', + 'Scale down due to lack of in-flight messages in queue', EvaluationPeriods: 1, Statistic: 'Maximum', Threshold: 1, @@ -560,7 +550,7 @@ module.exports = (options = {}) => { Type: 'AWS::Logs::MetricFilter', Properties: { FilterPattern: '"[failure]"', - LogGroupName: cf.ref(prefixed('Logs')), + LogGroupName: cf.ref(prefixed('LogGroup')), MetricTransformations: [{ MetricName: cf.join([prefixed('WorkerErrors-'), cf.stackName]), MetricNamespace: 'Mapbox/ecs-watchbot', @@ -609,5 +599,14 @@ module.exports = (options = {}) => { } }; - return cf.merge({ Resources }); + const ref = { + logGroup: cf.ref(prefixed('LogGroup')), + topic: cf.ref(prefixed('Topic')), + queueUrl: cf.ref(prefixed('Queue')), + queueArn: cf.getAtt(prefixed('Queue'), 'Arn'), + queueName: cf.getAtt(prefixed('Queue'), 'QueueName'), + notificationTopic: cf.ref(prefixed('NotificationTopic')) + }; + + return { Resources, ref }; }; diff --git a/lib/watcher.js b/lib/watcher.js index d321045e..167a66fe 100644 --- a/lib/watcher.js +++ b/lib/watcher.js @@ -22,7 +22,7 @@ class Watcher { } listen() { - return new Promise((resolve) => { + return new Promise(async (resolve) => { const loop = async () => { if (this.stop) return resolve(); diff --git a/lib/worker.js b/lib/worker.js index 20f17a5f..d64ccd30 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -1,6 +1,7 @@ 'use strict'; const child_process = require('child_process'); +const fsExtra = require('fs-extra'); const Message = require('./message'); const Logger = require('./logger'); @@ -28,6 +29,7 @@ class Worker { if (!options.command) throw new Error('Missing options: command'); this.command = options.command; + this.volumes = options.volumes; this.message = message; this.logger = Logger.create('watcher', message); } @@ -52,6 +54,10 @@ class Worker { return await this.message.retry(); } + async clean(volumes) { + return Promise.all(volumes.map((volume) => fsExtra.emptyDir(volume))); + } + async waitFor() { const options = { shell: true, @@ -61,6 +67,9 @@ class Worker { try { const results = await child(this.command, options, this.logger); + + await this.clean(this.volumes); + if (results.code === 0) return await this.success(results); if (results.code === 3) return await this.ignore(results); if (results.code === 4) return await this.noop(results); diff --git a/package-lock.json b/package-lock.json index e1a3c6bb..f90ac7c1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1486,6 +1486,16 @@ "samsam": "1.3.0" } }, + "fs-extra": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-6.0.1.tgz", + "integrity": "sha512-GnyIkKhhzXZUWFCaJzvyDLEEgDkPfb4/TPvJCJVuS8MWZgoSsErf++QpiAlDnKFcqhRlm+tIOcencCjyJE6ZCA==", + "requires": { + "graceful-fs": "4.1.11", + "jsonfile": "4.0.0", + "universalify": "0.1.1" + } + }, "fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -2485,8 +2495,7 @@ "graceful-fs": { "version": "4.1.11", "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.1.11.tgz", - "integrity": "sha1-Dovf5NHduIVNZOBOp8AOKgJuVlg=", - "dev": true + "integrity": "sha1-Dovf5NHduIVNZOBOp8AOKgJuVlg=" }, "growly": { "version": "1.3.0", @@ -3484,6 +3493,14 @@ "integrity": "sha1-Hq3nrMASA0rYTiOWdn6tn6VJWCE=", "dev": true }, + "jsonfile": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz", + "integrity": "sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=", + "requires": { + "graceful-fs": "4.1.11" + } + }, "jsonify": { "version": "0.0.0", "resolved": "https://registry.npmjs.org/jsonify/-/jsonify-0.0.0.tgz", @@ -6774,6 +6791,11 @@ "integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og==", "dev": true }, + "universalify": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.1.tgz", + "integrity": "sha1-+nG63UQ3r0wUiEHjs7Fl+enlkLc=" + }, "url": { "version": "0.10.3", "resolved": "https://registry.npmjs.org/url/-/url-0.10.3.tgz", diff --git a/package.json b/package.json index 532581d2..a36e4d91 100644 --- a/package.json +++ b/package.json @@ -7,12 +7,14 @@ "node": ">=8" }, "bin": { - "watchbot": "./bin/watchbot.js" + "watchbot": "./bin/watchbot.js", + "watchbot-log": "./bin/watchbot-log.js" }, "scripts": { "pretest": "npm run lint && npm run validate-templates", "lint": "eslint bin lib test index.js", - "test": "tape test/*.test.js | tap-spec && jest test/*.jest.js", + "test": "docker build -q -t ecs-watchbot -f test/Dockerfile ./ && docker run -t ecs-watchbot npm run test-container", + "test-container": "tape test/*.test.js | tap-spec && jest test/*.jest.js", "validate-templates": "tape test/template.validation.js | tap-spec", "update-jest-snapshots": "jest -u test/*.jest.js", "coverage": "nyc --reporter html tape test/*.test.js && opener coverage/index.html" @@ -42,6 +44,7 @@ "@mapbox/cloudfriend": "^1.9.0", "aws-sdk": "^2.188.0", "binary-split": "^1.0.3", + "fs-extra": "^6.0.1", "stream-combiner2": "^1.1.1" } } diff --git a/test/Dockerfile b/test/Dockerfile new file mode 100644 index 00000000..d50bd612 --- /dev/null +++ b/test/Dockerfile @@ -0,0 +1,8 @@ +FROM node:8.10 + +COPY ./package.json ./package.json +COPY ./package-lock.json ./package-lock.json + +RUN npm install + +COPY ./ ./ diff --git a/test/__snapshots__/template.jest.js.snap b/test/__snapshots__/template.jest.js.snap index 1c3bb251..9a3b4485 100644 --- a/test/__snapshots__/template.jest.js.snap +++ b/test/__snapshots__/template.jest.js.snap @@ -60,7 +60,7 @@ Object { }, "Type": "AWS::SQS::Queue", }, - "SoupLogs": Object { + "SoupLogGroup": Object { "Properties": Object { "LogGroupName": Object { "Fn::Join": Array [ @@ -242,7 +242,7 @@ Object { "Effect": "Allow", "Resource": Object { "Fn::GetAtt": Array [ - "SoupLogs", + "SoupLogGroup", "Arn", ], }, @@ -324,7 +324,7 @@ Object { "Ref": "SoupScaleDown", }, ], - "AlarmDescription": "Scale down workers due to lack of in-flight messages in queue", + "AlarmDescription": "Scale down due to lack of in-flight messages in queue", "AlarmName": Object { "Fn::Sub": "\${AWS::StackName}-scale-down", }, @@ -379,7 +379,7 @@ Object { "Ref": "SoupScaleUp", }, ], - "AlarmDescription": "Scale up workers due to visible messages in queue", + "AlarmDescription": "Scale up due to visible messages in queue", "AlarmName": Object { "Fn::Sub": "\${AWS::StackName}-scale-up", }, @@ -513,6 +513,10 @@ Object { "Name": "fresh", "Value": false, }, + Object { + "Name": "Volumes", + "Value": "/tmp,/data,/ephemeral", + }, Object { "Name": "MyKey", "Value": "MyValue", @@ -536,7 +540,7 @@ Object { "LogDriver": "awslogs", "Options": Object { "awslogs-group": Object { - "Ref": "SoupLogs", + "Ref": "SoupLogGroup", }, "awslogs-region": Object { "Ref": "AWS::Region", @@ -547,6 +551,10 @@ Object { "Memory": 512, "MemoryReservation": 128, "MountPoints": Array [ + Object { + "ContainerPath": "/tmp", + "SourceVolume": "tmp", + }, Object { "ContainerPath": "/data", "SourceVolume": "mnt-0", @@ -558,6 +566,7 @@ Object { ], "Name": "soup-example", "Privileged": true, + "ReadonlyRootFilesystem": true, "Ulimits": Array [ Object { "HardLimit": 10240, @@ -573,13 +582,12 @@ Object { }, "Volumes": Array [ Object { - "Host": Object { - "SourcePath": "/mnt/data", - }, + "Name": "tmp", + }, + Object { "Name": "mnt-0", }, Object { - "Host": Object {}, "Name": "mnt-1", }, ], @@ -637,7 +645,7 @@ Object { "Properties": Object { "FilterPattern": "\\"[failure]\\"", "LogGroupName": Object { - "Ref": "SoupLogs", + "Ref": "SoupLogGroup", }, "MetricTransformations": Array [ Object { @@ -723,7 +731,7 @@ Object { }, "Type": "AWS::SQS::Queue", }, - "WatchbotLogs": Object { + "WatchbotLogGroup": Object { "Properties": Object { "LogGroupName": Object { "Fn::Join": Array [ @@ -905,7 +913,7 @@ Object { "Effect": "Allow", "Resource": Object { "Fn::GetAtt": Array [ - "WatchbotLogs", + "WatchbotLogGroup", "Arn", ], }, @@ -965,7 +973,7 @@ Object { "Ref": "WatchbotScaleDown", }, ], - "AlarmDescription": "Scale down workers due to lack of in-flight messages in queue", + "AlarmDescription": "Scale down due to lack of in-flight messages in queue", "AlarmName": Object { "Fn::Sub": "\${AWS::StackName}-scale-down", }, @@ -1020,7 +1028,7 @@ Object { "Ref": "WatchbotScaleUp", }, ], - "AlarmDescription": "Scale up workers due to visible messages in queue", + "AlarmDescription": "Scale up due to visible messages in queue", "AlarmName": Object { "Fn::Sub": "\${AWS::StackName}-scale-up", }, @@ -1154,6 +1162,10 @@ Object { "Name": "fresh", "Value": false, }, + Object { + "Name": "Volumes", + "Value": "/tmp", + }, ], "Image": Object { "Fn::Join": Array [ @@ -1173,7 +1185,7 @@ Object { "LogDriver": "awslogs", "Options": Object { "awslogs-group": Object { - "Ref": "WatchbotLogs", + "Ref": "WatchbotLogGroup", }, "awslogs-region": Object { "Ref": "AWS::Region", @@ -1181,9 +1193,15 @@ Object { "awslogs-stream-prefix": "1", }, }, - "MountPoints": undefined, + "MountPoints": Array [ + Object { + "ContainerPath": "/tmp", + "SourceVolume": "tmp", + }, + ], "Name": "watchbot-example", "Privileged": false, + "ReadonlyRootFilesystem": true, "Ulimits": Array [ Object { "HardLimit": 10240, @@ -1197,7 +1215,11 @@ Object { "TaskRoleArn": Object { "Ref": "WatchbotRole", }, - "Volumes": undefined, + "Volumes": Array [ + Object { + "Name": "tmp", + }, + ], }, "Type": "AWS::ECS::TaskDefinition", }, @@ -1252,7 +1274,7 @@ Object { "Properties": Object { "FilterPattern": "\\"[failure]\\"", "LogGroupName": Object { - "Ref": "WatchbotLogs", + "Ref": "WatchbotLogGroup", }, "MetricTransformations": Array [ Object { diff --git a/test/bin.watchbot.test.js b/test/bin.watchbot.test.js index 9cfac2f2..424f1f29 100644 --- a/test/bin.watchbot.test.js +++ b/test/bin.watchbot.test.js @@ -12,6 +12,7 @@ test('[bin.watchbot] success', async (assert) => { const argv = process.argv; process.argv = ['', '', 'listen', 'echo', 'hello', 'world']; process.env.QueueUrl = 'https://faker'; + process.env.Volumes = '/tmp,/mnt'; try { await watchbot(); @@ -22,8 +23,11 @@ test('[bin.watchbot] success', async (assert) => { assert.ok( Watcher.create.calledWith({ queueUrl: 'https://faker', - workerOptions: { command: 'echo hello world' }, - fresh: false + fresh: false, + workerOptions: { + command: 'echo hello world', + volumes: ['/tmp', '/mnt'] + } }), 'watcher created with expected arguments' ); @@ -31,6 +35,7 @@ test('[bin.watchbot] success', async (assert) => { assert.equal(watcher.listen.callCount, 1, 'called watcher.listen()'); delete process.env.QueueUrl; + delete process.env.Volumes; process.argv = argv; watcher.teardown(); assert.end(); @@ -45,6 +50,7 @@ test('[bin.watchbot] error handling', async (assert) => { const argv = process.argv; process.argv = ['', '', 'listen', 'echo', 'hello', 'world']; process.env.QueueUrl = 'https://faker'; + process.env.Volumes = '/tmp,/mnt'; try { await watchbot(); @@ -58,6 +64,7 @@ test('[bin.watchbot] error handling', async (assert) => { ); delete process.env.QueueUrl; + delete process.env.Volumes; process.argv = argv; logger.teardown(); watcher.teardown(); @@ -68,6 +75,7 @@ test('[bin.watchbot] bad arguments', async (assert) => { const argv = process.argv; process.argv = ['', '', 'watch', 'echo', 'hello', 'world']; process.env.QueueUrl = 'https://faker'; + process.env.Volumes = '/tmp,/mnt'; try { await watchbot(); @@ -80,6 +88,7 @@ test('[bin.watchbot] bad arguments', async (assert) => { } delete process.env.QueueUrl; + delete process.env.Volumes; process.argv = argv; assert.end(); }); diff --git a/test/template.jest.js b/test/template.jest.js index 05d3a999..d4c1b375 100644 --- a/test/template.jest.js +++ b/test/template.jest.js @@ -4,6 +4,7 @@ const assert = require('assert'); const template = require('../lib/template'); +const cf = require('@mapbox/cloudfriend'); test('[template]', () => { assert.throws( @@ -12,17 +13,17 @@ test('[template]', () => { 'throws when missing required options' ); - const builtWithDefaults = template({ + const builtWithDefaults = cf.merge(template({ service: 'example', serviceVersion: '1', command: 'echo hello world', cluster: 'processing', notificationEmail: 'hello@mapbox.pagerduty.com' - }); + })); expect(builtWithDefaults).toMatchSnapshot('defaults'); - const setsAllOptions = template({ + const setsAllOptions = cf.merge(template({ service: 'example', serviceVersion: '1', command: 'echo hello world', @@ -40,7 +41,7 @@ test('[template]', () => { prefix: 'Soup', family: 'abc-123', maxSize: 90, - mounts: '/mnt/data:/data,/ephemeral', + mounts: '/data,/ephemeral', reservation: { memory: 512, softMemory: 128, @@ -50,7 +51,7 @@ test('[template]', () => { messageTimeout: 300, messageRetention: 1096, notificationEmail: 'hello@mapbox.pagerduty.com' - }); + })); expect(setsAllOptions).toMatchSnapshot('all-properties'); }); diff --git a/test/template.validation.js b/test/template.validation.js index 1413cd07..e097054b 100644 --- a/test/template.validation.js +++ b/test/template.validation.js @@ -9,13 +9,13 @@ const cf = require('@mapbox/cloudfriend'); const template = require('../lib/template'); test('[template validation] defaults', async (assert) => { - const builtWithDefaults = template({ + const builtWithDefaults = cf.merge(template({ service: 'example', serviceVersion: '1', command: 'echo hello world', cluster: 'processing', notificationEmail: 'hello@mapbox.pagerduty.com' - }); + })); const tmp = path.join(os.tmpdir(), crypto.randomBytes(8).toString('hex')); fs.writeFileSync(tmp, JSON.stringify(builtWithDefaults)); @@ -31,7 +31,7 @@ test('[template validation] defaults', async (assert) => { }); test('[template validation] options set', async (assert) => { - const setsAllOptions = template({ + const setsAllOptions = cf.merge(template({ service: 'example', serviceVersion: '1', command: 'echo hello world', @@ -59,7 +59,7 @@ test('[template validation] options set', async (assert) => { messageTimeout: 300, messageRetention: 1096, notificationEmail: 'hello@mapbox.pagerduty.com' - }); + })); const tmp = path.join(os.tmpdir(), crypto.randomBytes(8).toString('hex')); fs.writeFileSync(tmp, JSON.stringify(setsAllOptions)); diff --git a/test/watcher.test.js b/test/watcher.test.js index 09e655be..decf2909 100644 --- a/test/watcher.test.js +++ b/test/watcher.test.js @@ -26,7 +26,10 @@ test('[watcher] constructor', (assert) => { const options = { queueUrl: 'https://faker', - workerOptions: { command: 'echo hello world' } + workerOptions: { + command: 'echo hello world', + volumes: ['/tmp'] + } }; const watcher = new Watcher(options); @@ -55,8 +58,11 @@ test('[watcher] listens exactly once', async (assert) => { const watcher = new Watcher({ queueUrl: 'https://faker', - workerOptions: { command: 'echo hello world' }, - fresh: true + fresh: true, + workerOptions: { + command: 'echo hello world', + volumes: ['/tmp'] + } }); await watcher.listen(); @@ -73,7 +79,10 @@ test('[watcher] listen', async (assert) => { const messages = stubber(Messages).setup(); const worker = stubber(Worker).setup(); - const workerOptions = { command: 'echo hello world' }; + const workerOptions = { + command: 'echo hello world', + volumes: ['/tmp','/mnt'] + }; const watcher = new Watcher({ queueUrl: 'https://faker', @@ -122,7 +131,10 @@ test('[watcher] listen', async (assert) => { test('[watcher] factory', (assert) => { const watcher = Watcher.create({ queueUrl: 'https://faker', - workerOptions: { command: 'echo hello world' } + workerOptions: { + command: 'echo hello world', + volumes: ['/tmp'] + } }); assert.ok(watcher instanceof Watcher, 'creates a Watcher object'); diff --git a/test/worker.test.js b/test/worker.test.js index 36654208..e49f1252 100644 --- a/test/worker.test.js +++ b/test/worker.test.js @@ -3,6 +3,8 @@ const stream = require('stream'); const events = require('events'); const child_process = require('child_process'); +const fs = require('fs'); +const fsExtra = require('fs-extra'); const test = require('tape'); const sinon = require('sinon'); const Worker = require('../lib/worker'); @@ -30,7 +32,7 @@ test('[worker] constructor', (assert) => { ); const message = sinon.createStubInstance(Message); - const worker = new Worker(message, { command: 'echo hello world' }); + const worker = new Worker(message, { command: 'echo hello world', volumes: ['/tmp'] }); assert.equal(worker.message, message, 'sets .message'); assert.equal(worker.command, 'echo hello world', 'sets .command'); @@ -40,7 +42,7 @@ test('[worker] constructor', (assert) => { test('[worker] factory', (assert) => { const message = sinon.createStubInstance(Message); - const options = { command: 'echo hello world' }; + const options = { command: 'echo hello world', volumes: ['/tmp'] }; const worker = Worker.create(message, options); assert.ok(worker instanceof Worker, 'returns a Worker object'); assert.end(); @@ -49,7 +51,7 @@ test('[worker] factory', (assert) => { test('[worker] fail', async (assert) => { const logger = stubber(Logger).setup(); const message = sinon.createStubInstance(Message); - const options = { command: 'echo hello world' }; + const options = { command: 'echo hello world', volumes: ['/tmp'] }; const worker = new Worker(message, options); const results = { code: 124, signal: 'SIGTERM', duration: 12345 }; @@ -65,7 +67,7 @@ test('[worker] fail', async (assert) => { test('[worker] noop', async (assert) => { const logger = stubber(Logger).setup(); const message = sinon.createStubInstance(Message); - const options = { command: 'echo hello world' }; + const options = { command: 'echo hello world', volumes: ['/tmp'] }; const worker = new Worker(message, options); const results = { code: 4, duration: 12345 }; @@ -81,7 +83,7 @@ test('[worker] noop', async (assert) => { test('[worker] ignore', async (assert) => { const logger = stubber(Logger).setup(); const message = sinon.createStubInstance(Message); - const options = { command: 'echo hello world' }; + const options = { command: 'echo hello world', volumes: ['/tmp'] }; const worker = new Worker(message, options); const results = { code: 3, duration: 12345 }; @@ -98,7 +100,7 @@ test('[worker] success', async (assert) => { const logger = stubber(Logger).setup(); const message = sinon.createStubInstance(Message); - const options = { command: 'echo hello world' }; + const options = { command: 'echo hello world', volumes: ['/tmp'] }; const worker = new Worker(message, options); const results = { code: 0, duration: 12345 }; @@ -126,7 +128,7 @@ test('[worker] waitFor, exit 0', async (assert) => { logger.type = 'worker'; logger.message = message; - const options = { command: 'echo ${Message}' }; + const options = { command: 'echo ${Message}', volumes: ['/tmp', '/var/tmp'] }; const worker = new Worker(message, options); const env = process.env; @@ -134,6 +136,8 @@ test('[worker] waitFor, exit 0', async (assert) => { sinon.spy(child_process, 'spawn'); sinon.spy(process.stdout, 'write'); + sinon.spy(process.stderr, 'write'); + sinon.spy(fsExtra, 'emptyDir'); try { await worker.waitFor(); @@ -143,6 +147,7 @@ test('[worker] waitFor, exit 0', async (assert) => { const data = process.stdout.write.args[0][0]; process.stdout.write.restore(); + process.stderr.write.restore(); assert.equal( data, @@ -164,6 +169,76 @@ test('[worker] waitFor, exit 0', async (assert) => { assert.ok(results.duration, 'logged worker success duration'); assert.equal(message.complete.callCount, 1, 'called message.complete()'); + assert.equal(fsExtra.emptyDir.callCount, 2, 'called fsExtra.emptyDir() twice'); + assert.ok(fsExtra.emptyDir.calledWith('/tmp'), 'called fsExtra.emptyDir() on /tmp'); + assert.ok(fsExtra.emptyDir.calledWith('/var/tmp'), 'called fsExtra.emptyDir() on /tmp'); + + fsExtra.emptyDir.restore(); + Date.prototype.toGMTString.restore(); + child_process.spawn.restore(); + process.env = env; + logger.teardown(); + assert.end(); +}); + +test('[worker] waitFor, write to /tmp, exit 0', async (assert) => { + sinon + .stub(Date.prototype, 'toGMTString') + .returns('Fri, 09 Feb 2018 21:57:55 GMT'); + + const logger = stubber(Logger).setup(); + const message = sinon.createStubInstance(Message); + message.id = '895ab607-3767-4bbb-bd45-2a3b341cbc46'; + message.env = { Message: 'banana' }; + + logger.log.restore(); + logger.stream.restore(); + logger.type = 'worker'; + logger.message = message; + + const options = { command: 'echo ${Message} > /tmp/banana.txt && cat /tmp/banana.txt', volumes: ['/tmp'] }; + const worker = new Worker(message, options); + + const env = process.env; + process.env = { fake: 'environment' }; + + sinon.spy(child_process, 'spawn'); + sinon.spy(process.stdout, 'write'); + sinon.spy(process.stderr, 'write'); + + try { + await worker.waitFor(); + } catch (err) { + assert.ifError(err, 'failed'); + } + + const data = process.stdout.write.args[0][0]; + process.stdout.write.restore(); + process.stderr.write.restore(); + + assert.equal( + data, + '[Fri, 09 Feb 2018 21:57:55 GMT] [worker] [895ab607-3767-4bbb-bd45-2a3b341cbc46] banana\n', + 'prefixed worker output' + ); + + assert.ok( + child_process.spawn.calledWith('echo ${Message} > /tmp/banana.txt && cat /tmp/banana.txt', { + env: Object.assign(message.env, process.env), + shell: true, + stdio: [process.stdin, 'pipe', 'pipe'] + }), + 'spawned child process properly' + ); + + const results = logger.workerSuccess.args[0][0]; + assert.equal(results.code, 0, 'logged worker success exit code'); + assert.ok(results.duration, 'logged worker success duration'); + assert.equal(message.complete.callCount, 1, 'called message.complete()'); + + const tmpFiles = fs.readdirSync('/tmp'); + assert.equal(tmpFiles.length, 0, 'all files in /tmp are cleared out after the worker complets'); + Date.prototype.toGMTString.restore(); child_process.spawn.restore(); process.env = env; @@ -177,7 +252,7 @@ test('[worker] waitFor, exit 1', async (assert) => { logger.stream.restore(); const message = sinon.createStubInstance(Message); message.env = { Message: 'banana' }; - const options = { command: 'exit 1' }; + const options = { command: 'exit 1', volumes: ['/tmp'] }; const worker = new Worker(message, options); sinon.spy(child_process, 'spawn'); @@ -204,7 +279,7 @@ test('[worker] waitFor, exit 3', async (assert) => { logger.stream.restore(); const message = sinon.createStubInstance(Message); message.env = { Message: 'banana' }; - const options = { command: 'exit 3' }; + const options = { command: 'exit 3', volumes: ['/tmp'] }; const worker = new Worker(message, options); sinon.spy(child_process, 'spawn'); @@ -231,7 +306,7 @@ test('[worker] waitFor, exit 4', async (assert) => { logger.stream.restore(); const message = sinon.createStubInstance(Message); message.env = { Message: 'banana' }; - const options = { command: 'exit 4' }; + const options = { command: 'exit 4', volumes: ['/tmp'] }; const worker = new Worker(message, options); sinon.spy(child_process, 'spawn'); @@ -258,7 +333,7 @@ test('[worker] waitFor, child_process.spawn failure', async (assert) => { logger.stream.restore(); const message = sinon.createStubInstance(Message); message.env = { Message: 'banana' }; - const options = { command: 'echo ${Message}' }; + const options = { command: 'echo ${Message}', volumes: ['/tmp'] }; const worker = new Worker(message, options); const err = new Error('foo');