/
legacy-reader-shim.ts
115 lines (99 loc) · 3.39 KB
/
legacy-reader-shim.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import {
DataEntity,
DataInput,
Logger,
times,
isFunction
} from '@terascope/utils';
import SlicerClass from '../slicer';
import operationAPIShim, { APIs } from './operation-api-shim';
import legacySliceEventsShim from './legacy-slice-events-shim';
import {
SchemaConstructor
} from '../interfaces';
import {
SliceRequest,
ReaderFn,
SlicerFns,
LegacyReader,
WorkerContext,
SlicerRecoveryData,
Context,
ValidatedJobConfig,
SysConfig,
ExecutionConfig,
LegacyExecutionContext,
} from '../../interfaces';
import ConvictSchema from '../convict-schema';
// This file exists for backwards compatibility, so its functionality is limited,
// but it should allow you to write processors using the new way today
/** Local alias for `SchemaConstructor`; used as the type of the shim's `Schema` argument. */
type SchemaType = SchemaConstructor;
/**
 * Builds a legacy-style reader object around a Slicer class, a Fetcher class
 * and a Schema class.
 *
 * The returned object exposes the three arguments as-is plus the methods
 * `schema`, `crossValidation`, `newReader` and `newSlicer`.
 *
 * @param Slicer - slicer class, instantiated by `newSlicer`
 * @param Fetcher - fetcher class, instantiated by `newReader`
 * @param Schema - schema class; `Schema.type()` must return `'convict'` for
 * `schema` and `crossValidation` to succeed
 * @param apis - optional APIs forwarded to `operationAPIShim` in `newReader`
 * @returns the legacy reader wrapper
 */
export default function legacyReaderShim(
    Slicer: unknown,
    Fetcher: unknown,
    Schema: SchemaType,
    apis?: APIs
): LegacyReader {
    // Schema instance created by `schema()`, reused by `crossValidation()` when present.
    let schema: ConvictSchema<any, any>|undefined;

    // Shared guard: both schema entry points reject non-convict schemas.
    const assertConvictSchema = (): void => {
        if (Schema.type() === 'convict') return;
        throw new Error('Backwards compatibility only works for "convict" schemas');
    };

    return {
        // @ts-expect-error
        Slicer,
        Fetcher,
        Schema,

        /**
         * Instantiates the Schema with the context, remembers the instance and
         * returns its `schema` property.
         *
         * @throws Error when the Schema type is not "convict"
         */
        schema(context: Context): any {
            assertConvictSchema();
            // @ts-expect-error
            schema = new Schema(context);
            // @ts-expect-error
            return schema.schema;
        },

        /**
         * Calls `validateJob(job)` on the remembered schema instance (or on a
         * new one built from `{ sysconfig }`) when that method is a function.
         *
         * @throws Error when the Schema type is not "convict"
         */
        crossValidation(job: ValidatedJobConfig, sysconfig: SysConfig): void {
            assertConvictSchema();
            // @ts-expect-error
            const validator = schema || new Schema({ sysconfig });
            if (!isFunction(validator.validateJob)) return;
            validator.validateJob(job);
        },

        /**
         * Creates and initializes a fetcher, applies the legacy slice-events and
         * operation-API shims, and returns a function that handles a slice
         * request and passes the result through `DataEntity.makeArray`.
         */
        async newReader(
            context: Context, opConfig: Record<string, any>, executionConfig: ExecutionConfig
        ): Promise<ReaderFn<DataInput[]>> {
            const FetcherCtor = Fetcher as any;
            const fetcher = new FetcherCtor(context as WorkerContext, opConfig, executionConfig);
            await fetcher.initialize();

            legacySliceEventsShim(fetcher);
            operationAPIShim(context, apis);

            return async (sliceRequest: SliceRequest): Promise<DataInput[]> => (
                DataEntity.makeArray(await fetcher.handle(sliceRequest))
            );
        },

        /**
         * Creates and initializes a slicer using the first operation of the
         * execution config, then returns its slicer functions.
         */
        async newSlicer(
            context: Context,
            executionContext: LegacyExecutionContext,
            recoveryData: SlicerRecoveryData[],
            logger: Logger
        ): Promise<SlicerFns> {
            const { config: executionConfig } = executionContext;
            const opConfig = executionConfig.operations[0];

            const SlicerCtor = Slicer as any;
            const slicer = new SlicerCtor(context, opConfig, executionConfig);
            slicer.logger = logger;
            await slicer.initialize(recoveryData);

            // Shut the slicer down the first time 'worker:shutdown' is emitted.
            slicer.events.once('worker:shutdown', async () => {
                await slicer.shutdown();
            });

            if (!(slicer instanceof SlicerClass)) {
                // Ask the slicer for `executionConfig.slicers` slicer functions.
                const pending = times(executionConfig.slicers, () => slicer.newSlicer());
                return Promise.all(pending);
            }

            // A SlicerClass instance is exposed as a single function calling slice().
            return [(): any => slicer.slice()];
        }
    };
}