-
Notifications
You must be signed in to change notification settings - Fork 30
/
template.js
430 lines (397 loc) · 15.6 KB
/
template.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
'use strict';
const cf = require('@mapbox/cloudfriend');
/**
* Builds Watchbot resources for you to include in a CloudFormation template
*
* @param {Object} options - configuration parameters
* @param {String|ref} options.cluster - the ARN for the ECS cluster that will host Watchbot's containers.
* @param {String} options.service - the name of your service. This is usually the name of your Github repository. It **must** match the name of the ECR repository where your images are stored.
* @param {String|ref} options.serviceVersion - the version of you service to deploy. This should reference a specific image in ECR.
* @param {String} options.command - the shell command that should be executed in order to process a single message.
* @param {Array} [options.permissions=[]] - permissions that your worker will need in order to complete tasks.
* @param {Object} [options.env={}] - key-value pairs that will be provided to the worker containers as environment variables. Keys must be strings, and values can either be strings or references to other CloudFormation resources via `{"Ref": "..."}`.
* @param {String} [options.prefix='Watchbot'] - a prefix that will be applied to the logical names of all the resources Watchbot creates. If you're building a template that includes more than one Watchbot system, you'll need to specify this in order to differentiate the resources.
* @param {String} [options.family] - the name of the the task definition family that watchbot will create revisions of.
* @param {Number|ref} [options.workers=1] - the maximum number of worker containers that can be launched to process jobs concurrently. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
* @param {String} [options.mounts=''] - if your worker containers need to mount files or folders from the host EC2 file system, specify those mounts with this parameter. A single persistent mount point can be specified as `{host location}:{container location}`, e.g. /root:/mnt/root. A single ephemeral mount point can be specified as `{container location}`, e.g. /mnt/tmp. Separate multiple mount strings with commas if you need to mount more than one location. You can also specify a mount object with `container` and `host` property arrays, in which the indeces correspond: `{ container: [{container location}], host: [{host location}] }`, e.g. { container: [/mnt/root, /mnt/tmp], host: [/root, ''] }. A blank host entry will create an ephemeral mount point at the corresponding container filepath.
* @param {Object} [options.reservation={}] - worker container resource reservations
* @param {Number|ref} [options.reservation.memory] - the number of MB of RAM to reserve as a hard limit. If your worker container tries to utilize more than this much RAM, it will be shut down. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
* @param {Number|ref} [options.reservation.softMemory] - the number of MB of RAM to reserve as a soft limit. Your worker container will be able to utilize more than this much RAM if it happens to be available on the host. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
* @param {Number|ref} [options.reservation.cpu] - the number of CPU units to reserve for your worker container. This will only impact the placement of your container on an EC2 with sufficient CPU capacity, but will not limit your container's utilization. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
* @param {Boolean} [options.privileged=false] - give the container elevated privileges on the host container instance
* @param {Number|ref} [options.messageTimeout=600] - once Watchbot pulls a message from SQS and spawns a worker to process it, SQS will wait this many seconds for a response. If the worker has not yet finished processing the message for any reason, SQS will make the message visible again and Watchbot will spawn another worker to process it. This is helpful when containers or processing scripts crash, but make sure that it allows sufficient time for routine processing to occur. If set too low, you will end up processing jobs multiple times. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
* @param {Number|ref} [options.messageRetention=1209600] - the number of seconds that a message will exist in SQS until it is deleted. The default value is the maximum time that SQS allows, 14 days. This parameter can be provided as either a number or a reference, i.e. `{"Ref": "..."}`.
*/
module.exports = (options = {}) => {
['service', 'serviceVersion', 'command', 'cluster'].forEach((required) => {
if (!options[required]) throw new Error(`options.${required} is required`);
});
options = Object.assign(
{
prefix: 'Watchbot',
reservation: {},
env: {},
messageTimeout: 600,
messageRetention: 1209600,
workers: 1,
mounts: '',
privileged: false,
family: options.service
},
options
);
const prefixed = (name) => `${options.prefix}${name}`;
const unpackEnv = (env, mountPoints) => {
return Object.keys(env).reduce(
(unpacked, key) => {
unpacked.push({ Name: key, Value: env[key] });
return unpacked;
},
[
{ Name: 'WorkTopic', Value: cf.ref(prefixed('Topic')) },
{ Name: 'QueueUrl', Value: cf.ref(prefixed('Queue')) },
{ Name: 'Volumes', Value: mountPoints.map((m) => m.ContainerPath).join(',') }
]
);
};
const mount = (mountInputs) => {
let formatted = { container: [], host: [] };
const mounts = {
mountPoints: [{ ContainerPath: '/tmp', SourceVolume: 'tmp' }],
volumes: [{ Name: 'tmp' }]
};
if (typeof mountInputs === 'object') formatted = mountInputs;
if (typeof mountInputs === 'string') {
mountInputs.split(',').forEach((mountStr) => {
if (!mountStr.length) return;
const persistent = /:/.test(mountStr);
formatted.container.push(
persistent ? mountStr.split(':')[1] : mountStr
);
formatted.host.push(persistent ? mountStr.split(':')[0] : '');
});
}
formatted.container.forEach((container, i) => {
const name = 'mnt-' + i;
const host = formatted.host[i] ? { SourcePath: formatted.host[i] } : {};
mounts.mountPoints.push({ ContainerPath: container, SourceVolume: name });
mounts.volumes.push({ Name: name, Host: host });
});
return mounts;
};
const mounts = mount(options.mounts);
const Resources = {};
Resources[prefixed('DeadLetterQueue')] = {
Type: 'AWS::SQS::Queue',
Description: 'List of messages that failed to process 14 times',
Properties: {
QueueName: cf.join([cf.stackName, '-', prefixed('DeadLetterQueue')]),
MessageRetentionPeriod: 1209600
}
};
Resources[prefixed('Queue')] = {
Type: 'AWS::SQS::Queue',
Properties: {
VisibilityTimeout: options.messageTimeout,
QueueName: cf.join([cf.stackName, '-', prefixed('Queue')]),
MessageRetentionPeriod: options.messageRetention,
RedrivePolicy: {
deadLetterTargetArn: cf.getAtt(prefixed('DeadLetterQueue'), 'Arn'),
maxReceiveCount: 10
}
}
};
Resources[prefixed('Topic')] = {
Type: 'AWS::SNS::Topic',
Properties: {
Subscription: [
{
Endpoint: cf.getAtt(prefixed('Queue'), 'Arn'),
Protocol: 'sqs'
}
]
}
};
Resources[prefixed('QueuePolicy')] = {
Type: 'AWS::SQS::QueuePolicy',
Properties: {
Queues: [cf.ref(prefixed('Queue'))],
PolicyDocument: {
Version: '2008-10-17',
Id: prefixed('WatchbotQueue'),
Statement: [
{
Sid: 'SendSomeMessages',
Effect: 'Allow',
Principal: { AWS: '*' },
Action: ['sqs:SendMessage'],
Resource: cf.getAtt(prefixed('Queue'), 'Arn'),
Condition: {
ArnEquals: {
'aws:SourceArn': cf.ref(prefixed('Topic'))
}
}
}
]
}
}
};
Resources[prefixed('Logs')] = {
Type: 'AWS::Logs::LogGroup',
Properties: {
LogGroupName: cf.join('-', [
cf.stackName,
cf.region,
options.prefix.toLowerCase()
]),
RetentionInDays: 14
}
};
Resources[prefixed('Role')] = {
Type: 'AWS::IAM::Role',
Properties: {
AssumeRolePolicyDocument: {
Statement: [
{
Effect: 'Allow',
Principal: { Service: ['ecs-tasks.amazonaws.com'] },
Action: ['sts:AssumeRole']
}
]
},
Policies: [
{
PolicyName: cf.join([cf.stackName, '-default-worker']),
PolicyDocument: {
Statement: [
{
Effect: 'Allow',
Action: 'sns:Publish',
Resource: cf.ref(prefixed('Topic'))
},
{
Effect: 'Allow',
Action: [
'sqs:ReceiveMessage',
'sqs:DeleteMessage',
'sqs:ChangeMessageVisibility'
],
Resource: cf.getAtt(prefixed('Queue'), 'Arn')
},
{
Effect: 'Allow',
Action: [
'logs:CreateLogStream',
'logs:PutLogEvents',
'logs:FilterLogEvents'
],
Resource: cf.getAtt(prefixed('Logs'), 'Arn')
},
{
Effect: 'Allow',
Action: 'kms:Decrypt',
Resource: cf.importValue('cloudformation-kms-production')
}
]
}
}
]
}
};
if (options.permissions)
Resources[prefixed('Role')].Properties.Policies.push({
PolicyName: cf.join([cf.stackName, '-user-defined-worker']),
PolicyDocument: {
Statement: options.permissions
}
});
Resources[prefixed('Task')] = {
Type: 'AWS::ECS::TaskDefinition',
Properties: {
TaskRoleArn: cf.ref(prefixed('Role')),
Family: options.family,
ContainerDefinitions: [
{
Name: prefixed(`-${options.service}`).toLowerCase(),
Image: cf.join([
cf.accountId,
'.dkr.ecr.us-east-1.amazonaws.com/',
options.service,
':',
options.serviceVersion
]),
Cpu: options.reservation.cpu,
Privileged: options.privileged,
Environment: unpackEnv(options.env, mounts.mountPoints),
MountPoints: mounts.mountPoints,
Command: ['watchbot', 'listen', `${options.command}`],
Ulimits: [
{
Name: 'nofile',
SoftLimit: 10240,
HardLimit: 10240
}
],
LogConfiguration: {
LogDriver: 'awslogs',
Options: {
'awslogs-group': cf.ref(prefixed('Logs')),
'awslogs-region': cf.region,
'awslogs-stream-prefix': options.serviceVersion
}
}
}
],
Volumes: mounts.volumes
}
};
if (options.reservation.memory)
Resources[prefixed('Task')].Properties.ContainerDefinitions[0].Memory =
options.reservation.memory;
if (options.reservation.softMemory)
Resources[
prefixed('Task')
].Properties.ContainerDefinitions[0].MemoryReservation =
options.reservation.softMemory;
Resources[prefixed('Service')] = {
Type: 'AWS::ECS::Service',
Properties: {
Cluster: options.cluster,
DesiredCount: 0,
TaskDefinition: cf.ref(prefixed('Task'))
}
};
if (options.placementConstraints)
Resources[prefixed('Service')].Properties.PlacementConstraints =
options.placementConstraints;
if (options.placementStrategies)
Resources[prefixed('Service')].Properties.PlacementStrategies =
options.placementStrategies;
Resources[prefixed('ScalingRole')] = {
Type: 'AWS::IAM::Role',
Properties: {
AssumeRolePolicyDocument: {
Statement: [
{
Effect: 'Allow',
Principal: { Service: ['application-autoscaling.amazonaws.com'] },
Action: ['sts:AssumeRole']
}
]
},
Path: '/',
Policies: [
{
PolicyName: 'watchbot-autoscaling',
PolicyDocument: {
Statement: [
{
Effect: 'Allow',
Action: [
'application-autoscaling:*',
'cloudwatch:DescribeAlarms',
'cloudwatch:PutMetricAlarm',
'ecs:UpdateService',
'ecs:DescribeServices'
],
Resource: '*'
}
]
}
}
]
}
};
Resources[prefixed('ScalingTarget')] = {
Type: 'AWS::ApplicationAutoScaling::ScalableTarget',
Properties: {
ServiceNamespace: 'ecs',
ScalableDimension: 'ecs:service:DesiredCount',
ResourceId: cf.join([
'service/',
options.cluster,
'/',
cf.getAtt(prefixed('Service'), 'Name')
]),
MinCapacity: 0,
MaxCapacity: options.workers,
RoleARN: cf.getAtt(prefixed('ScalingRole'), 'Arn')
}
};
Resources[prefixed('ScaleUp')] = {
Type: 'AWS::ApplicationAutoScaling::ScalingPolicy',
Properties: {
ScalingTargetId: cf.ref(prefixed('ScalingTarget')),
PolicyName: cf.sub('${AWS::StackName}-scale-up'),
PolicyType: 'StepScaling',
StepScalingPolicyConfiguration: {
AdjustmentType: 'ChangeInCapacity',
Cooldown: 300,
MetricAggregationType: 'Average',
StepAdjustments: [
{
ScalingAdjustment: Math.ceil(options.workers / 10),
MetricIntervalLowerBound: 0.0
}
]
}
}
};
Resources[prefixed('ScaleUpTrigger')] = {
Type: 'AWS::CloudWatch::Alarm',
Properties: {
AlarmName: cf.sub('${AWS::StackName}-scale-up'),
AlarmDescription: 'Scale up workers due to visible messages in queue',
EvaluationPeriods: 1,
Statistic: 'Maximum',
Threshold: 0,
Period: 300,
ComparisonOperator: 'GreaterThanThreshold',
Namespace: 'AWS/SQS',
Dimensions: [
{ Name: 'QueueName', Value: cf.getAtt(prefixed('Queue'), 'QueueName') }
],
MetricName: 'ApproximateNumberOfMessagesVisible',
AlarmActions: [cf.ref(prefixed('ScaleUp'))]
}
};
Resources[prefixed('ScaleDown')] = {
Type: 'AWS::ApplicationAutoScaling::ScalingPolicy',
Properties: {
ScalingTargetId: cf.ref(prefixed('ScalingTarget')),
PolicyName: cf.sub('${AWS::StackName}-scale-down'),
PolicyType: 'StepScaling',
StepScalingPolicyConfiguration: {
AdjustmentType: 'PercentChangeInCapacity',
Cooldown: 300,
MetricAggregationType: 'Average',
StepAdjustments: [
{
ScalingAdjustment: -100,
MetricIntervalUpperBound: 0.0
}
]
}
}
};
Resources[prefixed('ScaleDownTrigger')] = {
Type: 'AWS::CloudWatch::Alarm',
Properties: {
AlarmName: cf.sub('${AWS::StackName}-scale-down'),
AlarmDescription:
'Scale down workers due to lack of in-flight messages in queue',
EvaluationPeriods: 1,
Statistic: 'Maximum',
Threshold: 1,
Period: 600,
ComparisonOperator: 'LessThanThreshold',
Namespace: 'AWS/SQS',
Dimensions: [
{ Name: 'QueueName', Value: cf.getAtt(prefixed('Queue'), 'QueueName') }
],
MetricName: 'ApproximateNumberOfMessagesNotVisible',
AlarmActions: [cf.ref(prefixed('ScaleDown'))]
}
};
return cf.merge({ Resources });
};