/
whatwg_stream.ts
127 lines (110 loc) · 2.72 KB
/
whatwg_stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import { parseLine, parseLineStrict } from './parser';
import { LtsvRecord } from './types';
/**
 * Options accepted by `LtsvToJsonStream` / `createLtsvToJsonStream`.
 */
export type LtsvToJsonStreamOptions = {
// When true, each parsed record is enqueued as an object; otherwise it is
// enqueued as a `JSON.stringify`-ed string. Treated as `false` when omitted.
objectMode?: boolean;
// When true, lines are parsed with `parseLineStrict` instead of `parseLine`.
// Treated as `false` when omitted.
strict?: boolean;
};
/**
 * Mutable per-stream state that is bound as `this` when `push` is called.
 */
type LtsvToJsonStreamInstance = {
// Text that has been received but not emitted yet; it is prepended to the
// next chunk in `transform` and drained in `flush`.
buffer: string;
// Selects the output form: parsed record objects (true) or JSON strings (false).
objectMode: boolean;
// Line parser in use: either `parseLine` or `parseLineStrict`.
parse: typeof parseLine | typeof parseLineStrict;
};
/**
 * Split `text` into lines, parse each one and enqueue the result.
 *
 * Every line is handed to `this.parse` with its trailing `\n` still attached.
 * Text after the last `\n` is kept in `this.buffer` for the next call, unless
 * `isFlush` is true, in which case that trailing segment is parsed as a final
 * line (without a `\n`).
 *
 * If `this.parse` throws, processing stops immediately: the offending line and
 * everything after it stay in `this.buffer`, and the thrown value is forwarded
 * unchanged to `controller.error`. The failure is reported directly from the
 * `catch` so that any thrown value - including falsy ones - is treated as an
 * error and never results in an empty record being enqueued.
 *
 * @param text - pending buffer followed by the newly received chunk
 * @param isFlush - when true, also parse a trailing segment that has no `\n`
 * @param controller - receives records via `enqueue` and failures via `error`
 */
function push(
  this: LtsvToJsonStreamInstance,
  text: string,
  isFlush: boolean,
  controller: TransformStreamDefaultController<string | LtsvRecord>
): void {
  // Offset of the first character that has not been emitted yet.
  let start = 0;

  while (start < text.length) {
    let end = text.indexOf('\n', start);
    if (end === -1) {
      if (!isFlush) {
        // Incomplete line: keep it buffered until more text arrives.
        break;
      }
      // NOTE: point at the last character so that `end + 1` below
      // NOTE: slices through the end of the text.
      end = text.length - 1;
    }

    // NOTE: the slice includes the terminating `\n` when there is one.
    const line = text.slice(start, end + 1);

    let record: LtsvRecord;
    try {
      record = this.parse(line);
    } catch (reason) {
      // Keep the unparsed remainder (starting at the failing line) and
      // error the stream with exactly what the parser threw.
      this.buffer = text.slice(start);
      controller.error(reason);
      return;
    }

    controller.enqueue(this.objectMode ? record : JSON.stringify(record));

    // NOTE: the next line starts right after the `\n` just consumed.
    start = end + 1;
  }

  this.buffer = text.slice(start);
}
/**
 * Build the transformer object that converts LTSV text into JSON output.
 *
 * The returned object is meant to be handed to `new TransformStream(...)`.
 * It keeps its own buffer, so partial lines that span chunk boundaries are
 * joined before being parsed.
 *
 * @param options - `objectMode` selects record objects instead of JSON
 *   strings; `strict` selects `parseLineStrict` instead of `parseLine`.
 *   Both are treated as `false` when omitted.
 */
export function LtsvToJsonStream(
  options: LtsvToJsonStreamOptions = {
    objectMode: false,
    strict: false
  }
): Transformer<string, string | LtsvRecord> {
  const { objectMode: emitObjects = false, strict: useStrictParser = false } =
    options;

  // State shared by both callbacks below; `push` receives it as `this`.
  const state: LtsvToJsonStreamInstance = {
    buffer: '',
    objectMode: emitObjects,
    parse: useStrictParser ? parseLineStrict : parseLine
  };

  const transformer: Transformer<string, string | LtsvRecord> = {
    // Prepend whatever was left over from the previous chunk, then emit
    // every complete line.
    transform: (
      chunk: string,
      controller: TransformStreamDefaultController<string | LtsvRecord>
    ): void => {
      const pending = state.buffer + chunk;
      push.call(state, pending, false, controller);
    },

    // Drain the remaining buffered text, including a final line that has
    // no trailing newline.
    flush: (
      controller: TransformStreamDefaultController<string | LtsvRecord>
    ): void => {
      push.call(state, state.buffer, true, controller);
    }
  };

  return transformer;
}
/**
 * Create a `TransformStream` backed by the `LtsvToJsonStream` transformer.
 *
 * @param options - forwarded as-is to `LtsvToJsonStream`; when omitted, that
 *   function's own defaults apply.
 */
export function createLtsvToJsonStream(
  options?: LtsvToJsonStreamOptions
): TransformStream<string, string | LtsvRecord> {
  const transformer = LtsvToJsonStream(options);
  return new TransformStream(transformer);
}