-
Notifications
You must be signed in to change notification settings - Fork 13
/
job.js
90 lines (76 loc) · 2.4 KB
/
job.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
'use strict';
const { get, isEmpty } = require('@terascope/utils');
const { JobValidator } = require('@terascope/job-components');
const { terasliceOpPath } = require('../../config');
const { makeJobStore, makeExStore, makeStateStore } = require('../../storage');
/**
 * Validate a job specification with `JobValidator`.
 *
 * The validator is constructed with the given context, the configured
 * `terasliceOpPath`, and the asset directory read from
 * `context.sysconfig.teraslice.assets_directory`.
 *
 * @param {object} context - context handed to `JobValidator`; also the
 *   source of the assets directory setting
 * @param {object} jobSpec - job configuration passed to `validateConfig`
 * @returns {Promise<*>} whatever `jobValidator.validateConfig(jobSpec)`
 *   resolves to
 * @throws {Error} with message `validating job: <original error>` when
 *   validation fails; the original error is available as `error.cause`
 */
async function validateJob(context, jobSpec) {
    const jobValidator = new JobValidator(context, {
        terasliceOpPath,
        assetPath: get(context, 'sysconfig.teraslice.assets_directory'),
    });

    try {
        return await jobValidator.validateConfig(jobSpec);
    } catch (error) {
        // Keep the original error reachable through `cause` so its stack and
        // any custom fields are not lost behind the stringified message.
        throw new Error(`validating job: ${error}`, { cause: error });
    }
}
/**
 * Create the job and execution records a test needs, optionally simulating
 * a recovery.
 *
 * Any store missing from `stores` is created with the matching
 * `make*Store(context)` factory. Stores created here are shut down with
 * `shutdown(true)` before this function settles, whether it resolves or
 * rejects. Stores supplied by the caller are never shut down here.
 *
 * @param {object} options
 * @param {object} options.context - passed to the store factories and to
 *   `validateJob`
 * @param {object} options.config - job config; validated and passed to
 *   `jobStore.create`
 * @param {object} [options.stores={}] - optional pre-built `jobStore`,
 *   `exStore` and/or `stateStore`
 * @param {boolean} [options.isRecovery] - when truthy the execution is
 *   created with `lastStatus` and the recovery steps below are run
 * @param {*} [options.cleanupType] - forwarded to
 *   `exStore.createRecoveredExecution`
 * @param {boolean} [options.createRecovery=true] - when recovering, replace
 *   the execution with the result of `exStore.createRecoveredExecution`
 * @param {Array<{slice: object, state: *}>} [options.recoverySlices=[]] -
 *   when recovering, each entry is written with `stateStore.createState`
 * @param {string} [options.lastStatus='failed'] - second argument to
 *   `exStore.create` when recovering
 * @returns {Promise<{job: object, ex: object}>} the merged job record and
 *   the execution record
 */
async function initializeTestExecution({
    context,
    config,
    stores = {},
    isRecovery,
    cleanupType,
    createRecovery = true,
    recoverySlices = [],
    lastStatus = 'failed'
}) {
    // Stores created by this call, as opposed to ones the caller supplied.
    const ownedStores = [];

    const resolveStore = async (provided, makeStore) => {
        if (provided) return provided;
        const store = await makeStore(context);
        ownedStores.push(store);
        return store;
    };

    const shutdownOwnedStores = async () => {
        await Promise.all(ownedStores.map((store) => store.shutdown(true)));
    };

    let result;
    try {
        const jobStore = await resolveStore(stores.jobStore, makeJobStore);
        const exStore = await resolveStore(stores.exStore, makeExStore);
        const stateStore = await resolveStore(stores.stateStore, makeStateStore);

        const validJob = await validateJob(context, config);
        const jobSpec = await jobStore.create(config);

        // Validated fields win over the stored spec, but the job_id assigned
        // by the store is always kept.
        const job = Object.assign({}, jobSpec, validJob, {
            job_id: jobSpec.job_id
        });

        const slicerHostname = job.slicer_hostname;
        const slicerPort = job.slicer_port;

        let ex;
        if (isRecovery) {
            ex = await exStore.create(job, lastStatus);

            if (recoverySlices.length) {
                await Promise.all(recoverySlices.map(({ slice, state }) => stateStore.createState(
                    ex.ex_id,
                    slice,
                    state,
                    slice.error
                )));
                await stateStore.refresh();
            }

            if (createRecovery) {
                ex = await exStore.createRecoveredExecution(ex, cleanupType);
            }
        } else {
            ex = await exStore.create(job);
        }

        // Copy the slicer address from the job onto the execution record when
        // the job carries both values.
        if (slicerHostname && slicerPort) {
            ex = await exStore.updatePartial(ex.ex_id, (existing) => Object.assign(existing, {
                slicer_hostname: slicerHostname,
                slicer_port: slicerPort,
            }));
        }

        result = { job, ex };
    } catch (err) {
        // Best-effort cleanup: a secondary shutdown failure must not hide the
        // error that actually broke the setup.
        await shutdownOwnedStores().catch(() => {});
        throw err;
    }

    await shutdownOwnedStores();
    return result;
}
// Public API of this module: the test-execution setup helper and the job
// validation helper it relies on.
module.exports = {
initializeTestExecution,
validateJob,
};