-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
stream.ts
277 lines (246 loc) · 8.19 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import { GenerationChunk } from "@langchain/core/outputs";
/**
 * Wrap a plain JavaScript value in a tagged wrapper object, recursively.
 *
 * The wrapper key depends on the runtime type of the value:
 * - arrays become `{ list_val: [...] }`, with every element wrapped
 * - other objects become `{ struct_val: {...} }`, with every own enumerable
 *   property value wrapped
 * - integers become `{ int_val: n }`; all other numbers `{ float_val: n }`
 * - every remaining type (strings, but also booleans and the like) becomes
 *   `{ string_val: [value] }`
 *
 * @param value The value to wrap
 * @returns The wrapped value, or `undefined` when `value` is `null` or
 *   `undefined`
 */
export function complexValue(value: unknown): unknown {
  // There is no wrapper for null / undefined. The original author noted that
  // raising an error here might be more appropriate.
  if (value === null || typeof value === "undefined") {
    return undefined;
  }

  if (Array.isArray(value)) {
    return { list_val: value.map((element) => complexValue(element)) };
  }

  switch (typeof value) {
    case "object": {
      const source = value as Record<string, unknown>;
      const wrapped: Record<string, unknown> = {};
      for (const name of Object.keys(source)) {
        wrapped[name] = complexValue(source[name]);
      }
      return { struct_val: wrapped };
    }
    case "number":
      return Number.isInteger(value)
        ? { int_val: value }
        : { float_val: value };
    default:
      return { string_val: [value] };
  }
}
/**
 * Unwrap a tagged value into a plain JavaScript value, recursively.
 *
 * For a non-array object, the first matching own property decides the result:
 * - `stringVal` returns the first element of that array
 * - `boolVal` returns the first element of that array
 * - `listVal` returns a new array with every element unwrapped
 * - `structVal` returns a new object with every property value unwrapped
 * - otherwise the object is treated as a plain record and every property
 *   value is unwrapped in place
 *
 * Arrays are unwrapped element by element. Everything else (primitives,
 * `null`, `undefined`) is returned unchanged.
 *
 * @param val The value to unwrap
 * @returns The unwrapped value
 */
export function simpleValue(val: unknown): unknown {
  if (val && typeof val === "object" && !Array.isArray(val)) {
    // The input is typically parsed JSON, so it may carry its own
    // "hasOwnProperty" key or have no prototype at all. Go through
    // Object.prototype so the check cannot be shadowed or missing.
    const hasOwn = (key: string): boolean =>
      Object.prototype.hasOwnProperty.call(val, key);

    // Unwrap every property value of a record into a fresh object.
    const unwrapRecord = (
      record: Record<string, unknown>
    ): Record<string, unknown> => {
      const ret: Record<string, unknown> = {};
      Object.keys(record).forEach((key) => {
        ret[key] = simpleValue(record[key]);
      });
      return ret;
    };

    if (hasOwn("stringVal")) {
      return (val as { stringVal: string[] }).stringVal[0];
    } else if (hasOwn("boolVal")) {
      return (val as { boolVal: boolean[] }).boolVal[0];
    } else if (hasOwn("listVal")) {
      const { listVal } = val as { listVal: unknown[] };
      return listVal.map((aval) => simpleValue(aval));
    } else if (hasOwn("structVal")) {
      return unwrapRecord(
        (val as { structVal: Record<string, unknown> }).structVal
      );
    } else {
      return unwrapRecord(val as Record<string, unknown>);
    }
  } else if (Array.isArray(val)) {
    return val.map((aval) => simpleValue(aval));
  } else {
    return val;
  }
}
/**
 * Incrementally pulls JSON objects out of text that arrives in pieces.
 *
 * Text is added with appendBuffer(). Every complete object found in the
 * buffer is parsed, passed through _simplifyObject() and made available via
 * nextChunk(). After closeBuffer() a final `null` chunk is emitted.
 */
export class JsonStream {
  /** Text that has been received but not yet turned into chunks. */
  _buffer = "";

  /** Stays true until closeBuffer() has been called. */
  _bufferOpen = true;

  /** Stays true until the first appendBuffer() call has been processed. */
  _firstRun = true;

  // Resolver of the promise most recently created by nextChunk().
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  _chunkResolution: (chunk: any) => void;

  // The promise a nextChunk() caller is waiting on, or null if there is none.
  // While it is null, _handleChunk() stores chunks in the queue instead.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  _chunkPending: Promise<any> | null = null;

  // Chunks that were produced while nobody was waiting for them.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  _chunkQueue: any[] = [];

  /**
   * Append text to the buffer and emit every chunk that is now complete.
   * @param data The text to append
   */
  appendBuffer(data: string): void {
    this._buffer += data;

    // On the very first call, drop anything before the opening of the array
    if (this._firstRun) {
      this._skipTo("[");
      this._firstRun = false;
    }

    this._parseBuffer();
  }

  /**
   * Signal that no further text will be appended.
   * Whatever can still be parsed from the buffer is emitted, followed by a
   * `null` chunk.
   */
  closeBuffer(): void {
    this._bufferOpen = false;
    this._parseBuffer();
  }

  /**
   * Emit as many chunks as the buffer currently allows: move to the next
   * "{", try to read one complete object, simplify it, hand it to the chunk
   * handler, and repeat until no complete object is left.
   * Once the buffer is closed, also emit `null` and discard the leftovers.
   */
  _parseBuffer(): void {
    for (;;) {
      this._skipTo("{");
      const parsed = this._getFullObject();
      if (parsed === null) {
        break;
      }
      this._handleChunk(this._simplifyObject(parsed));
    }

    if (this._bufferOpen) {
      return;
    }

    // Nothing more is coming and everything parsable has been parsed,
    // so what remains is garbage.
    this._handleChunk(null);
    this._buffer = "";
  }

  /**
   * Drop everything in the buffer that precedes the first occurrence of the
   * given string. The buffer is left untouched when the string is absent or
   * already at the very start.
   * @param start The string the buffer should start with
   */
  _skipTo(start: string): void {
    const position = this._buffer.indexOf(start);
    if (position <= 0) {
      return;
    }
    this._buffer = this._buffer.slice(position);
  }

  /**
   * Try to read one complete object from the front of the buffer.
   * Each "}" is tried in turn as the possible end of the object, until the
   * text up to it parses as JSON. On success that text is removed from the
   * buffer.
   * Assumes the buffer starts at the beginning of an object.
   * @returns The parsed object, or null if no complete object is available
   */
  _getFullObject(): object | null {
    let closeAt = 0;
    while (closeAt < this._buffer.length) {
      // Find the next closing brace after the previous candidate
      closeAt = this._buffer.indexOf("}", closeAt + 1);
      if (closeAt === -1) {
        return null;
      }

      let parsed: object | null;
      try {
        parsed = JSON.parse(this._buffer.slice(0, closeAt + 1));
      } catch (ignored) {
        // Not a complete object yet; try the next closing brace
        continue;
      }

      // Parsing worked, so consume the text of the object
      this._buffer = this._buffer.slice(closeAt + 1);
      if (parsed !== null) {
        return parsed;
      }
    }
    return null;
  }

  /**
   * Hook for converting a parsed object before it is emitted as a chunk.
   * This implementation returns the object as it is.
   */
  _simplifyObject(obj: unknown): object {
    return obj as object;
  }

  /**
   * Make a chunk available to the consumer.
   * A waiting nextChunk() promise is resolved right away; otherwise the
   * chunk is put on the queue.
   * @param chunk The chunk to deliver
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  _handleChunk(chunk: any): void {
    if (!this._chunkPending) {
      this._chunkQueue.push(chunk);
      return;
    }
    this._chunkResolution(chunk);
    this._chunkPending = null;
  }

  /**
   * Get the next chunk from the stream.
   * The chunk may be null, which usually marks the end of the stream.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async nextChunk(): Promise<any> {
    if (this._chunkQueue.length === 0) {
      // Nothing queued: create a promise for _handleChunk() to resolve
      const waiting = new Promise((resolve) => {
        this._chunkResolution = resolve;
      });
      this._chunkPending = waiting;
      return waiting;
    }
    // Serve the oldest queued chunk
    return this._chunkQueue.shift() as GenerationChunk;
  }

  /**
   * Whether the stream is finished. That requires all of:
   * - the buffer has been closed
   * - the buffer is empty
   * - no chunk is queued
   * - no nextChunk() promise is outstanding
   */
  get streamDone(): boolean {
    const inputExhausted = !this._bufferOpen && this._buffer.length === 0;
    const nothingQueued = this._chunkQueue.length === 0;
    const nobodyWaiting = this._chunkPending === null;
    return inputExhausted && nothingQueued && nobodyWaiting;
  }
}
/**
 * A JsonStream that passes every parsed object through simpleValue()
 * before it is emitted as a chunk.
 */
export class ComplexJsonStream extends JsonStream {
  /**
   * Unwrap the parsed object with simpleValue().
   * @param obj The object parsed from the buffer
   * @returns The result of simpleValue(), typed as an object
   */
  _simplifyObject(obj: unknown): object {
    const unwrapped = simpleValue(obj);
    return unwrapped as object;
  }
}
/**
 * A JsonStream that is fed from a ReadableStream.
 * Reading starts in the constructor and runs in the background. Every piece
 * read is decoded to text and appended to the buffer, and the buffer is
 * closed when the stream ends.
 */
export class ReadableJsonStream extends JsonStream {
  /** Decoder that turns the bytes read from the body into text. */
  decoder: TextDecoder;

  /**
   * @param body The stream to read from. If it is null, an error is logged
   *   and the buffer is closed immediately, so consumers receive the closing
   *   `null` chunk instead of waiting forever.
   */
  constructor(body: ReadableStream | null) {
    super();
    this.decoder = new TextDecoder();
    if (body) {
      // Fire and forget: chunks are delivered through nextChunk()
      void this.run(body);
    } else {
      console.error("Unexpected empty body while streaming");
      // No data will ever arrive, so mark the stream as finished
      this.closeBuffer();
    }
  }

  /**
   * Read the body to its end, appending the decoded text to the buffer as it
   * arrives, then close the buffer.
   * @param body The stream to read from
   */
  async run(body: ReadableStream): Promise<void> {
    const reader = body.getReader();
    let isDone = false;
    while (!isDone) {
      const { value, done } = await reader.read();
      if (!done) {
        // Decode in streaming mode: a multi-byte character that is split
        // across two reads is held back until its remaining bytes arrive,
        // rather than being turned into replacement characters.
        const svalue = this.decoder.decode(value, { stream: true });
        this.appendBuffer(svalue);
      } else {
        isDone = done;
        // Flush whatever the decoder is still holding back
        const remainder = this.decoder.decode();
        if (remainder.length > 0) {
          this.appendBuffer(remainder);
        }
        this.closeBuffer();
      }
    }
  }
}