/
operations.ts
132 lines (113 loc) · 3.28 KB
/
operations.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import { Schema } from 'convict';
import { Logger } from '@terascope/utils';
import {
ValidatedJobConfig, OpConfig, ExecutionConfig, LegacyExecutionContext
} from './jobs';
import { Context, SysConfig } from './context';
/**
 * Signature of a cross-validation hook: it receives the validated job
 * configuration together with the system configuration and returns nothing.
 */
export type CrossValidationFn = (
    job: ValidatedJobConfig,
    sysconfig: SysConfig
) => void;
/**
 * Signature of a self-validation hook: it receives a single operation
 * configuration and returns nothing.
 */
export type SelfValidationFn = (
    config: OpConfig
) => void;
/**
 * Signature of a function that takes the legacy execution context and
 * yields a queue length expressed as either a number or a string.
 */
export type SliceQueueLengthFn = (
    executionContext: LegacyExecutionContext
) => number | string;
/**
 * Base shape shared by the legacy operation interfaces in this file:
 * a required `schema` method plus two optional validation hooks.
 */
export interface LegacyOperation {
    /** Optional hook matching `CrossValidationFn` (job + sysconfig). */
    crossValidation?: CrossValidationFn;

    /** Optional hook matching `SelfValidationFn` (a single op config). */
    selfValidation?: SelfValidationFn;

    /**
     * Produces a convict `Schema`.
     *
     * @param context optional context; callers may omit it
     */
    schema(context?: Context): Schema<any>;
}
/**
 * Contract for a legacy reader: everything in `LegacyOperation` plus
 * asynchronous factories for the reader function and the slicer functions.
 */
export interface LegacyReader extends LegacyOperation {
    /** Optional function matching `SliceQueueLengthFn`. */
    slicerQueueLength?: SliceQueueLengthFn;

    /**
     * Produces a convict `Schema`.
     *
     * @param context optional context; callers may omit it
     */
    schema(context?: Context): Schema<any>;

    /**
     * Asynchronously creates the reader function.
     *
     * @param context the context
     * @param opConfig the operation configuration
     * @param executionConfig the execution configuration
     * @returns a promise resolving to a `ReaderFn`
     */
    newReader(
        context: Context,
        opConfig: OpConfig,
        executionConfig: ExecutionConfig
    ): Promise<ReaderFn<any>>;

    /**
     * Asynchronously creates the slicer functions.
     *
     * @param context the context
     * @param executionContext the legacy execution context
     * @param recoveryData one `SlicerRecoveryData` entry per recovered slicer
     * @param logger logger handed to the slicer
     * @returns a promise resolving to an array of `SlicerFn`
     */
    newSlicer(
        context: Context,
        executionContext: LegacyExecutionContext,
        recoveryData: SlicerRecoveryData[],
        logger: Logger
    ): Promise<SlicerFns>;
}
/**
 * A reader function: given a slice request and a logger it produces a
 * value of type `T`, either directly or wrapped in a promise.
 */
export type ReaderFn<T> = (
    sliceRequest: SliceRequest,
    logger: Logger
) => Promise<T> | T;
/**
 * Contract for a legacy processor: everything in `LegacyOperation` plus
 * an asynchronous factory for the processor function.
 */
export interface LegacyProcessor extends LegacyOperation {
    /**
     * Produces a convict `Schema`.
     *
     * @param context optional context; callers may omit it
     */
    schema(context?: Context): Schema<any>;

    /**
     * Asynchronously creates the processor function.
     *
     * @param context the context
     * @param opConfig the operation configuration
     * @param executionConfig the execution configuration
     * @returns a promise resolving to a `ProcessorFn`
     */
    newProcessor(
        context: Context,
        opConfig: OpConfig,
        executionConfig: ExecutionConfig
    ): Promise<ProcessorFn<any>>;
}
/**
 * A processor function: takes data of type `T`, a logger and the slice
 * request, and produces a `T` either directly or wrapped in a promise.
 */
export type ProcessorFn<T> =
    (data: T, logger: Logger, sliceRequest: SliceRequest) => Promise<T> | T;
/**
 * The metadata created by the Slicer and run through a job pipeline.
 *
 * See [[Slice]]
 */
export interface SliceRequest {
    /**
     * Reserved key used to send the work to a particular worker.
     */
    request_worker?: string;

    /**
     * Any other metadata may be attached to the slice request.
     */
    [prop: string]: any;
}
/**
 * The metadata given to a Slicer after successfully recovering the execution.
 */
export interface SlicerRecoveryData {
    /** Numeric id of the slicer this recovery entry belongs to. */
    slicer_id: number;

    /**
     * The last slice request, when there is one.
     * NOTE(review): presumably the last request handled before recovery — confirm.
     */
    lastSlice?: SliceRequest;
}
/**
 * A trackable set of work to be performed by a "Worker".
 */
export interface Slice {
    /** A unique identifier for the slice. */
    slice_id: string;

    /** A reference to the slicer that created the slice. */
    slicer_id: number;

    /**
     * A number relating the slice to its slicer.
     * NOTE(review): presumably the slice's position in the slicer's output
     * order — confirm against the code that assigns it.
     */
    slicer_order: number;

    /** The slice request metadata carried by this slice. */
    request: SliceRequest;

    /**
     * NOTE(review): presumably a creation timestamp serialized as a string —
     * confirm the exact format.
     */
    _created: string;
}
/**
 * Analytics collected for a slice: one list of numbers per metric.
 */
export interface SliceAnalyticsData {
    /** Values for the `time` metric. */
    time: number[];

    /** Values for the `size` metric. */
    size: number[];

    /** Values for the `memory` metric. */
    memory: number[];
}

/**
 * The metric names of `SliceAnalyticsData`, listed as memory, size, time.
 */
export const sliceAnalyticsMetrics: ReadonlyArray<keyof SliceAnalyticsData> = [
    'memory',
    'size',
    'time',
];
/**
 * The possible results of a slicer function: a full `Slice`, a single
 * `SliceRequest`, a list of `SliceRequest`s, or `null`.
 */
export type SlicerResult =
    | Slice
    | SliceRequest
    | SliceRequest[]
    | null;
/**
 * Pairs a slice with its analytics data and, optionally, an error message.
 */
export interface SliceResult {
    /** The slice this result refers to. */
    slice: Slice;

    /** Analytics data associated with the slice. */
    analytics: SliceAnalyticsData;

    /** Error text; absent when no error is recorded. */
    error?: string;
}
/**
 * A slicer function: called with no arguments, it returns a promise that
 * resolves to a `SlicerResult`.
 */
export interface SlicerFn {
    /** Call signature: no parameters, asynchronous `SlicerResult`. */
    (): Promise<SlicerResult>;
}
/**
 * A list of slicer functions.
 */
export type SlicerFns = Array<SlicerFn>;
/**
 * An operation API exposed as a plain function taking any arguments and
 * returning any value.
 */
export type OpAPIFn = (
    ...args: any[]
) => any;

/**
 * An operation API exposed as an object keyed by string; each member may
 * be an `OpAPIFn` or any other value.
 */
export type OpAPIInstance = {
    [method: string]: OpAPIFn | any;
};

/**
 * An operation API in either form: a function or an object instance.
 */
export type OpAPI =
    | OpAPIFn
    | OpAPIInstance;
/**
 * Worker counters of an execution (module-private shape).
 */
interface ExecutionWorkerStats {
    /** Count recorded under `connected`. */
    connected: number;

    /** Count recorded under `available`. */
    available: number;
}

/**
 * Slice counters of an execution (module-private shape).
 */
interface ExecutionSliceStats {
    /** Count recorded under `processed`. */
    processed: number;

    /** Count recorded under `failed`. */
    failed: number;
}

/**
 * Aggregated execution statistics: worker counters plus slice counters.
 */
export interface ExecutionStats {
    /** Worker counters. */
    workers: ExecutionWorkerStats;

    /** Slice counters. */
    slices: ExecutionSliceStats;
}