Skip to content

Commit 731ccad

Browse files
committed
improve performance by using splitting steps to different transforms
1 parent 48f4398 commit 731ccad

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

packages/pg-protocol/src/index.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
11
import { BackendMessage, DatabaseError } from './messages'
22
import { serialize } from './serializer'
33
import { Parser, MessageCallback } from './parser'
4+
import {pipeline} from 'stream';
45

56
export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
67
const parser = new Parser()
7-
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
8+
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback));
9+
10+
pipeline(
11+
stream,
12+
parser.splitMessagesTransform.bind(parser),
13+
parser.convertToMessageTransform.bind(parser),
14+
async function* (stream) {
15+
for await(const message of stream) {
16+
callback(message)
17+
}
18+
},
19+
(err) => err ? reject(err) : resolve()
20+
);
21+
822
return new Promise((resolve) => stream.on('end', () => resolve()))
923
}
1024

packages/pg-protocol/src/parser.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,48 @@ export class Parser {
119119
}
120120
}
121121

122+
async* splitMessagesTransform(stream: NodeJS.ReadableStream) {
123+
for await(const buffer of stream) {
124+
this.mergeBuffer(buffer as Buffer)
125+
const bufferFullLength = this.bufferOffset + this.bufferLength
126+
let offset = this.bufferOffset
127+
while (offset + HEADER_LENGTH <= bufferFullLength) {
128+
// code is 1 byte long - it identifies the message type
129+
const code = this.buffer[offset]
130+
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
131+
const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
132+
const fullMessageLength = CODE_LENGTH + length
133+
if (fullMessageLength + offset <= bufferFullLength) {
134+
yield {
135+
offset: offset + HEADER_LENGTH,
136+
code,
137+
length,
138+
bytes: this.buffer
139+
};
140+
offset += fullMessageLength
141+
} else {
142+
break
143+
}
144+
}
145+
if (offset === bufferFullLength) {
146+
// No more use for the buffer
147+
this.buffer = emptyBuffer
148+
this.bufferLength = 0
149+
this.bufferOffset = 0
150+
} else {
151+
// Adjust the cursors of remainingBuffer
152+
this.bufferLength = bufferFullLength - offset
153+
this.bufferOffset = offset
154+
}
155+
}
156+
}
157+
158+
async* convertToMessageTransform(stream: NodeJS.ReadableStream) {
159+
for await(const rawMessage of stream) {
160+
yield this.handlePacket((rawMessage as any).offset, (rawMessage as any).code, (rawMessage as any).length, (rawMessage as any).bytes);
161+
}
162+
}
163+
122164
private mergeBuffer(buffer: Buffer): void {
123165
if (this.bufferLength > 0) {
124166
const newLength = this.bufferLength + buffer.byteLength

0 commit comments

Comments
 (0)