-
Notifications
You must be signed in to change notification settings - Fork 25
/
stream_reader.ts
161 lines (148 loc) · 4.91 KB
/
stream_reader.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright Pravega Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
EventDataData,
EventDataOffset,
EventDataToString,
SliceNext,
StreamReaderGetSegementSlice,
StreamReaderReaderOffline,
StreamReaderReleaseSegment,
StreamReaderToString,
} from './native_esm.js';
/**
* This represents an event that was read from a Pravega Segment and the offset at which the event
* was read from.
*/
export interface Event {
/**
* Return the event data as `ArrayBuffer`.
*
* @returns ArrayBuffer that contains raw data.
*/
data: () => ArrayBuffer;
/**
* Return the event offset in the segment.
*
* @returns offset
*/
offset: () => number;
toString: () => string;
}
/**
* Returns a wrapped Event that helps users to get raw data and/or offset.
*
* Note: A Event cannot be created directly without using the Slice.
*/
const Event = (event): Event => {
const data = (): ArrayBuffer => EventDataData.call(event);
const offset = (): number => EventDataOffset.call(event);
const toString = (): string => EventDataToString.call(event);
return { data, offset, toString };
};
/**
* This represents a segment slice which can be used to read events from a Pravega segment as an iterator.
*
* Individual events can be read from the Slice using the following snippets.
* ```javascript
* const seg_slice: Slice = await stream_reader.get_segment_slice();
* for (const event of seg_slice) {
* const raw_value: ArrayBuffer = event.data();
* // do your things
* }
* ```
*/
export interface Slice extends IterableIterator<Event> {
/**
* The internal rust object used to release segment. Should **not** be used in user code!
*/
readonly internal_slice;
}
/**
* Returns a wrapped Slice that helps users to iterate.
*
* Note: A Slice cannot be created directly without using the StreamReader.
*/
const Slice = (slice): Slice => {
return {
internal_slice: slice,
next: (): IteratorResult<Event> => {
let event: Event;
try {
event = Event(SliceNext(slice));
} catch (e) {
return {
done: true,
value: null,
};
}
return {
done: false,
value: event,
};
},
[Symbol.iterator]: function () {
return this;
},
};
};
/**
* A reader for a stream.
*
* Note: A StreamReader cannot be created directly without using the StreamReaderGroup.
*/
export interface StreamReader {
/**
* This function returns a SegmentSlice from the SegmentStore(s).
* Individual events can be read from the Slice with the following snippets.
* ```javascript
* const seg_slice: Slice = await stream_reader.get_segment_slice();
* for (const event of seg_slice) {
* const raw_value: ArrayBuffer = event.data();
* // do your things
* }
* ```
*
* Invoking this function multiple times ensure multiple SegmentSlices corresponding
* to different Segments of the stream are received. In-case we receive data for an already
* acquired SegmentSlice, this method waits until SegmentSlice is completely consumed before
* returning the data.
*
* @returns Slice in Promise.
*/
get_segment_slice: () => Promise<Slice>;
/**
* Mark the reader as offline.
*
* This will ensure the segments owned by this reader is distributed to other readers in the ReaderGroup.
*/
reader_offline: () => void;
/**
* Release a partially read segment slice back to event reader.
*/
release_segment: (slice: Slice) => void;
toString: () => string;
}
/**
* Returns a wrapped StreamReader that helps users to call Rust code.
*
* Note: A StreamReader cannot be created directly without using the StreamReaderGroup.
*/
export const StreamReader = (stream_reader): StreamReader => {
const get_segment_slice = async (): Promise<Slice> => Slice(await StreamReaderGetSegementSlice.call(stream_reader));
const reader_offline = (): void => StreamReaderReaderOffline.call(stream_reader);
const release_segment = (slice: Slice): void =>
StreamReaderReleaseSegment.call(stream_reader, slice.internal_slice);
const toString = (): string => StreamReaderToString.call(stream_reader);
return { get_segment_slice, reader_offline, release_segment, toString };
};