Skip to content

Commit

Permalink
Close websocket clients when stream ends
Browse files Browse the repository at this point in the history
add command id to logging statementes
  • Loading branch information
chmille4 committed Sep 9, 2016
1 parent b66f406 commit 33df018
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 22 deletions.
37 changes: 18 additions & 19 deletions lib/cmd.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ cmd.prototype.init = function(tool, stream, params, opts) {
// add to minions list
var url = decodeURIComponent(arg);
var fifo = fifoPath + '/fifo-' + shortid.generate();
console.log('fifopath = ' + fifo);
self.minions.push( {
'url': url,
'fifoPath': fifo,
Expand Down Expand Up @@ -102,13 +101,13 @@ cmd.prototype.init = function(tool, stream, params, opts) {
var resolvedPath = require("path").resolve(path);
if ( binPath != resolvedPath.substr(0, binPath.length) ) {
var error = "Program path not in executable directory. Only programs in minion/bin/ directory are executable";
self.emit('log', error);
self.emit('error', error);
self.emit('log', self.id + ": " + error);
self.emit('error', self.id + ": " + error);
return false;
} else if( !IsThere(resolvedPath) ) {
var error = "Program not found. Only programs in minion/bin/ directory are executable";
self.emit('log', error);
self.emit('error', error);
self.emit('log', self.id + ": " +error);
self.emit('error', self.id + ": " +error);
return false;
}
self.path = path;
Expand All @@ -126,7 +125,7 @@ cmd.prototype.run = function(clients, serverAddress) {
stream = this.stream;

// spawn tool as new process
self.emit('log', 'command: ' + self.path + ' ' + self.args);
self.emit('log', 'command ' + self.id + ': ' + self.path + ' ' + self.args);


Promise.all( self.minions.map(function(d) { return d.progStream} )).then( function(fifoStreams) {
Expand Down Expand Up @@ -166,7 +165,7 @@ cmd.prototype.run = function(clients, serverAddress) {

// save output for future caching
if (self.opts.cachePath) {
self.emit('log', 'caching ' + self.opts.cachePath);
self.emit('log', self.id + ': caching ' + self.opts.cachePath);
var cacheDir = require('path').dirname(self.opts.cachePath)
mkdirp( cacheDir , function (err) {if (err) console.error(err)});
var processingExtension = self.params.partialCache === 'true' ? '' : '.processing';
Expand Down Expand Up @@ -230,33 +229,33 @@ cmd.prototype.run = function(clients, serverAddress) {
})

prog.stderr.on('data', function (error) {
self.emit('log', 'stderr - ' + error);
self.emit('error', 'stderr - ' + error);
self.emit('log', self.id + ": " + 'stderr - ' + error);
self.emit('error', self.id + ": " + 'stderr - ' + error);
});

prog.on("close", function() {
self.emit('log', 'prog closed')
self.emit('log', self.id + ": " + 'prog closed')
stream.end();
})
prog.stdin.on('error', function() {
self.emit('error', 'error writing to program. possibly unconsumed data in pipe');
self.emit('log', 'error writing to program. possibly unconsumed data in pipe');
self.emit('error', self.id + ": " + 'error writing to program. possibly unconsumed data in pipe');
self.emit('log', self.id + ": " + 'error writing to program. possibly unconsumed data in pipe');
})

prog.on('error', function(err) {
self.emit('error', 'prog threw an error - ' + err);
self.emit('log', 'prog threw an error - ' + err);
self.emit('error', self.id + ": " + 'prog threw an error - ' + err);
self.emit('log', self.id + ": " + 'prog threw an error - ' + err);
})

prog.on('exit', function (code) {
if (code !== 0) {
var error = 'prog process exited with code ' + code
var error = self.id + ": " + 'prog process exited with code ' + code
self.emit('error', error);
self.emit('log', error);
// remove cache
self.deleteCache();
}
self.emit('log', 'prog exited')
self.emit('log', self.id + ": " + 'prog exited')
self.emit('exit', code);
});
})
Expand All @@ -268,7 +267,7 @@ cmd.prototype.end = function() {
// if prog is still running, clean up
if(self.prog) {
if (self.prog.exitCode != 0 ) {
self.emit('log', 'command prematurely ended. Cleaning up ...')
self.emit('log', 'command ' + self.id + ": " + ' prematurely ended. Cleaning up ...')
self.deleteCache();
}
}
Expand All @@ -290,9 +289,9 @@ cmd.prototype.deleteCache = function() {

if (self.opts.cachePath) {
if (self.params.partialCache === 'true')
self.emit('log', 'leaving partial cache: ' + self.opts.cachePath);
self.emit('log', 'command ' + self.id + ": " + 'leaving partial cache: ' + self.opts.cachePath);
else {
self.emit('log', 'deleting cache: ' + self.opts.cachePath);
self.emit('log', 'command ' + self.id + ": " + 'deleting cache: ' + self.opts.cachePath);
fs.unlink(self.opts.cachePath + '.processing', function() {});
fs.unlink(self.opts.cachePath, function() {});
}
Expand Down
6 changes: 5 additions & 1 deletion lib/protocol/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ws.prototype.run = function(cmd, progStream, opts) {
url = cmd.url;

var source = urlParser( url );
self.emit('log', "requesting url: "+url);
self.emit('log', cmd.id + " requesting url: "+url);

if(opts && opts.stream != undefined) {
var ustream = opts.stream;
Expand All @@ -28,6 +28,9 @@ ws.prototype.run = function(cmd, progStream, opts) {
ustream.on('err', function(error) {
self.emit('error', error);
})
ustream.on('end', function() {
if (ustream.client) { ustream.client.close() };
})
}
else {
var upstreamClient = new BinaryClient(source.host);
Expand All @@ -44,6 +47,7 @@ ws.prototype.run = function(cmd, progStream, opts) {
ustream.on('createClientConnection', function(connection) {
self.emit('createClientConnection', connection);
})
ustream.on('end', function() { upstreamClient.close(); })
});
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ server.listen = function(tool) {
var fifoStreamId = options.connectionID.split('&')[1];
// get command
var dataCommand = bs.commands[options.connectionID];
stream.client = client;
// emit event that client is ready to write data
dataCommand.emit('clientConnected-'+fifoStreamId, stream);
}
Expand Down
4 changes: 2 additions & 2 deletions spec/serverSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ describe("Server", function() {
});
})
it("errors when path is outside of bin sandbox", function() {
expect(this.result).toEqual('notool Error: Program path not in executable directory. Only programs in minion/bin/ directory are executable');
expect(this.result.split('Program path ')[1]).toEqual('not in executable directory. Only programs in minion/bin/ directory are executable');
});
afterEach(function() {
this.minion.close();
Expand All @@ -188,7 +188,7 @@ describe("Server", function() {
});
})
it("errors when tool doesn't exist", function() {
expect(this.result).toEqual('notool Error: Program not found. Only programs in minion/bin/ directory are executable');
expect(this.result.split('Program not found. ')[1]).toEqual('Only programs in minion/bin/ directory are executable');
});
afterEach(function() {
this.minion.close();
Expand Down

0 comments on commit 33df018

Please sign in to comment.