Skip to content

Commit

Permalink
Message
Browse files Browse the repository at this point in the history
  • Loading branch information
Rory A. Svage committed Aug 7, 2020
1 parent ec368ae commit f81cee0
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 88 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -27,6 +27,7 @@
],
"dependencies": {
"ajv": "6.12.3",
"async-mutex": "0.2.4",
"basic-auth": "2.0.1",
"bcryptjs": "2.4.3",
"body-parser": "1.19.0",
Expand Down
203 changes: 115 additions & 88 deletions packages/node_modules/@node-red/runtime/lib/api/flows.js
Expand Up @@ -34,6 +34,8 @@
*/

var runtime;
var Mutex = require('async-mutex').Mutex;
const mutex = new Mutex();

var api = module.exports = {
init: function(_runtime) {
Expand Down Expand Up @@ -64,37 +66,39 @@ var api = module.exports = {
* @memberof @node-red/runtime_flows
*/
setFlows: function(opts) {
return new Promise(function(resolve,reject) {
return mutex.runExclusive(function() {
return new Promise(function(resolve,reject) {

var flows = opts.flows;
var deploymentType = opts.deploymentType||"full";
runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req);
var flows = opts.flows;
var deploymentType = opts.deploymentType||"full";
runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req);

var apiPromise;
if (deploymentType === 'reload') {
apiPromise = runtime.nodes.loadFlows(true);
} else {
if (flows.hasOwnProperty('rev')) {
var currentVersion = runtime.nodes.getFlows().rev;
if (currentVersion !== flows.rev) {
var err;
err = new Error();
err.code = "version_mismatch";
err.status = 409;
//TODO: log warning
return reject(err);
var apiPromise;
if (deploymentType === 'reload') {
apiPromise = runtime.nodes.loadFlows(true);
} else {
if (flows.hasOwnProperty('rev')) {
var currentVersion = runtime.nodes.getFlows().rev;
if (currentVersion !== flows.rev) {
var err;
err = new Error();
err.code = "version_mismatch";
err.status = 409;
//TODO: log warning
return reject(err);
}
}
apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType);
}
apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType);
}
apiPromise.then(function(flowId) {
return resolve({rev:flowId});
}).catch(function(err) {
runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message}));
runtime.log.warn(err.stack);
return reject(err);
apiPromise.then(function(flowId) {
return resolve({rev:flowId});
}).catch(function(err) {
runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message}));
runtime.log.warn(err.stack);
return reject(err);
});
});
});
});
},

/**
Expand All @@ -107,19 +111,23 @@ var api = module.exports = {
* @memberof @node-red/runtime_flows
*/
addFlow: function(opts) {
return new Promise(function(resolve,reject) {
var flow = opts.flow;
runtime.nodes.addFlow(flow).then(function(id) {
runtime.log.audit({event: "flow.add",id:id}, opts.req);
return resolve(id);
}).catch(function(err) {
runtime.log.audit({event: "flow.add",error:err.code||"unexpected_error",message:err.toString()}, opts.req);
err.status = 400;
return reject(err);
return mutex.runExclusive(function() {
return new Promise(function (resolve, reject) {
var flow = opts.flow;
runtime.nodes.addFlow(flow).then(function (id) {
runtime.log.audit({event: "flow.add", id: id}, opts.req);
return resolve(id);
}).catch(function (err) {
runtime.log.audit({
event: "flow.add",
error: err.code || "unexpected_error",
message: err.toString()
}, opts.req);
err.status = 400;
return reject(err);
})
})
})


});
},

/**
Expand All @@ -145,7 +153,6 @@ var api = module.exports = {
return reject(err);
}
})

},
/**
* Updates an existing flow configuration
Expand All @@ -158,33 +165,42 @@ var api = module.exports = {
* @memberof @node-red/runtime_flows
*/
updateFlow: function(opts) {
return new Promise(function (resolve,reject) {
var flow = opts.flow;
var id = opts.id;
try {
runtime.nodes.updateFlow(id,flow).then(function() {
runtime.log.audit({event: "flow.update",id:id}, opts.req);
return resolve(id);
}).catch(function(err) {
runtime.log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()}, opts.req);
err.status = 400;
return reject(err);
})
} catch(err) {
if (err.code === 404) {
runtime.log.audit({event: "flow.update",id:id,error:"not_found"}, opts.req);
// TODO: this swap around of .code and .status isn't ideal
err.status = 404;
err.code = "not_found";
return reject(err);
} else {
runtime.log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()}, opts.req);
err.status = 400;
return reject(err);
return mutex.runExclusive(function() {
return new Promise(function (resolve, reject) {
var flow = opts.flow;
var id = opts.id;
try {
runtime.nodes.updateFlow(id, flow).then(function () {
runtime.log.audit({event: "flow.update", id: id}, opts.req);
return resolve(id);
}).catch(function (err) {
runtime.log.audit({
event: "flow.update",
error: err.code || "unexpected_error",
message: err.toString()
}, opts.req);
err.status = 400;
return reject(err);
})
} catch (err) {
if (err.code === 404) {
runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req);
// TODO: this swap around of .code and .status isn't ideal
err.status = 404;
err.code = "not_found";
return reject(err);
} else {
runtime.log.audit({
event: "flow.update",
error: err.code || "unexpected_error",
message: err.toString()
}, opts.req);
err.status = 400;
return reject(err);
}
}
}
});
});

},
/**
* Deletes a flow
Expand All @@ -196,30 +212,42 @@ var api = module.exports = {
* @memberof @node-red/runtime_flows
*/
deleteFlow: function(opts) {
return new Promise(function (resolve,reject) {
var id = opts.id;
try {
runtime.nodes.removeFlow(id).then(function() {
runtime.log.audit({event: "flow.remove",id:id}, opts.req);
return resolve();
}).catch(function(err) {
runtime.log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()}, opts.req);
err.status = 400;
return reject(err);
});
} catch(err) {
if (err.code === 404) {
runtime.log.audit({event: "flow.remove",id:id,error:"not_found"}, opts.req);
// TODO: this swap around of .code and .status isn't ideal
err.status = 404;
err.code = "not_found";
return reject(err);
} else {
runtime.log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()}, opts.req);
err.status = 400;
return reject(err);
return mutex.runExclusive(function() {
return new Promise(function (resolve, reject) {
var id = opts.id;
try {
runtime.nodes.removeFlow(id).then(function () {
runtime.log.audit({event: "flow.remove", id: id}, opts.req);
return resolve();
}).catch(function (err) {
runtime.log.audit({
event: "flow.remove",
id: id,
error: err.code || "unexpected_error",
message: err.toString()
}, opts.req);
err.status = 400;
return reject(err);
});
} catch (err) {
if (err.code === 404) {
runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req);
// TODO: this swap around of .code and .status isn't ideal
err.status = 404;
err.code = "not_found";
return reject(err);
} else {
runtime.log.audit({
event: "flow.remove",
id: id,
error: err.code || "unexpected_error",
message: err.toString()
}, opts.req);
err.status = 400;
return reject(err);
}
}
}
});
});
},

Expand Down Expand Up @@ -264,5 +292,4 @@ var api = module.exports = {
resolve(sendCredentials);
})
}

}
1 change: 1 addition & 0 deletions packages/node_modules/@node-red/runtime/package.json
Expand Up @@ -18,6 +18,7 @@
"dependencies": {
"@node-red/registry": "1.1.3",
"@node-red/util": "1.1.3",
"async-mutex": "0.2.4",
"clone": "2.1.2",
"express": "4.17.1",
"fs-extra": "8.1.0",
Expand Down

0 comments on commit f81cee0

Please sign in to comment.