Skip to content

Commit

Permalink
Merge pull request #130 from noflo/error_port_test_js
Browse files Browse the repository at this point in the history
Provide error and ready events
  • Loading branch information
bergie committed Sep 8, 2020
2 parents 713cd7e + d583810 commit 0932d33
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 18 deletions.
99 changes: 99 additions & 0 deletions spec/Base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
describe('Base interface', () => {
describe('without a graph', () => {
it('should become ready without network', (done) => {
const rt = new direct.Runtime({
baseDir,
});
rt.on('ready', (net) => {
chai.expect(net).to.equal(null);
done();
});
});
});
describe('with a working default graph', () => {
it('should register and run a network', (done) => {
const graphData = {
processes: {
Node1: {
component: 'core/RepeatAsync',
},
},
connections: [
{
data: 'My message to print',
tgt: {
process: 'Node1',
port: 'in',
},
},
],
};
let readyReceived = false;
let startReceived = false;
noflo.graph.loadJSON(graphData, (err, graph) => {
if (err) {
done(err);
return;
}
const rt = new direct.Runtime({
defaultGraph: graph,
baseDir,
});
rt.on('ready', (net) => {
chai.expect(net).to.be.an('object');
chai.expect(net.start).to.be.a('function');
chai.expect(net.graph).to.equal(graph);
readyReceived = true;
});
rt.network.on('addnetwork', (network) => {
network.on('start', () => {
startReceived = true;
});
network.on('end', () => {
chai.expect(readyReceived, 'should have received ready').to.equal(true);
chai.expect(startReceived, 'should have received start').to.equal(true);
done();
});
});
});
});
});
describe('with a graph containing a faulty IIP', () => {
it('should emit an error', (done) => {
const graphData = {
processes: {
Node1: {
component: 'core/Repeat',
},
},
connections: [
{
data: 'My message to print',
tgt: {
process: 'Node1',
port: 'missing',
},
},
],
};
noflo.graph.loadJSON(graphData, (err, graph) => {
if (err) {
done(err);
return;
}
const rt = new direct.Runtime({
defaultGraph: graph,
baseDir,
});
rt.on('ready', () => {
done(new Error('Received unexpected network'));
});
rt.on('error', (err) => {
chai.expect(err).to.be.an('error');
chai.expect(err.message).to.include('No inport \'missing\'');
done();
});
});
});
});
});
46 changes: 34 additions & 12 deletions src/Base.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
const {
EventEmitter,
} = require('events');

/* eslint class-methods-use-this: ["error", { "exceptMethods": ["send", "sendAll"] }] */
const protocols = {
// eslint-disable-next-line global-require
Expand All @@ -17,8 +21,9 @@ const debugMessagingSendPayload = require('debug')('noflo-runtime-base:messaging

// This is the class all NoFlo runtime implementations can extend to easily wrap
// into any transport protocol.
class BaseTransport {
class BaseTransport extends EventEmitter {
constructor(options) {
super();
this.options = options;
if (!this.options) { this.options = {}; }
this.version = '0.7';
Expand All @@ -28,17 +33,6 @@ class BaseTransport {
this.runtime = new protocols.Runtime(this);
this.context = null;

if (this.options.defaultGraph != null) {
this.options.defaultGraph.baseDir = this.options.baseDir;
const graphName = this.getGraphName(this.options.defaultGraph);
this.context = 'none';
this.graph.registerGraph(graphName, this.options.defaultGraph);
this.runtime.setMainGraph(graphName, this.options.defaultGraph);
this.network._startNetwork(this.options.defaultGraph, graphName, this.context, (err) => {
if (err) { throw err; }
});
}

if ((this.options.captureOutput != null) && this.options.captureOutput) {
// Start capturing so that we can send it to the UI when it connects
this.startCapture();
Expand Down Expand Up @@ -67,6 +61,10 @@ class BaseTransport {
if (!this.options.permissions) {
this.options.permissions = {};
}

setTimeout(() => {
this._startDefaultGraph();
}, 0);
}

// Generate a name for the main graph
Expand All @@ -76,6 +74,30 @@ class BaseTransport {
return `${namespace}/${graphName}`;
}

_startDefaultGraph() {
if (!this.options.defaultGraph) {
this.emit('ready', null);
return;
}
this.options.defaultGraph.baseDir = this.options.baseDir;
const graphName = this.getGraphName(this.options.defaultGraph);
this.context = 'none';
this.network._startNetwork(
this.options.defaultGraph,
graphName,
this.context,
(err, network) => {
if (err) {
this.emit('error', err);
return;
}
this.graph.registerGraph(graphName, this.options.defaultGraph, false);
this.runtime.setMainGraph(graphName, this.options.defaultGraph);
this.emit('ready', network);
},
);
}

// Check if a given user is authorized for a given capability
//
// @param [Array] Capabilities to check
Expand Down
2 changes: 1 addition & 1 deletion src/DirectRuntime.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class DirectRuntime extends Base {
}

send(protocol, topic, payload, context) {
if (!context.client) { return; }
if (!context || !context.client) { return; }
const m = {
protocol,
command: topic,
Expand Down
22 changes: 17 additions & 5 deletions src/protocol/Network.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ class NetworkProtocol extends EventEmitter {
this.subscribeNetwork(network, graphName, context);

// Wire up the network
network.connect((err2) => {
if (err2) {
callback(err2);
network.connect((connectError) => {
if (connectError) {
callback(connectError);
return;
}
callback(null, network);
Expand Down Expand Up @@ -291,7 +291,13 @@ class NetworkProtocol extends EventEmitter {
const existingNetwork = this.getNetwork(graphName);
if (existingNetwork) {
// already initialized
existingNetwork.start(callback);
existingNetwork.start((startError) => {
if (startError) {
callback(startError);
return;
}
callback(null, existingNetwork);
});
return;
}

Expand All @@ -301,7 +307,13 @@ class NetworkProtocol extends EventEmitter {
return;
}
const network = this.getNetwork(graphName);
network.start(callback);
network.start((startError) => {
if (startError) {
callback(startError);
return;
}
callback(null, network);
});
});
}

Expand Down

0 comments on commit 0932d33

Please sign in to comment.