forked from alexcasalboni/aws-lambda-power-tuning
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecutor.js
195 lines (165 loc) · 6.34 KB
/
executor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
'use strict';
const AWS = require('aws-sdk');
// the executor needs a longer socket timeout to invoke long-running functions
// 15 minutes is fine here because the Executor will timeout anyway
AWS.config.update({httpOptions: {timeout: 15 * 60 * 1000}});
const utils = require('./utils');
const minRAM = parseInt(process.env.minRAM, 10);
/**
* Execute the given function N times in series or in parallel.
* Then compute execution statistics (average cost and duration).
*/
module.exports.handler = async(event, context) => {
// read input from event
let {
lambdaARN,
value,
num,
enableParallel,
payload,
dryRun,
preProcessorARN,
postProcessorARN,
discardTopBottom,
sleepBetweenRunsMs,
} = await extractDataFromInput(event);
validateInput(lambdaARN, value, num); // may throw
// force only 1 execution if dryRun
if (dryRun) {
console.log('[Dry-run] forcing num=1');
num = 1;
}
const lambdaAlias = 'RAM' + value;
let results;
// fetch architecture from $LATEST
const {architecture, isPending} = await utils.getLambdaConfig(lambdaARN, lambdaAlias);
console.log(`Detected architecture type: ${architecture}, isPending: ${isPending}`);
// pre-generate an array of N payloads
const payloads = utils.generatePayloads(num, payload);
const runInput = {
num: num,
lambdaARN: lambdaARN,
lambdaAlias: lambdaAlias,
payloads: payloads,
preARN: preProcessorARN,
postARN: postProcessorARN,
sleepBetweenRunsMs: sleepBetweenRunsMs,
};
// wait if the function/alias state is Pending
if (isPending) {
await utils.waitForAliasActive(lambdaARN, lambdaAlias);
console.log('Alias active');
}
if (enableParallel) {
results = await runInParallel(runInput);
} else {
results = await runInSeries(runInput);
}
// get base cost for Lambda
const baseCost = utils.lambdaBaseCost(utils.regionFromARN(lambdaARN), architecture);
return computeStatistics(baseCost, results, value, discardTopBottom);
};
const validateInput = (lambdaARN, value, num) => {
if (!lambdaARN) {
throw new Error('Missing or empty lambdaARN');
}
if (!value || isNaN(value)) {
throw new Error('Invalid value: ' + value);
}
if (!num || isNaN(num)) {
throw new Error('Invalid num: ' + num);
}
};
const extractPayloadValue = async(input) => {
if (input.payloadS3) {
return await utils.fetchPayloadFromS3(input.payloadS3); // might throw if access denied or 404
} else if (input.payload) {
return input.payload;
}
return null;
};
const extractDiscardTopBottomValue = (event) => {
// extract discardTopBottom used to trim values from average duration
let discardTopBottom = event.discardTopBottom;
if (typeof discardTopBottom === 'undefined') {
discardTopBottom = 0.2;
}
// discardTopBottom must be between 0 and 0.4
return Math.min(Math.max(discardTopBottom, 0.0), 0.4);
};
const extractSleepTime = (event) => {
let sleepBetweenRunsMs = event.sleepBetweenRunsMs;
if (isNaN(sleepBetweenRunsMs)) {
sleepBetweenRunsMs = 0;
} else {
sleepBetweenRunsMs = parseInt(sleepBetweenRunsMs, 10);
}
return sleepBetweenRunsMs;
};
const extractDataFromInput = async(event) => {
const input = event.input; // original state machine input
const payload = await extractPayloadValue(input);
const discardTopBottom = extractDiscardTopBottomValue(input);
const sleepBetweenRunsMs = extractSleepTime(input);
return {
value: parseInt(event.value, 10),
lambdaARN: input.lambdaARN,
num: parseInt(input.num, 10),
enableParallel: !!input.parallelInvocation,
payload: payload,
dryRun: input.dryRun === true,
preProcessorARN: input.preProcessorARN,
postProcessorARN: input.postProcessorARN,
discardTopBottom: discardTopBottom,
sleepBetweenRunsMs: sleepBetweenRunsMs,
};
};
const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN}) => {
const results = [];
// run all invocations in parallel ...
const invocations = utils.range(num).map(async(_, i) => {
const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, lambdaAlias, payloads[i], preARN, postARN);
// invocation errors return 200 and contain FunctionError and Payload
if (invocationResults.FunctionError) {
throw new Error(`Invocation error (running in parallel): ${invocationResults.Payload} with payload ${JSON.stringify(actualPayload)}`);
}
results.push(invocationResults);
});
// ... and wait for results
await Promise.all(invocations);
return results;
};
const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, sleepBetweenRunsMs}) => {
const results = [];
for (let i = 0; i < num; i++) {
// run invocations in series
const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, lambdaAlias, payloads[i], preARN, postARN);
// invocation errors return 200 and contain FunctionError and Payload
if (invocationResults.FunctionError) {
throw new Error(`Invocation error (running in series): ${invocationResults.Payload} with payload ${JSON.stringify(actualPayload)}`);
}
if (sleepBetweenRunsMs > 0) {
await utils.sleep(sleepBetweenRunsMs);
}
results.push(invocationResults);
}
return results;
};
const computeStatistics = (baseCost, results, value, discardTopBottom) => {
// use results (which include logs) to compute average duration ...
const durations = utils.parseLogAndExtractDurations(results);
const averageDuration = utils.computeAverageDuration(durations, discardTopBottom);
console.log('Average duration: ', averageDuration);
// ... and overall statistics
const averagePrice = utils.computePrice(baseCost, minRAM, value, averageDuration);
// .. and total cost (exact $)
const totalCost = utils.computeTotalCost(baseCost, minRAM, value, durations);
const stats = {
averagePrice,
averageDuration,
totalCost,
value,
};
console.log('Stats: ', stats);
return stats;
};