Skip to content

Commit

Permalink
creating a consitent and extensible interface
Browse files Browse the repository at this point in the history
  • Loading branch information
pghalliday committed Nov 23, 2012
1 parent dcb2ad4 commit ac4423b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ npm install stream-multiplex
```javascript
var MultiplexStream = require('multiplex-stream');

// create 2 multiplex instances and listen for connections downstream
var upstreamMultiplex = new MultiplexStream();
// create downstream multiplex that listens for connections
var downstreamMultiplex = new MultiplexStream(function(downstreamConnection) {
// a multiplexed stream has connected from upstream
// a multiplexed stream has connected from upstream.
// The assigned id will be accessible as downstreamConnection.id
downstreamConnection.setEncoding();
downstreamConnection.on('data', function(data) {
// received data, send reply upstream
Expand All @@ -39,12 +39,24 @@ var downstreamMultiplex = new MultiplexStream(function(downstreamConnection) {
});
});

// create upstream multiplex that will be used to initiate connections
var upstreamMultiplex = new MultiplexStream({
// The connectTimeout optionally specifies how long to
// wait in milliseconds for the downstream multiplex to
// accept to connections. It defaults to 3000 milliseconds
connectTimeout: 5000
});

// pipe from one multiplex to the other (there could
// be other carrier streams in between, for instance a net socket)
upstreamMultiplex.pipe(downstreamMultiplex).pipe(upstreamMultiplex);

// create a new upstream multiplexed stream
var upstreamConnection = upstreamMultiplex.connect(function() {
var upstreamConnection = upstreamMultiplex.connect({
// optionally specify an id for the stream. By default
// a v1 UUID will be assigned as the id for anonymous streams
id: 'MyStream'
}, function() {
upstreamConnection.setEncoding();
upstreamConnection.on('data', function(data) {
// received reply, end the connection
Expand Down
23 changes: 12 additions & 11 deletions src/MultiplexStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,20 @@ function Tunnel(id, streamMultiplexStream) {
}
util.inherits(Tunnel, Stream);

function MultiplexStream(options, callback) {
function MultiplexStream(multiplexOptions, callback) {
var self = this,
decoder = new Decoder(),
tunnels = {};

self.readable = true;
self.writable = true;

if (typeof options === 'function') {
callback = options;
options = null;
if (typeof multiplexOptions === 'function') {
callback = multiplexOptions;
multiplexOptions = null;
}

options = options || {};
multiplexOptions = multiplexOptions || {};

if (callback) {
self.on('connection', callback);
Expand Down Expand Up @@ -219,12 +219,13 @@ function MultiplexStream(options, callback) {
}
});

self.connect = function(id, connectListener){
if (typeof id === 'function') {
connectListener = id;
id = null;
self.connect = function(connectOptions, connectListener){
if (typeof connectOptions === 'function') {
connectListener = connectOptions;
connectOptions = null;
}
id = id || uuid.v1();
connectOptions = connectOptions || {};
var id = connectOptions.id || uuid.v1();

var tunnel = new Tunnel(id, self);
if (connectListener) {
Expand All @@ -237,7 +238,7 @@ function MultiplexStream(options, callback) {
tunnel.connectTimeout = setTimeout(function() {
delete tunnels[id];
emitEvent(tunnel, 'error', new Error('Connect request timed out'));
}, options.connectTimeout || DEFAULT_CONNECT_TIMEOUT);
}, multiplexOptions.connectTimeout || DEFAULT_CONNECT_TIMEOUT);
registerTunnel(tunnel);
emitEvent(self, 'data', encodeEvent({
tunnelId: id,
Expand Down
10 changes: 5 additions & 5 deletions test/src/MultiplexStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ describe('MultiplexStream', function() {
});
upstreamMultiplex.pipe(downstreamMultiplex).pipe(upstreamMultiplex);

var upstreamConnection = upstreamMultiplex.connect('anAwesomeID');
var upstreamConnection = upstreamMultiplex.connect({id: 'anAwesomeID'});
});

it('should error if the upstream multiplex already has a connection with the requested name', function(done) {
var upstreamMultiplex = new MultiplexStream();
var downstreamMultiplex = new MultiplexStream(function(downstreamConnection) {
expect(downstreamConnection.id).to.equal('anAwesomeID');
upstreamMultiplex.connect('anAwesomeID', function() {
upstreamMultiplex.connect({id: 'anAwesomeID'}, function() {
expect().fail('Should not have received connect event');
}).on('error', function(error) {
expect(error.message).to.equal('Connection already exists');
Expand All @@ -166,7 +166,7 @@ describe('MultiplexStream', function() {
});
upstreamMultiplex.pipe(downstreamMultiplex).pipe(upstreamMultiplex);

var upstreamConnection = upstreamMultiplex.connect('anAwesomeID');
var upstreamConnection = upstreamMultiplex.connect({id: 'anAwesomeID'});
});

it('should timeout if no multiplex responds to connect requests', function(done) {
Expand All @@ -187,7 +187,7 @@ describe('MultiplexStream', function() {
connectTimeout: 500
});
anotherUpstreamMultiplex.pipe(downstreamMultiplex).pipe(anotherUpstreamMultiplex);
anotherUpstreamMultiplex.connect('anAwesomeID', function() {
anotherUpstreamMultiplex.connect({id: 'anAwesomeID'}, function() {
expect().fail('Should not have received connect event');
}).on('error', function(error) {
expect(error.message).to.equal('Connect request timed out');
Expand All @@ -196,7 +196,7 @@ describe('MultiplexStream', function() {
});
upstreamMultiplex.pipe(downstreamMultiplex).pipe(upstreamMultiplex);

var upstreamConnection = upstreamMultiplex.connect('anAwesomeID');
var upstreamConnection = upstreamMultiplex.connect({id: 'anAwesomeID'});
});

it('should end tunnel streams cleanly when the multiplex stream ends', function(done) {
Expand Down

0 comments on commit ac4423b

Please sign in to comment.