-
Notifications
You must be signed in to change notification settings - Fork 13
/
job-harness.ts
104 lines (88 loc) · 3.13 KB
/
job-harness.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import { sortBy, map, groupBy, times } from 'lodash';
import {
TestContext,
JobConfig,
Assignment,
makeJobSchema,
makeExecutionContext,
validateJobConfig,
WorkerExecutionContext,
SlicerExecutionContext,
ExecutionConfig,
SliceRequest,
Slice,
isWorkerExecutionContext,
isSlicerExecutionContext,
DataEntity,
RunSliceResult,
} from '@terascope/job-components';
export default class JobHarness {
protected context: WorkerExecutionContext|SlicerExecutionContext;
constructor(job: JobConfig, options: JobHarnessOptions) {
const context = new TestContext(`job-harness:${job.name}`);
context.assignment = options.assignment || Assignment.Worker;
const jobSchema = makeJobSchema(context);
const executionConfig = validateJobConfig(jobSchema, job) as ExecutionConfig;
this.context = makeExecutionContext({
context,
executionConfig
});
}
async initialize() {
await this.context.initialize();
}
async createSlices({ fullResponse = false } = {}): Promise<SliceRequest[]|Slice[]> {
if (!isSlicerExecutionContext(this.context)) {
throwInvalidContext('createSlices', this.context);
return [];
}
const { slicer } = this.context;
const slicers = slicer.slicers();
await slicer.handle();
const slices = slicer.getSlices(10000);
const sliceRequests = [];
const slicesBySlicers = Object.values(groupBy(slices, 'slicer_id'));
for (const perSlicer of slicesBySlicers) {
const sorted = sortBy(perSlicer, 'slicer_order');
if (fullResponse) {
sliceRequests.push(...sorted);
} else {
const mapped = map(sorted, 'request');
sliceRequests.push(...mapped);
}
}
const remaining = slicers - sliceRequests.length;
if (remaining > 0) {
const nulls = times(remaining, () => null);
return sliceRequests.concat(nulls);
}
return sliceRequests;
}
async runSlice(slice: Slice, { fullResponse = false } = {}): Promise<DataEntity[]|RunSliceResult> {
if (!isWorkerExecutionContext(this.context)) {
throwInvalidContext('runSlice', this.context);
return [];
}
const result = await this.context.runSlice(slice);
if (fullResponse) {
return result || {
results: [],
};
}
return result.results || [];
}
async cleanup() {
return this.context.shutdown();
}
}
function throwInvalidContext(method: string, context: WorkerExecutionContext|SlicerExecutionContext): never {
const { assignment } = context.context;
const expected = assignment === Assignment.Worker ? Assignment.ExecutionController : Assignment.Worker;
const error = new Error(`${method} can only be run with assignment of "${expected}"`);
Error.captureStackTrace(error, throwInvalidContext);
throw error;
}
export interface JobHarnessOptions {
assignment?: Assignment;
assetDir: string;
}