-
-
Notifications
You must be signed in to change notification settings - Fork 478
/
tcp_transport.js
427 lines (330 loc) · 11.5 KB
/
tcp_transport.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
"use strict";
/**
* @module opcua.transport
*/
// system requires
const EventEmitter = require("events").EventEmitter;
const assert = require("node-opcua-assert").assert;
const _ = require("underscore");
const util = require("util");
// opcua requires
const PacketAssembler = require("node-opcua-packet-assembler").PacketAssembler;
const writeTCPMessageHeader = require("./tools").writeTCPMessageHeader;
const readRawMessageHeader = require("./message_builder_base").readRawMessageHeader;
const buffer_utils = require("node-opcua-buffer-utils");
const createFastUninitializedBuffer = buffer_utils.createFastUninitializedBuffer;
const debug = require("node-opcua-debug");
const debugLog = debug.make_debugLog(__filename);
const doDebug = debug.checkDebugFlag(__filename);
let fakeSocket = {invalid: true};
exports.setFakeTransport = function (socket_like_mock) {
fakeSocket = socket_like_mock;
};
exports.getFakeTransport = function () {
if (fakeSocket.invalid){
throw new Error("getFakeTransport: setFakeTransport must be called first - BadProtocolVersionUnsupported");
}
return fakeSocket;
};
/**
* TCP_transport
*
* @class TCP_transport
* @constructor
* @extends EventEmitter
*/
function TCP_transport() {
/**
* timeout
* @property [timeout=30000]
* @type {number}
*/
this.timeout = 30000; // 30 seconds timeout
this._socket = null;
/**
* @property headerSize the size of the header in bytes
* @type {number}
* @default 8
*/
this.headerSize = 8;
/**
* @property protocolVersion indicates the version number of the OPCUA protocol used
* @type {number}
* @default 0
*/
this.protocolVersion = 0;
this.__disconnecting__ = false;
this.bytesWritten = 0;
this.bytesRead = 0;
this._the_callback = null;
/***
* @property chunkWrittenCount
* @type {number}
*/
this.chunkWrittenCount= 0;
/***
* @property chunkReadCount
* @type {number}
*/
this.chunkReadCount= 0;
}
util.inherits(TCP_transport, EventEmitter);
/**
* ```createChunk``` is used to construct a pre-allocated chunk to store up to ```length``` bytes of data.
* The created chunk includes a prepended header for ```chunk_type``` of size ```self.headerSize```.
*
* @method createChunk
* @param msg_type
* @param chunk_type {String} chunk type. should be 'F' 'C' or 'A'
* @param length
* @return {Buffer} a buffer object with the required length representing the chunk.
*
* Note:
* - only one chunk can be created at a time.
* - a created chunk should be committed using the ```write``` method before an other one is created.
*/
TCP_transport.prototype.createChunk = function (msg_type, chunk_type, length) {
assert(msg_type === "MSG");
assert(this._pending_buffer === undefined, "createChunk has already been called ( use write first)");
const total_length = length + this.headerSize;
const buffer = createFastUninitializedBuffer(total_length);
writeTCPMessageHeader("MSG", chunk_type, total_length, buffer);
this._pending_buffer = buffer;
return buffer;
};
let counter =0;
function record(data,extra) {
extra = extra || "";
const c = ("00000000" +counter.toString(10)).substr(-8);
data._serialNumber = c;
data.info = data.info || "";
const l ="";// "-L="+("00000000" +data.length).substr(-8);
require("fs").writeFileSync("data"+c+l+data.info+extra+".org",data,"binary");
counter++;
}
TCP_transport.prototype._write_chunk = function (message_chunk) {
if (this._socket) {
this.bytesWritten += message_chunk.length;
this.chunkWrittenCount ++;
this._socket.write(message_chunk);
}
};
/**
* write the message_chunk on the socket.
* @method write
* @param message_chunk {Buffer}
*
* Notes:
* - the message chunk must have been created by ```createChunk```.
* - once a message chunk has been written, it is possible to call ```createChunk``` again.
*
*/
TCP_transport.prototype.write = function (message_chunk) {
assert((this._pending_buffer === undefined) || this._pending_buffer === message_chunk, " write should be used with buffer created by createChunk");
const header = readRawMessageHeader(message_chunk);
assert(header.length === message_chunk.length);
assert(["F", "C", "A"].indexOf(header.messageHeader.isFinal) !== -1);
this._write_chunk(message_chunk);
this._pending_buffer = undefined;
};
function _fulfill_pending_promises(err, data) {
const self = this;
_cleanup_timers.call(self);
const the_callback = self._the_callback;
self._the_callback = null;
if (the_callback) {
the_callback(err, data);
return true;
}
return false;
}
function _on_message_received(message_chunk) {
const self = this;
const has_callback = _fulfill_pending_promises.call(self, null, message_chunk);
self.chunkReadCount ++;
if (!has_callback) {
/**
* notify the observers that a message chunk has been received
* @event message
* @param message_chunk {Buffer} the message chunk
*/
self.emit("message", message_chunk);
}
}
function _cleanup_timers() {
const self = this;
if (self._timerId) {
clearTimeout(self._timerId);
this._timerId = null;
}
}
function _start_timeout_timer() {
const self = this;
assert(!self._timerId, "timer already started");
self._timerId = setTimeout(function () {
self._timerId =null;
_fulfill_pending_promises.call(self, new Error("Timeout in waiting for data on socket ( timeout was = " + self.timeout + " ms )"));
}, self.timeout);
}
TCP_transport.prototype.on_socket_closed = function(err) {
const self = this;
if (self._on_socket_closed_called) {
return;
}
assert(!self._on_socket_closed_called);
self._on_socket_closed_called = true; // we don't want to send close event twice ...
/**
* notify the observers that the transport layer has been disconnected.
* @event socket_closed
* @param err the Error object or null
*/
self.emit("socket_closed", err || null);
};
TCP_transport.prototype.on_socket_ended = function(err) {
const self = this;
assert(!self._on_socket_ended_called);
self._on_socket_ended_called = true; // we don't want to send close event twice ...
/**
* notify the observers that the transport layer has been disconnected.
* @event close
* @param err the Error object or null
*/
self.emit("close", err || null);
};
TCP_transport.prototype._on_socket_ended_message = function(err) {
const self = this;
if (self.__disconnecting__) {
return;
}
self._on_socket_ended = null;
self._on_data_received = null;
debugLog("Transport Connection ended".red + " " + self.name);
assert(!self.__disconnecting__);
err = err || new Error("_socket has been disconnected by third party");
self.on_socket_ended(err);
self.__disconnecting__ = true;
debugLog(" bytesRead = ", self.bytesRead);
debugLog(" bytesWritten = ", self.bytesWritten);
_fulfill_pending_promises.call(self, new Error("Connection aborted - ended by server : " + (err ? err.message : "")));
};
/**
* @method _install_socket
* @param socket {Socket}
* @protected
*/
TCP_transport.prototype._install_socket = function (socket) {
assert(socket);
const self = this;
self.name = " Transport " + counter;
counter += 1;
self._socket = socket;
// install packet assembler ...
self.packetAssembler = new PacketAssembler({
readMessageFunc: readRawMessageHeader,
minimumSizeInBytes: self.headerSize
});
self.packetAssembler.on("message", function (message_chunk) {
_on_message_received.call(self, message_chunk);
});
self._socket.on("data", function (data) {
self.bytesRead += data.length;
if (data.length > 0) {
self.packetAssembler.feed(data);
}
}).on("close", function (had_error) {
// istanbul ignore next
if (doDebug) {
debugLog(" SOCKET CLOSE : ".red, "had_error =".yellow,had_error.toString().cyan,self.name);
}
if (self._socket ) {
debugLog(" remote address = ",self._socket.remoteAddress, " " , self._socket.remoteFamily, " ",self._socket.remotePort);
}
if (had_error) {
if (self._socket) {
self._socket.destroy();
}
}
const err = had_error ? new Error("ERROR IN SOCKET") : null;
self.on_socket_closed(err);
}).on("end", function (err) {
// istanbul ignore next
if (doDebug) {
debugLog(" SOCKET END : ".red, err ? err.message.yellow : "null", self._socket.name, self.name);
}
self._on_socket_ended_message(err);
}).on("error", function (err) {
// istanbul ignore next
if (doDebug) {
debugLog(" SOCKET ERROR : ".red, err.message.yellow, self._socket.name, self.name);
}
// node The "close" event will be called directly following this event.
});
const do_destroy_on_timeout =false;
if (do_destroy_on_timeout) {
// set socket timeout
debugLog("setting client/server socket timeout to ",self.timeout);
self._socket.setTimeout(self.timeout,function(){
console.log(" connection has timed out (timeout =",self.timeout,")");
self._socket.destroy();
});
}
};
/**
* @method _install_one_time_message_receiver
*
* install a one time message receiver callback
*
* Rules:
* * TCP_transport will not emit the ```message``` event, while the "one time message receiver" is in operation.
* * the TCP_transport will wait for the next complete message chunk and call the provided callback func
* ```callback(null,messageChunk);```
* * if a messageChunk is not received within ```TCP_transport.timeout``` or if the underlying socket reports an error,
* the callback function will be called with an Error.
*
* @param callback {Function} the callback function
* @param callback.err {null|Error}
* @param callback.messageChunk {Buffer|null}
* @protected
*/
TCP_transport.prototype._install_one_time_message_receiver = function (callback) {
const self = this;
assert(!self._the_callback, "callback already set");
assert(_.isFunction(callback));
self._the_callback = callback;
_start_timeout_timer.call(self);
};
/**
* disconnect the TCP layer and close the underlying socket.
* The ```"close"``` event will be emitted to the observers with err=null.
*
* @method disconnect
* @async
* @param callback
*/
TCP_transport.prototype.disconnect = function (callback) {
assert(_.isFunction(callback), "expecting a callback function, but got " + callback);
const self = this;
if (self.__disconnecting__) {
callback();
return;
}
assert(!self.__disconnecting__, "TCP Transport has already been disconnected");
self.__disconnecting__ = true;
assert(!self._the_callback, "disconnect shall not be called while the 'one time message receiver' is in operation");
_cleanup_timers.call(self);
if (self._socket) {
self._socket.end();
self._socket.destroy();
//Xx self._socket.removeAllListeners();
self._socket = null;
}
setImmediate(function () {
self.on_socket_ended(null);
callback();
});
};
TCP_transport.prototype.isValid = function() {
const self = this;
return self._socket !== null&& !self._socket.destroyed && !self.__disconnecting__;
};
exports.TCP_transport = TCP_transport;