Skip to content

Commit

Permalink
HRJS-54 Add more buffer boundary checks
Browse files Browse the repository at this point in the history
* Added manual stress test spec file.
* Added README info on running manual stress tests.
  • Loading branch information
galderz authored and tristantarrant committed Apr 30, 2018
1 parent a7f1bc3 commit d86e97e
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 42 deletions.
8 changes: 8 additions & 0 deletions README.md
Expand Up @@ -676,6 +676,14 @@ To run individual tests, execute the following:

$ node node_modules/jasmine-node/lib/jasmine-node/cli.js spec/infinispan_local_spec.js --captureExceptions

# Manual stress tests

The test suite contains manual stress tests that take several minutes to run.
To run these tests, execute:

$ ./node_modules/.bin/jasmine-node spec-manual --captureExceptions


# Debugging

To debug tests with IDE:
Expand Down
37 changes: 34 additions & 3 deletions lib/codec.js
Expand Up @@ -53,9 +53,16 @@
return values[0];
};

exports.allDecoded = function(values, state) {
return values;
};
exports.allDecoded = function(expectedNumEntries) {
return function(values, state) {
if (values.length < expectedNumEntries) {
logger.tracef("Not enough to read (not array): %s", values);
return undefined;
}

return values;
};
}

exports.bytesEncoded = function(values, state) {
// If buffer is too big, slice it up so that it can be sent
Expand Down Expand Up @@ -315,6 +322,9 @@

// Reads a zigzag-encoded signed integer from the buffer.
// Returns undefined when the underlying variable-length read ran out of bytes.
function doDecodeSignedInt(bytebuf) {
  var zigzag = uncheckedReadVNum(bytebuf)();
  if (!f.existy(zigzag))
    return undefined; // buffer underflow propagated from uncheckedReadVNum

  // Zigzag decode: even values map to n/2, odd values to -(n+1)/2.
  var magnitude = zigzag >>> 1;
  return (zigzag & 1) == 0 ? magnitude : ~magnitude;
}

Expand All @@ -328,6 +338,12 @@

// Returns a thunk that reads one unsigned byte from the buffer, advancing
// the offset. Yields undefined (after a trace log) when no byte remains.
function uncheckedReadUByte(bytebuf) {
  return function() {
    var remaining = bytebuf.buf.length - bytebuf.offset;
    if (remaining < 1) {
      logger.tracef('Can not fully read unsigned byte (buffer size is %d, buffer offset %d)',
                    bytebuf.buf.length, bytebuf.offset);
      return undefined;
    }

    var value = bytebuf.buf.readUInt8(bytebuf.offset);
    bytebuf.offset += 1;
    return value;
  }
}
Expand All @@ -337,6 +353,12 @@
var res = 0, shift = 0, b;

do {
if (1 > bytebuf.buf.length - bytebuf.offset) {
logger.tracef('Can not fully read unsigned byte (buffer size is %d, buffer offset %d)',
bytebuf.buf.length, bytebuf.offset);
return undefined;
}

b = bytebuf.buf.readUInt8(bytebuf.offset++);
res += shift < 28
? (b & REST) << shift
Expand All @@ -350,6 +372,12 @@

function uncheckedReadLong(bytebuf) {
return function() {
if (8 > bytebuf.buf.length - bytebuf.offset) {
logger.tracef('Can not fully read 8 bytes (buffer size is %d, buffer offset %d)',
bytebuf.buf.length, bytebuf.offset);
return undefined;
}

var low = bytebuf.buf.readInt32BE(bytebuf.offset + 4);
var n = bytebuf.buf.readInt32BE(bytebuf.offset) * 4294967296.0 + low;
if (low < 0) n += 4294967296;
Expand All @@ -376,6 +404,9 @@
function uncheckedReadString(bytebuf) {
return function() {
var numBytes = uncheckedReadVNum(bytebuf)();
if (!f.existy(numBytes))
return undefined;

if (numBytes > bytebuf.buf.length - bytebuf.offset) {
logger.tracef('Can not fully read object with %d bytes (buffer size is %d, buffer offset %d)',
numBytes, bytebuf.buf.length, bytebuf.offset);
Expand Down
42 changes: 24 additions & 18 deletions lib/io.js
Expand Up @@ -90,31 +90,37 @@
var protocol = transport.getProtocol();
while (!replayable.isEmpty() && canDecodeMore) {
var bytebuf = replayable.mark();
var header = protocol.decodeHeader(bytebuf);

var topology = header.hasNewTopology
? transport.updateTopology(bytebuf)
: Promise.resolve();
var h = protocol.decodeHeader(bytebuf);
canDecodeMore = h.continue;
if (canDecodeMore) {
var header = h.result;
var topology = header.hasNewTopology
? transport.updateTopology(bytebuf)
: Promise.resolve();

canDecodeMore = f.existy(topology);
canDecodeMore = f.existy(topology);

if (canDecodeMore) {
if (protocol.isEvent(header)) {
canDecodeMore = protocol.decodeEvent(header, bytebuf);
} else {
if (protocol.isError(header)) {
canDecodeMore = decodeError(header, bytebuf, topology);
if (canDecodeMore) {
if (protocol.isEvent(header)) {
canDecodeMore = protocol.decodeEvent(header, bytebuf);
} else {
canDecodeMore = decodeRpcBody(header, bytebuf, topology);
if (protocol.isError(header)) {
canDecodeMore = decodeError(header, bytebuf, topology);
} else {
canDecodeMore = decodeRpcBody(header, bytebuf, topology);
}
}
}

if (!canDecodeMore)
rewind(); // Incomplete event or body, rewind
else
trim(header, bytebuf); // Trim buffer after reading event or body
if (!canDecodeMore)
rewind(); // Incomplete event or body, rewind
else
trim(header, bytebuf); // Trim buffer after reading event or body
} else {
rewind(); // Incomplete topology, rewind
}
} else {
rewind(); // Incomplete topology, rewind
rewind(); // Incomplete header, rewind
}
}
}
Expand Down
128 changes: 115 additions & 13 deletions lib/protocols.js
Expand Up @@ -8,28 +8,53 @@
var u = require('./utils');
var codec = require('./codec');

var logger = u.logger('protocols');

var INFINITE_LIFESPAN = 0x01, INFINITE_MAXIDLE = 0x02; // Duration flag masks
var MAGIC = 0xA0;

// Decodes two consecutive objects and pairs them up as {key, value}.
// Returns undefined when the buffer does not yet contain both objects.
// (The scrape had retained the pre-change single-line callback alongside the
// new one, leaving an unbalanced dangling expression — only the new form is kept.)
var DECODE_PAIR = f.actions(
    [codec.decodeObject(),
     codec.decodeObject()
    ], function(values) {
      if (values.length < 2) {
        logger.tracef("Not enough to read (not array): %s", values);
        return undefined;
      }

      return {key: values[0], value: values[1]};
    });
// Decodes two consecutive strings into a single-entry object whose key is
// the first string and whose value is the second string parsed as a base-10
// integer. Returns undefined when both strings are not yet available.
var DECODE_STRING_PAIR = f.actions(
    [codec.decodeString(), codec.decodeString()],
    function(values) {
      if (values.length < 2) {
        logger.tracef("Not enough to read (not array): %s", values);
        return undefined;
      }

      var pair = {};
      pair[values[0]] = parseInt(values[1], 10); // always pass the radix explicitly
      return pair;
    });
// Single-string decoder: returns just the decoded string.
var DECODE_STRING = f.actions([codec.decodeString()], codec.lastDecoded);
// Timestamp decoder: a long (time value) followed by a vint (duration).
// allDecoded(2) yields undefined unless both values could be read.
// (The stale duplicate declaration using the old zero-arg allDecoded was removed.)
var DECODE_TIMESTAMP = f.actions([codec.decodeLong(), codec.decodeVInt()], codec.allDecoded(2));
// Single unsigned-byte decoder: returns just the decoded byte.
var DECODE_UBYTE = f.actions([codec.decodeUByte()], codec.lastDecoded);

// True when `name` is present in `opts` and holds a truthy value.
function hasOpt(opts, name) { return _.has(opts, name) && f.truthy(opts[name]); }
// Convenience check: was the 'previous' option requested?
function hasOptPrev(opts) { return hasOpt(opts, 'previous'); }

// Decodes a (long, vint) timestamp pair from the buffer unless the given
// "infinite" flag bit is set in `flags`, in which case no bytes are consumed
// and [-1, -1] is used. Returns an object zipping `headers` names with the
// two values, or undefined when the buffer is incomplete.
// (A stale pre-change one-liner that consumed the buffer a second time was removed.)
function decodeTimestamp(flags, mask, headers, bytebuf) {
  var timestamp;
  if ((flags & mask) != mask) {
    var decoded = DECODE_TIMESTAMP(bytebuf);
    // DECODE_TIMESTAMP yields undefined on an incomplete buffer, so guard
    // with f.existy before touching .length to avoid a TypeError.
    if (!f.existy(decoded) || decoded.length < 2)
      return undefined;

    timestamp = decoded;
  } else {
    timestamp = [-1, -1]; // flag set: no timestamp bytes on the wire
  }

  return _.object(headers, timestamp);
}

Expand Down Expand Up @@ -189,6 +214,11 @@
codec.decodeUByte(), // status
codec.decodeUByte() // topology change marker
], function(values) {
if (values.length < 5) {
logger.tracef("Not enough to read (not array): %s", values);
return undefined;
}

return {
msgId: values[1], opCode: values[2],
status: values[3], hasNewTopology: values[4] == 1
Expand All @@ -199,10 +229,26 @@
function(values) { return {version: values[0], value: values[1]}; });
var DECODE_VINT = f.actions([codec.decodeVInt()], codec.lastDecoded);

// Topology header: topology id + number of servers. Returns undefined when
// the buffer does not yet hold both vints.
// (Stale pre-change one-line versions of both decoders were removed.)
var DECODE_TOPO_HEADER = f.actions(
    [codec.decodeVInt(), codec.decodeVInt()],
    function(values) {
      if (values.length < 2) {
        logger.tracef("Not enough to read (not array): %s", values);
        return undefined;
      }

      return {id: values[0], numServers: values[1]};
    });
// Consistent-hash header: hash function id + number of segments.
// Returns undefined when the buffer does not yet hold both values.
var DECODE_HASH_HEADER = f.actions(
    [codec.decodeUByte(), codec.decodeVInt()],
    function(values) {
      if (values.length < 2) {
        logger.tracef("Not enough to read (not array): %s", values);
        return undefined;
      }

      return {hashFunct: values[0], numSegments: values[1]};
    });
// Server address decoder: string host + short port as {host, port}.
// Guards against a partially received buffer, returning undefined for
// consistency with the other composite decoders above
// (DECODE_TOPO_HEADER, DECODE_HASH_HEADER).
var DECODE_HOST_PORT = f.actions(
    [codec.decodeString(), codec.decodeShort()],
    function(values) {
      if (values.length < 2) {
        logger.tracef("Not enough to read (not array): %s", values);
        return undefined;
      }

      return {host: values[0], port: values[1]};
    });

Expand Down Expand Up @@ -247,9 +293,13 @@
decodeHeader: function(bytebuf) {
try {
var header = DECODE_HEADER(bytebuf);
logger.tracef("Read header(msgId=%d): opCode=%s, status=%s, hasNewTopology=%d",
header.msgId, header.opCode, header.status, header.hasNewTopology);
return header;
if (f.existy(header)) {
logger.tracef("Read header(msgId=%d): opCode=%s, status=%s, hasNewTopology=%d",
header.msgId, header.opCode, header.status, header.hasNewTopology);
return {continue: true, result: header};
}

return {continue: false, result: undefined};
} catch(ex) {
logger.error('Error decoding header, message id unknown:', ex);
throw ex;
Expand All @@ -272,8 +322,19 @@
decodeWithMeta: function(header, bytebuf) {
if (isSuccess(header.status)) {
var flags = DECODE_UBYTE(bytebuf);
if (!f.existy(flags))
return {continue: false};

logger.tracef('Decode with metadata, flags are: %d', flags);

var lifespan = decodeTimestamp(flags, INFINITE_LIFESPAN, ['created', 'lifespan'], bytebuf);
if (!f.existy(lifespan))
return {continue: false};

var idle = decodeTimestamp(flags, INFINITE_MAXIDLE, ['lastUsed', 'maxIdle'], bytebuf);
if (!f.existy(idle))
return {continue: false};

var versioned = DECODE_VERSIONED(bytebuf);
return f.existy(versioned)
? {result: f.merge(versioned, lifespan, idle), continue: true}
Expand Down Expand Up @@ -321,6 +382,9 @@
},
decodeTopology: function(bytebuf) {
var topologyHeader = DECODE_TOPO_HEADER(bytebuf);
if (!f.existy(topologyHeader))
return {done: false};

var addrs = [], i = 0;
while (i++ < topologyHeader.numServers) {
var addr = DECODE_HOST_PORT(bytebuf);
Expand All @@ -329,6 +393,9 @@
}

var hashHeader = DECODE_HASH_HEADER(bytebuf);
if (!f.existy(hashHeader))
return {done: false};

var segs = new Array(hashHeader.numSegments);
for (var j = 0; j < hashHeader.numSegments; j++) {
var numOwners = DECODE_UBYTE(bytebuf);
Expand Down Expand Up @@ -384,6 +451,11 @@
codec.decodeUByte(), // custom event marker
codec.decodeUByte()], // event is retried
function(values) {
if (values.length < 3) {
logger.tracef("Not enough to read (not array): %s", values);
return undefined;
}

return {listenerId: values[0], isCustom: values[1] == 1, isRetried: values[2] == 1}
});

Expand Down Expand Up @@ -464,6 +536,9 @@
},
decodeEvent: function(header, bytebuf) {
var common = DECODE_EVENT_COMMON(bytebuf);
if (!f.existy(common))
return false;

var listenerId = common.listenerId;
if (f.existy(listenerId)) {
var dispatcher = f.dispatch(
Expand All @@ -485,7 +560,14 @@
// Iterator next-batch prelude: the segments byte array followed by the
// number of entries in the batch. Returns undefined when the buffer does
// not yet hold both values.
// (The scrape had retained the pre-change one-line callback, which closed
// the statement and left the new callback dangling — only the new form is kept.)
var DECODE_SEGMENTS_COUNT = f.actions(
    [codec.decodeVariableBytes(), // segments byte array
     codec.decodeVInt()],         // number of entries
    function(values) {
      if (values.length < 2) {
        logger.tracef("Not enough to read (not array): %s", values);
        return undefined;
      }

      return {segments: values[0], count: values[1]};
    });
// Reads 8 fixed bytes and labels them as the entry's version.
var DECODE_VERSION = f.actions(
[codec.decodeFixedBytes(8)],
function(values) { return {version: values[0]}; });
Expand Down Expand Up @@ -514,18 +596,38 @@
},
decodeNextEntries: function(header, bytebuf) {
var segmentsAndCount = DECODE_SEGMENTS_COUNT(bytebuf);
if (!f.existy(segmentsAndCount))
return {continue: false};

var count = segmentsAndCount.count;
logger.tracef('Iterator next contains %d entries', count);
if (count > 0) {
var projectionSize = DECODE_VINT(bytebuf); // projections size
logger.tracef('Projection size is %d', projectionSize);
if (!f.existy(projectionSize))
return {continue: false};

var entries = [], i = 0;
while (i++ < count) {
var hasMeta = DECODE_UBYTE(bytebuf) == 1; // meta
var meta = DECODE_UBYTE(bytebuf);
if (!f.existy(meta))
return {continue: false};

var hasMeta = meta == 1; // meta
var entry = {};
if (hasMeta) {
var flags = DECODE_UBYTE(bytebuf);
if (!f.existy(flags))
return {continue: false};

var lifespan = decodeTimestamp(flags, INFINITE_LIFESPAN, ['created', 'lifespan'], bytebuf);
if (!f.existy(lifespan))
return {continue: false};

var idle = decodeTimestamp(flags, INFINITE_MAXIDLE, ['lastUsed', 'maxIdle'], bytebuf);
if (!f.existy(idle))
return {continue: false};

var version = DECODE_VERSION(bytebuf);
entry = f.merge(lifespan, idle, version);
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -31,6 +31,6 @@
},
"devDependencies": {
"jasmine-node": "^1.14.5",
"request": "^2.79.0"
"request": "2.81.0"
}
}

0 comments on commit d86e97e

Please sign in to comment.