Skip to content
Permalink
Browse files Browse the repository at this point in the history
PacketAssembler add chunksize verification & refactor
  • Loading branch information
erossignon committed Jun 2, 2022
1 parent e43bc5b commit dbcb5d5
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 83 deletions.
3 changes: 2 additions & 1 deletion packages/node-opcua-packet-assembler/package.json
Expand Up @@ -12,7 +12,8 @@
"author": "Etienne Rossignon",
"license": "MIT",
"dependencies": {
"node-opcua-assert": "2.66.0"
"node-opcua-assert": "2.66.0",
"node-opcua-debug": "2.66.0"
},
"devDependencies": {
"should": "^13.2.3"
Expand Down
59 changes: 46 additions & 13 deletions packages/node-opcua-packet-assembler/source/packet_assembler.ts
@@ -1,7 +1,9 @@
import { EventEmitter } from "events";
import { assert } from "node-opcua-assert";
import { make_warningLog } from "node-opcua-debug";

const doDebug = false;
const warningLog = make_warningLog("PacketAssembler");

/***
* @class PacketAssembler
Expand All @@ -20,20 +22,35 @@ export interface PacketInfo {
extra: string;
}

export type ReadMessageFuncType = (data: Buffer) => PacketInfo;
export type ReadChunkFuncType = (data: Buffer) => PacketInfo;

export interface PacketAssemblerOptions {
readMessageFunc: ReadMessageFuncType;

// the minimum number of bytes that need to be received before the readMessageFunc can be called
readChunkFunc: ReadChunkFuncType;
// the minimum number of bytes that need to be received before the readChunkFunc can be called
minimumSizeInBytes: number;
maxChunkSize: number;
}

export interface PacketAssembler {
on(eventName: "startChunk", eventHandler: (packetInfo: PacketInfo, partial: Buffer) => void): this;
on(eventName: "chunk", eventHandler: (chunk: Buffer) => void): this;
on(eventName: "error", eventHandler: (err: Error) => void): this;
}
/**
* this class is used to assemble partial data from the tranport layer
* into message chunks
*/
export class PacketAssembler extends EventEmitter {
public static defaultMaxChunkCount = 777;
public static defaultMaxMessageSize = 1024 * 64 - 7;

private readonly _stack: Buffer[];
private expectedLength: number;
private currentLength: number;
private readonly readMessageFunc: ReadMessageFuncType;

private maxChunkSize: number;

private readonly readChunkFunc: ReadChunkFuncType;
private readonly minimumSizeInBytes: number;
private packetInfo?: PacketInfo;

Expand All @@ -42,9 +59,15 @@ export class PacketAssembler extends EventEmitter {
this._stack = [];
this.expectedLength = 0;
this.currentLength = 0;
this.readMessageFunc = options.readMessageFunc;
this.readChunkFunc = options.readChunkFunc;
this.minimumSizeInBytes = options.minimumSizeInBytes || 8;
assert(typeof this.readMessageFunc === "function", "packet assembler requires a readMessageFunc");
assert(typeof this.readChunkFunc === "function", "packet assembler requires a readChunkFunc");

// istanbul ignore next
assert(options.maxChunkSize === undefined || options.maxChunkSize !== 0);

this.maxChunkSize = options.maxChunkSize || PacketAssembler.defaultMaxMessageSize;
assert(this.maxChunkSize >= this.minimumSizeInBytes);
}

public feed(data: Buffer) {
Expand All @@ -60,12 +83,22 @@ export class PacketAssembler extends EventEmitter {

// we can extract the expected length here
this.packetInfo = this._readPacketInfo(data);
this.expectedLength = this.packetInfo.length;

assert(this.currentLength === 0);
assert(this.expectedLength > 0);
if (this.packetInfo.length < this.minimumSizeInBytes) {
this.emit("error", new Error("maximum message size exceeded"));
return;
}

if (this.packetInfo.length > this.maxChunkSize) {
const message = `maximum chunk size exceeded (maxChunkSize=${this.maxChunkSize} current chunk size = ${this.packetInfo.length})`;
warningLog(message);
this.emit("error", new Error(message));
return;
}
// we can now emit an event to signal the start of a new packet
this.emit("newMessage", this.packetInfo, data);
this.emit("startChunk", this.packetInfo, data);
this.expectedLength = this.packetInfo.length;
}

if (this.expectedLength === 0 || this.currentLength + data.length < this.expectedLength) {
Expand All @@ -87,7 +120,7 @@ export class PacketAssembler extends EventEmitter {
this.currentLength = 0;
this.expectedLength = 0;

this.emit("message", messageChunk);
this.emit("chunk", messageChunk);
} else {
// there is more data in this chunk than expected...
// the chunk need to be split
Expand All @@ -104,10 +137,10 @@ export class PacketAssembler extends EventEmitter {
}

private _readPacketInfo(data: Buffer) {
return this.readMessageFunc(data);
return this.readChunkFunc(data);
}

private _buildData(data: Buffer) {
private _buildData(data: Buffer): Buffer {
if (data && this._stack.length === 0) {
return data;
}
Expand Down
201 changes: 132 additions & 69 deletions packages/node-opcua-packet-assembler/test/test_packet_assembler.js
@@ -1,8 +1,8 @@
const PacketAssembler = require("..").PacketAssembler;
const should = require("should");
const sinon = require("sinon");
const { PacketAssembler } = require("..");

function makeMessage(msgType, length) {

function makeChunk(msgType, length) {
const total_length = length + 4 + 1;

total_length.should.be.greaterThan(0);
Expand All @@ -19,107 +19,170 @@ function makeMessage(msgType, length) {
return buf;
}

function readerHeader(data) {
function readChunkHeader(data) {
const msgType = String.fromCharCode(data.readUInt8(0));
const length = data.readUInt32LE(1);
return {length: length, extra: msgType};
return { length: length, extra: msgType };
}


describe("PacketAssembler", function () {

it("should assemble a single packet", function (done) {


const packet_assembler = new PacketAssembler({readMessageFunc: readerHeader, minimumSizeInBytes: 5})
.on("message", function (message) {

const info = readerHeader(message);
info.length.should.equal(message.length);

it("should assemble a single packet", (done) => {
const packetAssembler = new PacketAssembler({ readChunkFunc: readChunkHeader, minimumSizeInBytes: 5 });
packetAssembler.on(
"chunk",
(chunk) => {
const info = readChunkHeader(chunk);
info.length.should.equal(chunk.length);

done();
});

packet_assembler.feed(makeMessage("A", 200));
}
);

packetAssembler.feed(makeChunk("A", 200));
});


it("should assemble a message sent over several packets", function (done) {

const packet_assembler = new PacketAssembler({readMessageFunc: readerHeader, minimumSizeInBytes: 5})
.on("message", function (message) {

const info = readerHeader(message);
info.length.should.equal(message.length);
it("should assemble a chunk sent over several packets", (done) => {
const packetAssembler = new PacketAssembler({ readChunkFunc: readChunkHeader, minimumSizeInBytes: 5 });
packetAssembler.on(
"chunk",
(chunk) => {
const info = readChunkHeader(chunk);
info.length.should.equal(chunk.length);
info.length.should.equal(2000 + 5);
done();
});

const message1 = makeMessage("A", 2000);
}
);

const packet1 = message1.slice(0, 100);
const packet2 = message1.slice(100, 200);
const packet3 = message1.slice(200);
const chunk1 = makeChunk("A", 2000);

packet_assembler.feed(packet1);
packet_assembler.feed(packet2);
packet_assembler.feed(packet3);
const packet1 = chunk1.slice(0, 100);
const packet2 = chunk1.slice(100, 200);
const packet3 = chunk1.slice(200);

packetAssembler.feed(packet1);
packetAssembler.feed(packet2);
packetAssembler.feed(packet3);
});

it("should assemble a message sent one byte at a time", function (done) {

const packet_assembler = new PacketAssembler({readMessageFunc: readerHeader, minimumSizeInBytes: 5})
.on("message", function (message) {

const info = readerHeader(message);
info.length.should.equal(message.length);

done();
});

const message = makeMessage("A", 200);

for (let i = 0; i < message.length; i++) {
const single_byte_chunk = message.slice(i, i + 1);
packet_assembler.feed(single_byte_chunk);
it("should assemble a chunk sent one byte at a time", () => {
const packetAssembler = new PacketAssembler({
readChunkFunc: readChunkHeader,
minimumSizeInBytes: 5,
maxChunkCount: 10000
});
packetAssembler.on(
"chunk",
(chunk) => {
const info = readChunkHeader(chunk);
info.length.should.equal(chunk.length);
}
);
const onChunkSpy = sinon.spy();
const errorSpy = sinon.spy();
packetAssembler.on("chunk", onChunkSpy);
packetAssembler.on("error", errorSpy);


const chunk = makeChunk("A", 200);

for (let i = 0; i < chunk.length; i++) {
const single_byte_chunk = chunk.slice(i, i + 1);
packetAssembler.feed(single_byte_chunk);
}

});
errorSpy.callCount.should.equal(0);
onChunkSpy.callCount.should.equal(1);

it("should deal with packets containing data from 2 different messages", function (done) {
});

it("should deal with packets containing data from 2 different chunks", () => {
let counter = 0;
const packet_assembler = new PacketAssembler({readMessageFunc: readerHeader, minimumSizeInBytes: 5})
.on("message", function (message) {

const info = readerHeader(message);
info.length.should.equal(message.length);

const packetAssembler = new PacketAssembler({
readChunkFunc: readChunkHeader,
minimumSizeInBytes: 5
});

packetAssembler.on(
"chunk",
(chunk) => {
const info = readChunkHeader(chunk);
info.length.should.equal(chunk.length);
info.length.should.equal(200 + 5);
counter += 1;
if (counter === 1) {
info.extra.should.equal("A");
}
if (counter === 2) {
info.extra.should.equal("B");
done();
}
});
}
);

const onChunkSpy = sinon.spy();
const errorSpy = sinon.spy();
packetAssembler.on("chunk", onChunkSpy);
packetAssembler.on("error", errorSpy);

const message1 = makeMessage("A", 200);
const message2 = makeMessage("B", 200);

const packet1 = message1.slice(0, 150);
const packet2_a = message1.slice(150);
const packet2_b = message2.slice(0, 150);
const chunk1 = makeChunk("A", 200);
const chunk2 = makeChunk("B", 200);

const packet1 = chunk1.slice(0, 150);
const packet2_a = chunk1.slice(150);
const packet2_b = chunk2.slice(0, 150);
const packet2 = Buffer.concat([packet2_a, packet2_b]);
const packet3 = message2.slice(150);
const packet3 = chunk2.slice(150);

packetAssembler.feed(packet1);
packetAssembler.feed(packet2);
packetAssembler.feed(packet3);

packet_assembler.feed(packet1);
packet_assembler.feed(packet2);
packet_assembler.feed(packet3);
errorSpy.callCount.should.equal(0);
onChunkSpy.callCount.should.equal(2);

});

it("limits: max chunk size exceeded", () => {
const packetAssembler = new PacketAssembler({
readChunkFunc: readChunkHeader,
minimumSizeInBytes: 5,
maxChunkSize: 100
});

const onChunkSpy = sinon.spy();
const errorSpy = sinon.spy();
packetAssembler.on("chunk", onChunkSpy);
packetAssembler.on("error", errorSpy);

let counter = 0;
packetAssembler.on("chunk", (chunk) => {
const info = readChunkHeader(chunk);
info.length.should.equal(chunk.length);
info.length.should.equal(200 + 5);
counter += 1;
if (counter === 1) {
info.extra.should.equal("A");
}
if (counter === 2) {
info.extra.should.equal("B");
}
});
let errorCount = 0;
packetAssembler.on("error", (err) => {
err.message.should.match(/maximum chunk size exceeded/);
errorCount += 1;
});
const chunk1 = makeChunk("A", 200);
const chunk2 = makeChunk("B", 200);
packetAssembler.feed(chunk1);
packetAssembler.feed(chunk2);

errorCount.should.eql(2);

onChunkSpy.callCount.should.eql(0);
errorSpy.callCount.should.eql(2);
});

});

0 comments on commit dbcb5d5

Please sign in to comment.