191 changes: 118 additions & 73 deletions packages/node-opcua-transport/source/tcp_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@
*/
import { EventEmitter } from "events";
import { Socket } from "net";

import * as chalk from "chalk";

import { assert } from "node-opcua-assert";
import { createFastUninitializedBuffer } from "node-opcua-buffer-utils";
import * as debug from "node-opcua-debug";
import { BinaryStream } from "node-opcua-binary-stream";
import { make_debugLog, checkDebugFlag, make_errorLog, hexDump } from "node-opcua-debug";
import { ObjectRegistry } from "node-opcua-object-registry";
import { PacketAssembler } from "node-opcua-packet-assembler";
import { ErrorCallback, CallbackWithData } from "node-opcua-status-code";
import { PacketAssembler, PacketAssemblerErrorCode } from "node-opcua-packet-assembler";
import { ErrorCallback, CallbackWithData, StatusCode } from "node-opcua-status-code";

import { StatusCodes2 } from "./status_codes";
import { readRawMessageHeader } from "./message_builder_base";
import { writeTCPMessageHeader } from "./tools";
import { doTraceIncomingChunk } from "./utils";
import { TCPErrorMessage } from "./TCPErrorMessage";
import { packTcpMessage } from "./tools";

const debugLog = debug.make_debugLog(__filename);
const doDebug = debug.checkDebugFlag(__filename);
const errorLog = debug.make_errorLog(__filename);
const debugLog = make_debugLog(__filename);
const doDebug = checkDebugFlag(__filename);
const errorLog = make_errorLog(__filename);

export interface MockSocket {
invalid?: boolean;
Expand All @@ -28,7 +30,7 @@ export interface MockSocket {
}
let fakeSocket: MockSocket = {
invalid: true,

destroy() {
errorLog("MockSocket.destroy");
},
Expand All @@ -52,11 +54,11 @@ export function getFakeTransport(): any {
let counter = 0;

export interface TCP_transport {
on(eventName: "message", eventHandler: (message: Buffer) => void): this;
on(eventName: "chunk", eventHandler: (messageChunk: Buffer) => void): this;
on(eventName: "socket_closed", eventHandler: (err: Error | null) => void): this;
on(eventName: "close", eventHandler: (err: Error | null) => void): this;

once(eventName: "message", eventHandler: (message: Buffer) => void): this;
once(eventName: "chunk", eventHandler: (messageChunk: Buffer) => void): this;
once(eventName: "socket_closed", eventHandler: (err: Error | null) => void): this;
once(eventName: "close", eventHandler: (err: Error | null) => void): this;
}
Expand All @@ -69,6 +71,10 @@ export class TCP_transport extends EventEmitter {
* @default 0
*/
public protocolVersion: number;
public maxMessageSize: number;
public maxChunkCount: number;
public sendBufferSize: number;
public receiveBufferSize: number;

public bytesWritten: number;
public bytesRead: number;
Expand All @@ -89,7 +95,6 @@ export class TCP_transport extends EventEmitter {
private _onSocketEndedHasBeenCalled: boolean;
private _theCallback?: CallbackWithData;
private _on_error_during_one_time_message_receiver: any;
private _pendingBuffer?: any;
private packetAssembler?: PacketAssembler;
private _timeout: number;

Expand All @@ -103,10 +108,14 @@ export class TCP_transport extends EventEmitter {
this._timeout = 30000; // 30 seconds timeout
this._socket = null;
this.headerSize = 8;

this.maxMessageSize = 0;
this.maxChunkCount = 0;
this.receiveBufferSize = 0;
this.sendBufferSize = 0;
this.protocolVersion = 0;

this._disconnecting = false;
this._pendingBuffer = undefined;

this.bytesWritten = 0;
this.bytesRead = 0;
Expand All @@ -120,6 +129,26 @@ export class TCP_transport extends EventEmitter {
TCP_transport.registry.register(this);
}

public setLimits({
receiveBufferSize,
sendBufferSize,
maxMessageSize,
maxChunkCount
}: {
receiveBufferSize: number;
sendBufferSize: number;
maxMessageSize: number;
maxChunkCount: number;
}) {
this.receiveBufferSize = receiveBufferSize;
this.sendBufferSize = sendBufferSize;
this.maxMessageSize = maxMessageSize;
this.maxChunkCount = maxChunkCount;

// reinstall packetAssembler with correct limits
this._install_packetAssembler();
}

public get timeout(): number {
return this._timeout;
}
Expand All @@ -138,52 +167,19 @@ export class TCP_transport extends EventEmitter {
TCP_transport.registry.unregister(this);
}

/**
* ```createChunk``` is used to construct a pre-allocated chunk to store up to ```length``` bytes of data.
* The created chunk includes a prepended header for ```chunk_type``` of size ```self.headerSize```.
*
* @method createChunk
* @param msgType
* @param chunkType {String} chunk type. should be 'F' 'C' or 'A'
* @param length
* @return a buffer object with the required length representing the chunk.
*
* Note:
* - only one chunk can be created at a time.
* - a created chunk should be committed using the ```write``` method before an other one is created.
*/
public createChunk(msgType: string, chunkType: string, length: number): Buffer {
assert(msgType === "MSG");
assert(this._pendingBuffer === undefined, "createChunk has already been called ( use write first)");

const totalLength = length + this.headerSize;
const buffer = createFastUninitializedBuffer(totalLength);
writeTCPMessageHeader("MSG", chunkType, totalLength, buffer);
this._pendingBuffer = buffer;

return buffer;
}

/**
* write the message_chunk on the socket.
* @method write
* @param messageChunk
*
* Notes:
* - the message chunk must have been created by ```createChunk```.
* - once a message chunk has been written, it is possible to call ```createChunk``` again.
*
*/
public write(messageChunk: Buffer): void {
assert(
this._pendingBuffer === undefined || this._pendingBuffer === messageChunk,
" write should be used with buffer created by createChunk"
);
public write(messageChunk: Buffer, callback?: (err?: Error) => void | undefined): void {
const header = readRawMessageHeader(messageChunk);
assert(header.length === messageChunk.length);
assert(["F", "C", "A"].indexOf(header.messageHeader.isFinal) !== -1);
this._write_chunk(messageChunk);
this._pendingBuffer = undefined;
const c = header.messageHeader.isFinal;
assert(c === "F" || c === "C" || c === "A");
this._write_chunk(messageChunk, (err) => {
callback && callback(err);
});
}

public get isDisconnecting(): boolean {
Expand Down Expand Up @@ -228,11 +224,15 @@ export class TCP_transport extends EventEmitter {
return this._socket !== null && !this._socket.destroyed && !this._disconnecting;
}

protected _write_chunk(messageChunk: Buffer): void {
protected _write_chunk(messageChunk: Buffer, callback?: (err?: Error) => void | undefined): void {
if (this._socket !== null) {
this.bytesWritten += messageChunk.length;
this.chunkWrittenCount++;
this._socket.write(messageChunk);
this._socket.write(messageChunk, callback);
} else {
if (callback) {
callback();
}
}
}

Expand All @@ -247,6 +247,35 @@ export class TCP_transport extends EventEmitter {
this.emit("close", err || null);
}

protected _install_packetAssembler() {
if (this.packetAssembler) {
this.packetAssembler.removeAllListeners();
this.packetAssembler = undefined;
}

// install packet assembler ...
this.packetAssembler = new PacketAssembler({
readChunkFunc: readRawMessageHeader,
minimumSizeInBytes: this.headerSize,
maxChunkSize: this.receiveBufferSize //Math.max(this.receiveBufferSize, this.sendBufferSize)
});

this.packetAssembler.on("chunk", (chunk: Buffer) => this._on_message_chunk_received(chunk));

this.packetAssembler.on("error", (err, code) => {
let statusCode = StatusCodes2.BadTcpMessageTooLarge;
switch (code) {
case PacketAssemblerErrorCode.ChunkSizeExceeded:
statusCode = StatusCodes2.BadTcpMessageTooLarge;
break;
default:
statusCode = StatusCodes2.BadTcpInternalError;
}

this.sendErrorMessage(statusCode, err.message);
this.prematureTerminate(new Error("Packet Assembler : " + err.message), statusCode);
});
}
/**
* @method _install_socket
* @param socket {Socket}
Expand All @@ -259,18 +288,7 @@ export class TCP_transport extends EventEmitter {
debugLog(" TCP_transport#_install_socket ", this.name);
}

// install packet assembler ...
this.packetAssembler = new PacketAssembler({
readMessageFunc: readRawMessageHeader,

minimumSizeInBytes: this.headerSize
});

/* istanbul ignore next */
if (!this.packetAssembler) {
throw new Error("Internal Error");
}
this.packetAssembler.on("message", (messageChunk: Buffer) => this._on_message_received(messageChunk));
this._install_packetAssembler();

this._socket
.on("data", (data: Buffer) => this._on_socket_data(data))
Expand All @@ -284,14 +302,36 @@ export class TCP_transport extends EventEmitter {
// let use a large timeout here to make sure that we not conflict with our internal timeout
this._socket!.setTimeout(this.timeout + 2000, () => {
debugLog(` _socket ${this.name} has timed out (timeout = ${this.timeout})`);
this.prematureTerminate(new Error("socket timeout : timeout=" + this.timeout));
this.prematureTerminate(new Error("socket timeout : timeout=" + this.timeout), StatusCodes2.BadTimeout);
});
}

public prematureTerminate(err: Error): void {
debugLog("prematureTerminate", err ? err.message : "");
public sendErrorMessage(statusCode: StatusCode, extraErrorDescription: string | null): void {
// When the Client receives an Error Message it reports the error to the application and closes the TransportConnection gracefully.
// If a Client encounters a fatal error, it shall report the error to the application and send a CloseSecureChannel Message.

/* istanbul ignore next*/
if (doDebug) {
debugLog(chalk.red(" sendErrorMessage ") + chalk.cyan(statusCode.toString()));
debugLog(chalk.red(" extraErrorDescription ") + chalk.cyan(extraErrorDescription));
}

const reason = `${statusCode.toString()}:${extraErrorDescription || ""}`;
const errorResponse = new TCPErrorMessage({
statusCode,
reason
});
const messageChunk = packTcpMessage("ERR", errorResponse);
this.write(messageChunk);
}

public prematureTerminate(err: Error, statusCode: StatusCode): void {
// https://reference.opcfoundation.org/v104/Core/docs/Part6/6.7.3/

debugLog("prematureTerminate", err ? err.message : "", statusCode.toString());

if (this._socket) {
err.message = "socket has timeout: EPIPE: " + err.message;
err.message = "premature socket termination " + err.message;
// we consider this as an error
const _s = this._socket;
_s.end();
Expand All @@ -301,6 +341,7 @@ export class TCP_transport extends EventEmitter {
this.dispose();
_s.removeAllListeners();
}
// this.gracefullShutdown(err);
}
/**
* @method _install_one_time_message_receiver
Expand Down Expand Up @@ -341,7 +382,10 @@ export class TCP_transport extends EventEmitter {
return false;
}

private _on_message_received(messageChunk: Buffer) {
private _on_message_chunk_received(messageChunk: Buffer) {
if (doTraceIncomingChunk) {
console.log(hexDump(messageChunk));
}
const hadCallback = this._fulfill_pending_promises(null, messageChunk);
this.chunkReadCount++;
if (!hadCallback) {
Expand All @@ -350,7 +394,7 @@ export class TCP_transport extends EventEmitter {
* @event message
* @param message_chunk the message chunk
*/
this.emit("message", messageChunk);
this.emit("chunk", messageChunk);
}
}

Expand Down Expand Up @@ -397,6 +441,7 @@ export class TCP_transport extends EventEmitter {
}

private _on_socket_data(data: Buffer): void {
// istanbul ignore next
if (!this.packetAssembler) {
throw new Error("internal Error");
}
Expand Down
5 changes: 3 additions & 2 deletions packages/node-opcua-transport/source/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//
const transportFlag = (process.env?.NODEOPCUADEBUG?.match(/TRANSPORT{([^}]*)}/) || [])[1] || "";
export const doTraceHelloAck = transportFlag.match(/HELACK/);
export const doTraceChunk = transportFlag.match(/CHUNK/);
export const doTraceHelloAck = !!transportFlag.match(/HELACK/);
export const doTraceChunk = !!transportFlag.match(/CHUNK/);
export const doTraceIncomingChunk = !!transportFlag.match(/FLOW/);
38 changes: 30 additions & 8 deletions packages/node-opcua-transport/test/test_client_tcp_transport.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
"use strict";
const should = require("should");
const { assert } = require("node-opcua-assert");
const chalk = require("chalk");
const sinon = require("sinon");

const { StatusCodes, StatusCode } = require("node-opcua-status-code");

const { assert } = require("node-opcua-assert");
const { hexDump } = require("node-opcua-debug");

const { make_debugLog, make_errorLog } = require("node-opcua-debug");
const { StatusCodes, StatusCode } = require("node-opcua-status-code");
const { compare_buffers } = require("node-opcua-utils");

const { make_debugLog, make_errorLog } = require("node-opcua-debug");

const debugLog = make_debugLog("TEST");
const errorLog = make_errorLog("TEST");

Expand All @@ -19,6 +18,7 @@ const { FakeServer } = require("../dist/test_helpers");
const port = 5678;

const { AcknowledgeMessage, TCPErrorMessage, ClientTCP_transport, packTcpMessage } = require("..");
const { MessageBuilderBase, writeTCPMessageHeader } = require("..");

describe("testing ClientTCP_transport", function () {
this.timeout(15000);
Expand Down Expand Up @@ -215,7 +215,7 @@ describe("testing ClientTCP_transport", function () {

transport.timeout = 1000; // very short timeout;

transport.on("message", function (message_chunk) {
transport.on("chunk", function (message_chunk) {
debugLog(chalk.cyan.bold(hexDump(message_chunk)));
compare_buffers(message_chunk.slice(8), message1);

Expand All @@ -228,12 +228,34 @@ describe("testing ClientTCP_transport", function () {
done();
});

transport.connect(endpointUrl, function (err) {

/**
* ```createChunk``` is used to construct a pre-allocated chunk to store up to ```length``` bytes of data.
* The created chunk includes a prepended header for ```chunk_type``` of size ```self.headerSize```.
*
* @method createChunk
* @param msgType
* @param chunkType {String} chunk type. should be 'F' 'C' or 'A'
* @param length
* @return a buffer object with the required length representing the chunk.
*
* Note:
* - only one chunk can be created at a time.
* - a created chunk should be committed using the ```write``` method before an other one is created.
*/
function createChunk(msgType, chunkType, headerSize, length) {
assert(msgType === "MSG");
const totalLength = length + headerSize;
const buffer = Buffer.alloc(totalLength);
writeTCPMessageHeader("MSG", chunkType, totalLength, buffer);
return buffer;
}
transport.connect(endpointUrl, (err) => {
if (err) {
errorLog(chalk.bgWhite.red(" err = "), err.message);
}
assert(!err);
const buf = transport.createChunk("MSG", "F", message1.length);
const buf = createChunk("MSG", "F", transport.headerSize, message1.length);
message1.copy(buf, transport.headerSize, 0, message1.length);
transport.write(buf);
});
Expand Down
91 changes: 89 additions & 2 deletions packages/node-opcua-transport/test/test_message_builder_base.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"use strict";
const sinon = require("sinon");

const { BinaryStream } = require("node-opcua-binary-stream");
const { compare_buffers } = require("node-opcua-utils");

Expand All @@ -25,7 +27,7 @@ describe("MessageBuilderBase", function () {
done();
});

builder.on("chunk", function (message_chunk) {
builder.on("chunk", (message_chunk) => {
compare_buffers(message_chunk, original_message_chunk, original_message_chunk.length);
});

Expand All @@ -47,7 +49,7 @@ describe("MessageBuilderBase", function () {

const expected = [original_message_chunk_1, original_message_chunk_2];
let expected_count = 0;
builder.on("chunk", function (message_chunk) {
builder.on("chunk", (message_chunk) => {
const expected_chunk = expected[expected_count];
expected_count += 1;
compare_buffers(message_chunk, expected_chunk, expected_chunk.length);
Expand All @@ -56,4 +58,89 @@ describe("MessageBuilderBase", function () {
builder.feed(original_message_chunk_1);
builder.feed(original_message_chunk_2);
});

it("should not allow more chunks that maxChunkCount ", function (done) {

const builder = new MessageBuilderBase({
maxChunkCount: 5,
maxMessageSize: 64 * 1024,
});

const onChunkSpy = sinon.spy();
builder.on("chunk", onChunkSpy);

const onFullMessageBodySpy = sinon.spy();
builder.on("full_message_body", onFullMessageBodySpy);

const onErrorSpy = sinon.spy();
builder.on("error", onErrorSpy);

const message_body = Buffer.alloc(16 * 1024);
const chunk1 = wrap_message_in_chunk(message_body.slice(0 * 1024, 1 * 1024), "C");
const chunk2 = wrap_message_in_chunk(message_body.slice(1 * 1024, 2 * 1024), "C");
const chunk3 = wrap_message_in_chunk(message_body.slice(2 * 1024, 3 * 1024), "C");
const chunk4 = wrap_message_in_chunk(message_body.slice(3 * 1024, 4 * 1024), "C");
const chunk5 = wrap_message_in_chunk(message_body.slice(4 * 1024, 5 * 1024), "C");
const chunk6 = wrap_message_in_chunk(message_body.slice(5 * 1024, 6 * 1024), "C");
const chunk7 = wrap_message_in_chunk(message_body.slice(6 * 1024, 7 * 1024), "F");

builder.feed(chunk1);
builder.feed(chunk2);
builder.feed(chunk3);
builder.feed(chunk4);
builder.feed(chunk5);
builder.feed(chunk6);
builder.feed(chunk7);

onChunkSpy.callCount.should.eql(6);
onErrorSpy.callCount.should.eql(1);
onFullMessageBodySpy.callCount.should.eql(0);

onErrorSpy.getCall(0).args[0].should.match(/max chunk count exceeded/);

done();

});
it("should not allow message bigger than maxMessageSize ", function (done) {

const builder = new MessageBuilderBase({
maxChunkCount: 1000,
maxMessageSize: 4 * 1024,
});

const onChunkSpy = sinon.spy();
builder.on("chunk", onChunkSpy);

const onFullMessageBodySpy = sinon.spy();
builder.on("full_message_body", onFullMessageBodySpy);

const onErrorSpy = sinon.spy();
builder.on("error", onErrorSpy);

const message_body = Buffer.alloc(16 * 1024);
const chunk1 = wrap_message_in_chunk(message_body.slice(0 * 1024, 1 * 1024), "C");
const chunk2 = wrap_message_in_chunk(message_body.slice(1 * 1024, 2 * 1024), "C");
const chunk3 = wrap_message_in_chunk(message_body.slice(2 * 1024, 3 * 1024), "C");
const chunk4 = wrap_message_in_chunk(message_body.slice(3 * 1024, 4 * 1024), "C");
const chunk5 = wrap_message_in_chunk(message_body.slice(4 * 1024, 5 * 1024), "C");
const chunk6 = wrap_message_in_chunk(message_body.slice(5 * 1024, 6 * 1024), "C");
const chunk7 = wrap_message_in_chunk(message_body.slice(6 * 1024, 7 * 1024), "F");

builder.feed(chunk1);
builder.feed(chunk2);
builder.feed(chunk3);
builder.feed(chunk4);
builder.feed(chunk5);
builder.feed(chunk6);
builder.feed(chunk7);

onChunkSpy.callCount.should.eql(4);
onErrorSpy.callCount.should.eql(1);
onFullMessageBodySpy.callCount.should.eql(0);

onErrorSpy.getCall(0).args[0].should.match(/maxMessageSize/);

done();

});
});
16 changes: 13 additions & 3 deletions packages/node-opcua-transport/test/test_server_tcp_transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe("testing ServerTCP_transport", function () {
debugLog("failed !", err.message);
});

transport.on("message", (messageChunk) => {
transport.on("chunk", (messageChunk) => {
// console.log("message ", messageChunk);
done(new Error("Not expecting an message"));
});
Expand Down Expand Up @@ -146,11 +146,21 @@ describe("testing ServerTCP_transport", function () {
function perform_sever_receiving_a_HEL_MESSAGE_followed_by_OpenChannelRequest_scenario(done) {
const transport = new ServerTCP_transport();

transport.setLimits({
maxChunkCount: 10000,
maxMessageSize: 10000,
receiveBufferSize: 10000,
sendBufferSize: 10000
});

transport.init(fakeSocket.server, (err) => {
if (err) {
console.log(err.message);
}
assert(!err);
});

transport.on("message", (messageChunk) => {
transport.on("chunk", (messageChunk) => {
utils.compare_buffers(messageChunk, openChannelRequest);

// it should provide bytesRead and bytesWritten
Expand Down Expand Up @@ -273,7 +283,7 @@ describe("testing ServerTCP_transport", function () {
/** */
});

transport.on("message", (messageChunk) => {
transport.on("chunk", (messageChunk) => {
// console.log("message ", messageChunk);
done();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { assert } from "node-opcua-assert";
import { setFakeTransport } from "../source";
import { HalfComChannel } from "./half_com_channel";

export interface DirectTransport {
on(eventName: "end", eventHandler:()=>void): this;
}
export class DirectTransport extends EventEmitter {
public client: HalfComChannel;
public server: HalfComChannel;
Expand Down Expand Up @@ -32,7 +35,7 @@ export class DirectTransport extends EventEmitter {
this.server._hasEnded = true;
});

this.server.on("end", (err: Error) => {
this.server.on("end", (err?: Error) => {
this.emit("end", err);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
import { EventEmitter } from "events";
import { assert } from "node-opcua-assert";

export interface HalfComChannel {
on(eventName: "data", eventHandler:(data: Buffer)=>void): this;
on(eventName: "send_data", eventHandler:(data: Buffer)=>void): this;
on(eventName: "ending", eventHandler:()=>void): this;
on(eventName: "end", eventHandler:(err?: Error)=>void): this;

}
export class HalfComChannel extends EventEmitter {
public _hasEnded: boolean;

Expand Down