Skip to content

Commit

Permalink
streams: introduce StreamWrap and JSStream
Browse files Browse the repository at this point in the history
Introduce a way to wrap plain-js `stream.Duplex` streams into C++
StreamBase's child class. With such method at hand it is now possible to
pass `stream.Duplex` instance as a `socket` parameter to
`tls.connect()`.

PR-URL: #926
Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
  • Loading branch information
indutny committed Feb 24, 2015
1 parent e00c938 commit 1738c77
Show file tree
Hide file tree
Showing 15 changed files with 498 additions and 52 deletions.
118 changes: 118 additions & 0 deletions lib/_stream_wrap.js
@@ -0,0 +1,118 @@
const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const uv = process.binding('uv');

function StreamWrap(stream) {
var handle = new JSStream();

this.stream = stream;

var self = this;
handle.close = function(cb) {
cb();
};
handle.isAlive = function() {
return self.isAlive();
};
handle.isClosing = function() {
return self.isClosing();
};
handle.onreadstart = function() {
return self.readStart();
};
handle.onreadstop = function() {
return self.readStop();
};
handle.onshutdown = function(req) {
return self.shutdown(req);
};
handle.onwrite = function(req, bufs) {
return self.write(req, bufs);
};

this.stream.pause();
this.stream.on('data', function(chunk) {
self._handle.readBuffer(chunk);
});
this.stream.once('end', function() {
self._handle.emitEOF();
});
this.stream.on('error', function(err) {
self.emit('error', err);
});

Socket.call(this, {
handle: handle
});
}
util.inherits(StreamWrap, Socket);
module.exports = StreamWrap;

// require('_stream_wrap').StreamWrap
StreamWrap.StreamWrap = StreamWrap;

StreamWrap.prototype.isAlive = function isAlive() {
return this.readable && this.writable;
};

StreamWrap.prototype.isClosing = function isClosing() {
return !this.isAlive();
};

StreamWrap.prototype.readStart = function readStart() {
this.stream.resume();
return 0;
};

StreamWrap.prototype.readStop = function readStop() {
this.stream.pause();
return 0;
};

StreamWrap.prototype.shutdown = function shutdown(req) {
var self = this;

this.stream.end(function() {
// Ensure that write was dispatched
setImmediate(function() {
self._handle.finishShutdown(req, 0);
});
});
return 0;
};

StreamWrap.prototype.write = function write(req, bufs) {
var pending = bufs.length;
var self = this;

self.stream.cork();
bufs.forEach(function(buf) {
self.stream.write(buf, done);
});
self.stream.uncork();

function done(err) {
if (!err && --pending !== 0)
return;

// Ensure that this is called once in case of error
pending = 0;

// Ensure that write was dispatched
setImmediate(function() {
var errCode = 0;
if (err) {
if (err.code && uv['UV_' + err.code])
errCode = uv['UV_' + err.code];
else
errCode = uv.UV_EPIPE;
}

self._handle.doAfterWrite(req);
self._handle.finishWrite(req, errCode);
});
}

return 0;
};
15 changes: 13 additions & 2 deletions lib/_tls_wrap.js
Expand Up @@ -7,6 +7,8 @@ const tls = require('tls');
const util = require('util');
const listenerCount = require('events').listenerCount;
const common = require('_tls_common');
const StreamWrap = require('_stream_wrap').StreamWrap;
const Duplex = require('stream').Duplex;
const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
Expand Down Expand Up @@ -224,6 +226,10 @@ function TLSSocket(socket, options) {
this.authorized = false;
this.authorizationError = null;

// Wrap plain JS Stream into StreamWrap
if (!(socket instanceof net.Socket) && socket instanceof Duplex)
socket = new StreamWrap(socket);

// Just a documented property to make secure sockets
// distinguishable from regular ones.
this.encrypted = true;
Expand Down Expand Up @@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) {
// Proxy HandleWrap, PipeWrap and TCPWrap methods
proxiedMethods.forEach(function(name) {
res[name] = function methodProxy() {
return handle[name].apply(handle, arguments);
if (handle[name])
return handle[name].apply(handle, arguments);
};
});

Expand Down Expand Up @@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) {
this.setTimeout(options.handshakeTimeout, this._handleTimeout);

// Socket already has some buffered data - emulate receiving it
if (socket && socket._readableState.length) {
if (socket && socket._readableState && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
ssl.receive(buf);
Expand All @@ -388,6 +395,10 @@ TLSSocket.prototype._init = function(socket) {
self._connecting = false;
self.emit('connect');
});

socket.on('error', function(err) {
self._tlsError(err);
});
}

// Assume `tls.connect()`
Expand Down
3 changes: 3 additions & 0 deletions node.gyp
Expand Up @@ -56,6 +56,7 @@
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
'lib/timers.js',
Expand Down Expand Up @@ -95,6 +96,7 @@
'src/fs_event_wrap.cc',
'src/cares_wrap.cc',
'src/handle_wrap.cc',
'src/js_stream.cc',
'src/node.cc',
'src/node_buffer.cc',
'src/node_constants.cc',
Expand Down Expand Up @@ -132,6 +134,7 @@
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
'src/js_stream.h',
'src/node.h',
'src/node_buffer.h',
'src/node_constants.h',
Expand Down
1 change: 1 addition & 0 deletions src/async-wrap.h
Expand Up @@ -17,6 +17,7 @@ namespace node {
V(FSREQWRAP) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
V(JSSTREAM) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(QUERYWRAP) \
Expand Down
7 changes: 7 additions & 0 deletions src/env.h
Expand Up @@ -107,6 +107,8 @@ namespace node {
V(ipv4_string, "IPv4") \
V(ipv6_lc_string, "ipv6") \
V(ipv6_string, "IPv6") \
V(isalive_string, "isAlive") \
V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
V(kill_signal_string, "killSignal") \
Expand Down Expand Up @@ -141,9 +143,13 @@ namespace node {
V(onnewsessiondone_string, "onnewsessiondone") \
V(onocspresponse_string, "onocspresponse") \
V(onread_string, "onread") \
V(onreadstart_string, "onreadstart") \
V(onreadstop_string, "onreadstop") \
V(onselect_string, "onselect") \
V(onshutdown_string, "onshutdown") \
V(onsignal_string, "onsignal") \
V(onstop_string, "onstop") \
V(onwrite_string, "onwrite") \
V(output_string, "output") \
V(order_string, "order") \
V(owner_string, "owner") \
Expand Down Expand Up @@ -225,6 +231,7 @@ namespace node {
V(context, v8::Context) \
V(domain_array, v8::Array) \
V(fs_stats_constructor_function, v8::Function) \
V(jsstream_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \
Expand Down

0 comments on commit 1738c77

Please sign in to comment.