Skip to content

Commit

Permalink
Complete test coverage on flow engine refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
knolleary committed Nov 2, 2015
1 parent 5a176a0 commit d1940a0
Show file tree
Hide file tree
Showing 7 changed files with 1,012 additions and 741 deletions.
1 change: 1 addition & 0 deletions red/nodes/Node.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ function Node(n) {
util.inherits(Node, EventEmitter);

Node.prototype.updateWires = function(wires) {
//console.log("UPDATE",this.id);
this.wires = wires || [];
delete this._wire;

Expand Down
18 changes: 9 additions & 9 deletions red/nodes/credentials.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ module.exports = {
storage = _storage;
redApp = _app;
},

/**
* Loads the credentials from storage.
*/
Expand All @@ -79,7 +79,7 @@ module.exports = {
log.warn(log._("nodes.credentials.error",{message: err}));
});
},

/**
* Adds a set of credentials for the given node id.
* @param id the node id for the credentials
Expand Down Expand Up @@ -118,7 +118,7 @@ module.exports = {
clean: function (config) {
var existingIds = {};
config.forEach(function(n) {
existingIds[n.id] = true;
existingIds[n.id] = true;
});
var deletedCredentials = false;
for (var c in credentialCache) {
Expand All @@ -135,7 +135,7 @@ module.exports = {
return when.resolve();
}
},

/**
* Registers a node credential definition.
* @param type the node type
Expand All @@ -146,14 +146,14 @@ module.exports = {
credentialsDef[dashedType] = definition;
registerEndpoint(dashedType);
},

/**
* Extracts and stores any credential updates in the provided node.
* The provided node may have a .credentials property that contains
* new credentials for the node.
* This function loops through the credentials in the definition for
* the node-type and applies any of the updates provided in the node.
*
*
* This function does not save the credentials to disk as it is expected
* to be called multiple times when a new flow is deployed.
*
Expand All @@ -171,7 +171,7 @@ module.exports = {
log.warn(log._("nodes.credentials.not-registered",{type:nodeType}));
return;
}

for (var cred in definition) {
if (definition.hasOwnProperty(cred)) {
if (newCreds[cred] === undefined) {
Expand All @@ -191,15 +191,15 @@ module.exports = {
delete node.credentials;
}
},

/**
* Saves the credentials to storage
* @return a promise for the saving of credentials to storage
*/
save: function () {
return storage.saveCredentials(credentialCache);
},

/**
* Gets the credential definition for the given node type
* @param type the node type
Expand Down
6 changes: 3 additions & 3 deletions red/nodes/flows/Flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ function Flow(global,flow) {
source: {
id: node.id,
type: node.type,
name: node.name,
count: count
}
};
Expand Down Expand Up @@ -258,11 +259,12 @@ function mapEnvVarProperties(obj,prop) {
}

function createNode(type,config) {
//console.log("CREATE",type,config.id);
// console.log("CREATE",type,config.id);
var nn = null;
var nt = typeRegistry.get(type);
if (nt) {
var conf = clone(config);
delete conf.credentials;
for (var p in conf) {
if (conf.hasOwnProperty(p)) {
mapEnvVarProperties(conf,p);
Expand Down Expand Up @@ -371,9 +373,7 @@ function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
if (sf.out) {
var node,wires,i,j;
// Restore the original wiring to the internal nodes

subflowInstance.wires = clone(subflowInstance._originalWires);

for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
Expand Down
62 changes: 33 additions & 29 deletions red/nodes/flows/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,30 @@ var started = false;

var activeNodesToFlow = {};

var typeEventRegistered = false;

function init(_settings, _storage) {
if (started) {
throw new Error("Cannot init without a stop");
}
settings = _settings;
storage = _storage;
started = false;
events.on('type-registered',function(type) {
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
var i = activeFlowConfig.missingTypes.indexOf(type);
if (i != -1) {
log.info(log._("nodes.flows.registered-missing", {type:type}));
activeFlowConfig.missingTypes.splice(i,1);
if (activeFlowConfig.missingTypes.length === 0 && started) {
start();
if (!typeEventRegistered) {
events.on('type-registered',function(type) {
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
var i = activeFlowConfig.missingTypes.indexOf(type);
if (i != -1) {
log.info(log._("nodes.flows.registered-missing", {type:type}));
activeFlowConfig.missingTypes.splice(i,1);
if (activeFlowConfig.missingTypes.length === 0 && started) {
start();
}
}
}
}
});
});
typeEventRegistered = true;
}
}
function load() {
return storage.getFlows().then(function(flows) {
Expand All @@ -67,21 +75,25 @@ function load() {
});
}

function setConfig(config,type) {
function setConfig(_config,type) {
var config = clone(_config);
type = type||"full";

var credentialsChanged = false;
var credentialSavePromise = null;
var configSavePromise = null;

var cleanConfig = clone(config);
cleanConfig.forEach(function(node) {
var diff;
var newFlowConfig = flowUtil.parseConfig(clone(config));
if (type !== 'full' && type !== 'load') {
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig);
}
config.forEach(function(node) {
if (node.credentials) {
credentials.extract(node);
credentialsChanged = true;
}
});

if (credentialsChanged) {
credentialSavePromise = credentials.save();
} else {
Expand All @@ -92,27 +104,19 @@ function setConfig(config,type) {
type = 'full';
} else {
configSavePromise = credentialSavePromise.then(function() {
return storage.saveFlows(cleanConfig);
return storage.saveFlows(config);
});
}

return configSavePromise
.then(function() {
var diff;
activeConfig = cleanConfig;
if (type === 'full') {
activeFlowConfig = flowUtil.parseConfig(clone(config));
} else {
var newConfig = flowUtil.parseConfig(clone(config));
diff = flowUtil.diffConfigs(activeFlowConfig,newConfig);
activeFlowConfig = newConfig;
}
credentials.clean(activeConfig).then(function() {
activeConfig = config;
activeFlowConfig = newFlowConfig;
return credentials.clean(activeConfig).then(function() {
if (started) {
return stop(type,diff).then(function() {
start(type,diff);
}).otherwise(function(err) {
console.log(err);
})
}
});
Expand All @@ -136,9 +140,9 @@ function getNode(id) {
}

function eachNode(cb) {
for (var id in activeFlowConfig.nodes) {
if (activeFlowConfig.nodes.hasOwnProperty(id)) {
cb(activeFlowConfig.nodes[id]);
for (var id in activeFlowConfig.allNodes) {
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
cb(activeFlowConfig.allNodes[id]);
}
}
}
Expand Down

0 comments on commit d1940a0

Please sign in to comment.