|
1 | 1 | import { BackendMessage, DatabaseError } from './messages' |
2 | 2 | import { serialize } from './serializer' |
3 | 3 | import { Parser, MessageCallback } from './parser' |
4 | | -import {pipeline} from 'stream'; |
| 4 | +import { pipeline } from 'stream' |
5 | 5 |
|
6 | 6 | export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> { |
7 | 7 | const parser = new Parser() |
8 | | - stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback)); |
9 | 8 |
|
10 | | - pipeline( |
11 | | - stream, |
12 | | - parser.splitMessagesTransform.bind(parser), |
13 | | - parser.convertToMessageTransform.bind(parser), |
14 | | - async function* (stream) { |
15 | | - for await(const message of stream) { |
16 | | - callback(message) |
17 | | - } |
18 | | - }, |
19 | | - (err) => err ? reject(err) : resolve() |
20 | | - ); |
21 | | - |
22 | | - return new Promise((resolve) => stream.on('end', () => resolve())) |
| 9 | + return new Promise((resolve, reject) => { |
| 10 | + pipeline( |
| 11 | + stream, |
| 12 | + parser.splitMessagesTransform.bind(parser), |
| 13 | + parser.convertToMessageTransform.bind(parser), |
| 14 | + async function* (stream) { |
| 15 | + for await(const message of stream) { |
| 16 | + callback(message) |
| 17 | + } |
| 18 | + }, |
| 19 | + (err) => err ? reject(err) : resolve() |
| 20 | + ); |
| 21 | + }) |
23 | 22 | } |
24 | 23 |
|
25 | 24 | export { serialize, DatabaseError } |
0 commit comments