-
Notifications
You must be signed in to change notification settings - Fork 13
/
job-validator.ts
96 lines (75 loc) · 3.28 KB
/
job-validator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import convict from 'convict';
import { cloneDeep } from '@terascope/utils';
import {
Context, OpConfig, JobConfig, ValidatedJobConfig
} from './interfaces';
import { validateJobConfig } from './config-validators';
import { jobSchema } from './job-schemas';
import { OperationLoader } from './operation-loader';
import { registerApis } from './register-apis';
import { OperationModule } from './operations';
/**
 * Validates a job configuration together with the configuration of every
 * operation and API the job references, then registers the APIs on the context.
 */
export class JobValidator {
    /** The convict schema used for the top level job validation */
    public schema: convict.Schema<any>;
    private readonly context: Context;
    private readonly opLoader: OperationLoader;

    /**
     * @param context the context handed to every operation/API Schema and
     *                used when registering APIs
     * @param options optional `terasliceOpPath` forwarded to the OperationLoader
     */
    constructor(context: Context, options: { terasliceOpPath?: string } = {}) {
        this.context = context;
        this.opLoader = new OperationLoader({
            terasliceOpPath: options.terasliceOpPath,
            assetPath: context.sysconfig.teraslice.assets_directory,
        });
        this.schema = jobSchema(context);
    }

    /**
     * Validate the job configuration, including the Operations and APIs configuration.
     *
     * The given `jobSpec` is deep cloned before validation, so it is not mutated.
     *
     * @param jobSpec the job configuration to validate
     * @returns the validated job config, with each entry of `operations` and
     *          `apis` replaced by the result of its Schema's `validate` call
     */
    validateConfig(jobSpec: JobConfig): ValidatedJobConfig {
        // top level job validation occurs, but not operations
        const jobConfig = validateJobConfig(this.schema, cloneDeep(jobSpec));
        const assetIds = jobConfig.assets || [];

        // API constructors exposed by operation modules, keyed by the op name
        const apis: Record<string, any> = {};

        type ValidateJobFn = (job: ValidatedJobConfig) => void;
        const opJobValidators: ValidateJobFn[] = [];
        const apiJobValidators: ValidateJobFn[] = [];

        const handleModule = (opConfig: OpConfig, op: OperationModule) => {
            const { Schema, API } = op;

            if (API != null) {
                apis[opConfig._op] = API;
            }

            const schema = new Schema(this.context);

            // the job level check is deferred until every operation config
            // has been validated
            opJobValidators.push((job) => {
                if (!schema.validateJob) return;
                schema.validateJob(job);
            });

            return schema.validate(opConfig);
        };

        // the first operation is loaded as the reader, the rest as processors
        jobConfig.operations = jobConfig.operations.map((opConfig, index) => {
            const opModule = index === 0
                ? this.opLoader.loadReader(opConfig._op, assetIds)
                : this.opLoader.loadProcessor(opConfig._op, assetIds);

            return handleModule(opConfig, opModule);
        });

        this.runJobValidators(opJobValidators, jobConfig);

        jobConfig.apis = jobConfig.apis.map((apiConfig) => {
            const { Schema } = this.opLoader.loadAPI(apiConfig._name, assetIds);
            const schema = new Schema(this.context, 'api');

            apiJobValidators.push((job) => {
                if (!schema.validateJob) return;
                schema.validateJob(job);
            });

            return schema.validate(apiConfig);
        });

        this.runJobValidators(apiJobValidators, jobConfig);

        registerApis(this.context, jobConfig);

        // the APIs collected from the operation modules are added to the
        // registry only after registerApis has run
        Object.keys(apis).forEach((name) => {
            this.context.apis.executionContext.addToRegistry(name, apis[name]);
        });

        return jobConfig;
    }

    /**
     * Verify that `obj` has a `schema` method which returns an object.
     *
     * Note that `obj.schema()` is invoked as part of the check.
     *
     * @param obj the object to inspect
     * @param name the name used as the prefix of the error message
     * @throws {Error} if `obj` has no `schema` method, or if calling it
     *                 does not return a non-null object
     */
    hasSchema(obj: Record<string, any>, name: string): void {
        if (obj == null || typeof obj.schema !== 'function') {
            throw new Error(`${name} needs to have a method named "schema"`);
        }

        const result = obj.schema();
        // typeof null is 'object', so null has to be rejected explicitly
        if (result === null || typeof result !== 'object') {
            throw new Error(`${name} schema needs to return an object`);
        }
    }

    /** Run each deferred job level validator, in order, against the job */
    private runJobValidators(
        fns: ((job: ValidatedJobConfig) => void)[],
        job: ValidatedJobConfig
    ): void {
        fns.forEach((fn) => { fn(job); });
    }
}