Skip to content

Commit

Permalink
clone msg for every single line output
Browse files Browse the repository at this point in the history
(this may not be desired)
Add tests to ensure properties go trhough
  • Loading branch information
Dave Conway-Jones committed Oct 25, 2019
1 parent c8b8bed commit 7c4e782
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 33 deletions.
54 changes: 21 additions & 33 deletions packages/node_modules/@node-red/nodes/core/storage/10-file.js
Expand Up @@ -140,8 +140,8 @@ module.exports = function(RED) {
try {
var stat = fs.statSync(filename);
node.wstreamIno = stat.ino;
} catch(err) {
}
}
catch(err) { }
});
node.wstream.on("error", function(err) {
node.error(RED._("file.errors.appendfail",{error:err.toString()}),msg);
Expand Down Expand Up @@ -276,7 +276,6 @@ module.exports = function(RED) {
ch = "\n";
type = "string";
}
var hwm;
var getout = false;

var rs = fs.createReadStream(filename)
Expand All @@ -290,30 +289,24 @@ module.exports = function(RED) {
spare += decode(chunk, node.encoding);
var bits = spare.split("\n");
for (var i=0; i < bits.length - 1; i++) {
var m = {
payload:bits[i],
topic:msg.topic,
filename:msg.filename,
parts:{index:count, ch:ch, type:type, id:msg._msgid}
}
var sendMessage = RED.util.cloneMessage(msg);
sendMessage.payload = bits[i];
sendMessage.parts = {index:count, ch:ch, type:type, id:msg._msgid};
count += 1;
nodeSend(m);
nodeSend(sendMessage);
}
spare = bits[i];
}
if (node.format === "stream") {
var m = {
payload:chunk,
topic:msg.topic,
filename:msg.filename,
parts:{index:count, ch:ch, type:type, id:msg._msgid}
}
var sendMessage = RED.util.cloneMessage(msg);
sendMessage.payload = chunk;
sendMessage.parts = {index:count, ch:ch, type:type, id:msg._msgid};
count += 1;
if (chunk.length < hwm) { // last chunk is smaller that high water mark = eof
getout = false;
m.parts.count = count;
sendMessage.parts.count = count;
}
nodeSend(m);
nodeSend(sendMessage);
}
}
else {
Expand All @@ -332,28 +325,23 @@ module.exports = function(RED) {
nodeDone();
})
.on('end', function() {
var sendMessage = RED.util.cloneMessage(msg);
if (node.chunk === false) {
if (node.format === "utf8") {
msg.payload = decode(lines, node.encoding);
sendMessage.payload = decode(lines, node.encoding);
}
else { msg.payload = lines; }
nodeSend(msg);
else { sendMessage.payload = lines; }
nodeSend(sendMessage);
}
else if (node.format === "lines") {
var m = { payload: spare,
parts: {
index: count,
count: count+1,
ch: ch,
type: type,
id: msg._msgid
}
};
nodeSend(m);
sendMessage.payload = spare;
sendMessage.parts = { index:count, count:count+1, ch:ch, type:type, id:msg._msgid };
nodeSend(sendMessage);
}
else if (getout) { // last chunk same size as high water mark - have to send empty extra packet.
var m = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} };
nodeSend(m);
delete sendMessage.payload;
sendMessage.parts = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} };
nodeSend(sendMessage);
}
nodeDone();
});
Expand Down
59 changes: 59 additions & 0 deletions test/nodes/core/storage/10-file_spec.js
Expand Up @@ -1192,6 +1192,43 @@ describe('file Nodes', function() {
});
});

it('should read in a file and output split lines with parts and preserve other properties', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"lines", wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(fileNode, flow, function() {
var n1 = helper.getNode("fileInNode1");
var n2 = helper.getNode("n2");
var c = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property('payload');
msg.payload.should.be.a.String();
msg.should.have.property('topic',"dujour");
msg.should.have.property('foo',"bar");
msg.should.have.property('parts');
msg.parts.should.have.property('index',c);
msg.parts.should.have.property('type','string');
msg.parts.should.have.property('ch','\n');
if (c === 0) {
msg.payload.should.equal("File message line 1");
}
if (c === 1) {
msg.payload.should.equal("File message line 2");
}
if (c === 2) {
msg.payload.should.equal("");
done();
}
c++;
}
catch(e) {
done(e);
}
});
n1.receive({payload:"",topic:"dujour",foo:"bar"});
});
});

it('should read in a file and output a buffer with parts', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]},
{id:"n2", type:"helper"}];
Expand All @@ -1212,6 +1249,28 @@ describe('file Nodes', function() {
});
});

it('should read in a file and output a buffer with parts and preserve other properties', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(fileNode, flow, function() {
var n1 = helper.getNode("fileInNode1");
var n2 = helper.getNode("n2");
n2.on("input", function(msg) {
msg.should.have.property('payload');
msg.should.have.property('topic',"dujour");
msg.should.have.property('foo',"bar");
Buffer.isBuffer(msg.payload).should.be.true();
msg.payload.should.have.length(40);
msg.should.have.property('parts');
msg.parts.should.have.property('count',1);
msg.parts.should.have.property('type','buffer');
msg.parts.should.have.property('ch','');
done();
});
n1.receive({payload:"",topic:"dujour",foo:"bar"});
});
});

it('should warn if no filename set', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", "format":""}];
helper.load(fileNode, flow, function() {
Expand Down

0 comments on commit 7c4e782

Please sign in to comment.