-
Notifications
You must be signed in to change notification settings - Fork 25
/
stream_manager.ts
368 lines (344 loc) · 13.1 KB
/
stream_manager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
// Copyright Pravega Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
StreamManagerNew,
StreamManagerCreateScope,
StreamManagerDeleteScope,
StreamManagerListScopes,
StreamRetentionPolicyNone,
StreamRetentionPolicyBySize,
StreamRetentionPolicyByTime,
StreamScalingPolicyFixed,
StreamScalingPolicyByDataRate,
StreamScalingPolicyByEventRate,
StreamManagerCreateStreamWithPolicy,
StreamManagerUpdateStreamWithPolicy,
StreamManagerGetStreamTags,
StreamManagerSealStream,
StreamManagerDeleteStream,
StreamManagerListStreams,
StreamRetentionStreamCutHead,
StreamRetentionStreamCutTail,
StreamManagerCreateReaderGroup,
StreamManagerDeleteReaderGroup,
StreamManagerCreateWriter,
StreamManagerCreateTxnWriter,
StreamManagerToString,
} from './native_esm.js';
import { StreamReaderGroup } from './stream_reader_group.js';
import { StreamWriter } from './stream_writer.js';
import { StreamTxnWriter } from './stream_writer_transactional.js';
/**
* Pravega allows users to store data in Tier 2 as long as there is storage capacity available.
* But sometimes, users may not be interested to keep all the historical data related to a Stream.
* Instead, there are use-cases in which it may be useful to retain just a fraction of a Stream's data.
* For this reason, Streams can be configured with `StreamRetentionPolicy`.
*/
export interface StreamRetentionPolicy {}
/**
* Contains factory methods to create different StreamRetentionPolicy.
*/
export const StreamRetentionPolicy = {
/**
* Every event is retained in the Stream. No deletion.
*/
none: (): StreamRetentionPolicy => StreamRetentionPolicyNone(),
/**
* Set retention based on how many data in the Stream before it is deleted.
*/
by_size: (size_in_bytes: number): StreamRetentionPolicy => StreamRetentionPolicyBySize(size_in_bytes),
/**
* Set retention based on how long the data is kept in the Stream before it is deleted.
*/
by_time: (time_in_millis: number): StreamRetentionPolicy => StreamRetentionPolicyByTime(time_in_millis),
};
/**
* A policy that specifies how the number of segments in a stream should scale over time.
*/
export interface StreamScalingPolicy {}
/**
* Contains factory methods to create different StreamScalingPolicy.
*/
export const StreamScalingPolicy = {
/**
* No scaling, there will only ever be initial_segmentsat any given time.
*/
fixed_scaling_policy: (initial_segments: number): StreamScalingPolicy => StreamScalingPolicyFixed(initial_segments),
/**
* Scale based on the rate in bytes specified in target_rate_kbytes_per_sec.
*/
auto_scaling_policy_by_data_rate: (
target_rate_kbytes_per_sec: number,
scale_factor: number,
initial_segments: number
): StreamScalingPolicy => StreamScalingPolicyByDataRate(target_rate_kbytes_per_sec, scale_factor, initial_segments),
/**
* Scale based on the rate in events specified in target_events_per_sec.
*/
auto_scaling_policy_by_event_rate: (
target_events_per_sec: number,
scale_factor: number,
initial_segments: number
): StreamScalingPolicy => StreamScalingPolicyByEventRate(target_events_per_sec, scale_factor, initial_segments),
};
/**
* Represent a consistent position in the stream.
* Only `head` and `tail` are supported now.
*/
export interface StreamCut {}
/**
* Contains factory methods to create different StreamCut.
*/
export const StreamCut = {
head: (): StreamCut => StreamRetentionStreamCutHead(),
tail: (): StreamCut => StreamRetentionStreamCutTail(),
};
/**
* Used to create, delete, and manage Streams, ReaderGroups, and Writers.
*/
export interface StreamManager {
/**
* Create a Pravega scope.
*
* @param scope_name The scope name.
* @returns The scope creation result. `false` indicates that the scope exists before creation.
*/
create_scope: (scope_name: string) => boolean;
/**
* Delete a Pravega scope.
*
* @param scope_name The scope name.
* @returns The scope deletion result. `false` indicates that the scope does not exist before deletion.
*/
delete_scope: (scope_name: string) => boolean;
/**
* List all scopes in Pravega.
*
* @returns Scope names in Promise.
*/
list_scopes: () => Promise<string[]>;
/**
* Create a stream with or without specific policy in Pravega.
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @param retention_policy The retention policy. Default will be StreamRetentionPolicy.none()
* @param scaling_policy The scaling policy. Default will be StreamScalingPolicy.fixed_scaling_policy(1)
* @param tags The stream tags.
* @returns The stream creation result. `false` indicates that the stream exists before creation.
*/
create_stream: (
scope_name: string,
stream_name: string,
retention_policy?: StreamRetentionPolicy,
scaling_policy?: StreamScalingPolicy,
tags?: string[]
) => boolean;
/**
* Update a Pravega stream with new policies and tags.
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @param retention_policy The retention policy. Default will be StreamRetentionPolicy.none()
* @param scaling_policy The scaling policy. Default will be StreamScalingPolicy.fixed_scaling_policy(1)
* @param tags The stream tags.
* @returns The stream update result.
*/
update_stream: (
scope_name: string,
stream_name: string,
retention_policy?: StreamRetentionPolicy,
scaling_policy?: StreamScalingPolicy,
tags?: string[]
) => boolean;
/**
* Get tags of a Pravega stream.
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @returns The stream tags.
*/
get_stream_tags: (scope_name: string, stream_name: string) => string[];
/**
* Seal a Pravega stream. SEAL BEFORE DELETE!
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @returns The seal result.
*/
seal_stream: (scope_name: string, stream_name: string) => boolean;
/**
* Deleta a Pravega stream. SEAL BEFORE DELETE!
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @returns The deletion result.
*/
delete_stream: (scope_name: string, stream_name: string) => boolean;
/**
* List all streams in the specified Pravega scope.
*
* @param scope_name The scope name.
* @returns Stream names in Promise.
*/
list_streams: (scope_name: string) => Promise<string[]>;
/**
* Create a ReaderGroup for a given Stream.
*
* @param stream_cut The offset you would like to read from.
* @param reader_group_name The reader group name.
* @param scope_name The scope name.
* @param streams All stream names in this scope.
* @returns A StreamReaderGroup.
* @todo An optional element cannot follow a rest element. `...args: [...stream: string[], stream_cut?: StreamCut]`
*/
create_reader_group: (
stream_cut: StreamCut,
reader_group_name: string,
scope_name: string,
...streams: string[]
) => StreamReaderGroup;
/**
* Delete a ReaderGroup for a given Stream.
*
* @param scope_name The scope name.
* @param reader_group_name The reader group name.
*/
delete_reader_group: (scope_name: string, reader_group_name: string) => void;
/**
* Create a Writer for a given Stream.
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @returns A StreamWriter.
*/
create_writer: (scope_name: string, stream_name: string) => StreamWriter;
/**
* Create a transaction Writer for a given Stream.
*
* @param scope_name The scope name.
* @param stream_name The stream name.
* @param writer_id ID (no larger than unsigned int 128) of the writer.
* @returns A StreamTxnWriter.
*/
create_transaction_writer: (scope_name: string, stream_name: string, writer_id: BigInt) => StreamTxnWriter;
/**
* A detailed view of the StreamManager.
*
* @returns String representation of the StreamManager.
*/
toString: () => string;
}
/**
* Create a StreamManager by providing a controller uri.
*
* ```typescript
* const stream_manager = StreamManger('tcp://127.0.0.1:9090', false, false, true);
* ```
*
* Optionally enable tls support using tls:// scheme.
*
* ```typescript
* const stream_manager = StreamManger('tls://127.0.0.1:9090', false, false, true);
* ```
*
* @param controller_uri The Pravega controller RPC uri, start with tcp or tls.
* @param auth_enabled Whether authentication is enabled or not.
* @param tls_enabled Whether TLS is enabled or not.
* @param disable_cert_verification Disable certificate verification or not.
* @returns
*/
export const StreamManager = (
controller_uri: string,
auth_enabled: boolean = false,
tls_enabled: boolean = false,
disable_cert_verification: boolean = true
): StreamManager => {
// The internal rust StreamManager object. Should not be accessed directly.
const stream_manager = StreamManagerNew(controller_uri, auth_enabled, tls_enabled, disable_cert_verification);
const create_scope = (scope_name: string): boolean => StreamManagerCreateScope.call(stream_manager, scope_name);
const delete_scope = (scope_name: string): boolean => StreamManagerDeleteScope.call(stream_manager, scope_name);
const list_scopes = async (): Promise<string[]> => StreamManagerListScopes.call(stream_manager);
const create_stream = (
scope_name: string,
stream_name: string,
retention_policy: StreamRetentionPolicy = StreamRetentionPolicy.none(),
scaling_policy: StreamScalingPolicy = StreamScalingPolicy.fixed_scaling_policy(1),
tags: string[] = []
): boolean =>
StreamManagerCreateStreamWithPolicy.call(
stream_manager,
scope_name,
stream_name,
retention_policy,
scaling_policy,
tags
);
const update_stream = (
scope_name: string,
stream_name: string,
retention_policy: StreamRetentionPolicy = StreamRetentionPolicy.none(),
scaling_policy: StreamScalingPolicy = StreamScalingPolicy.fixed_scaling_policy(1),
tags: string[] = []
): boolean =>
StreamManagerUpdateStreamWithPolicy.call(
stream_manager,
scope_name,
stream_name,
retention_policy,
scaling_policy,
tags
);
const get_stream_tags = (scope_name: string, stream_name: string): string[] =>
StreamManagerGetStreamTags.call(stream_manager, scope_name, stream_name);
const seal_stream = (scope_name: string, stream_name: string): boolean =>
StreamManagerSealStream.call(stream_manager, scope_name, stream_name);
const delete_stream = (scope_name: string, stream_name: string): boolean =>
StreamManagerDeleteStream.call(stream_manager, scope_name, stream_name);
const list_streams = async (scope_name: string): Promise<string[]> =>
StreamManagerListStreams.call(stream_manager, scope_name);
const create_reader_group = (
stream_cut: StreamCut,
reader_group_name: string,
scope_name: string,
...streams: string[]
): StreamReaderGroup =>
StreamReaderGroup(
StreamManagerCreateReaderGroup.call(stream_manager, reader_group_name, scope_name, streams, stream_cut)
);
const delete_reader_group = (scope_name: string, reader_group_name: string) =>
StreamManagerDeleteReaderGroup.call(stream_manager, scope_name, reader_group_name);
const create_writer = (scope_name: string, stream_name: string): StreamWriter =>
StreamWriter(StreamManagerCreateWriter.call(stream_manager, scope_name, stream_name));
const create_transaction_writer = (scope_name: string, stream_name: string, writer_id: BigInt): StreamTxnWriter =>
StreamTxnWriter(
StreamManagerCreateTxnWriter.call(stream_manager, scope_name, stream_name, writer_id.toString())
);
const toString = (): string => StreamManagerToString.call(stream_manager);
return {
create_scope,
delete_scope,
list_scopes,
create_stream,
update_stream,
get_stream_tags,
seal_stream,
delete_stream,
list_streams,
create_reader_group,
delete_reader_group,
create_writer,
create_transaction_writer,
toString,
};
};