diff --git a/README.md b/README.md
index beec66c..e0c1c80 100644
--- a/README.md
+++ b/README.md
@@ -676,6 +676,14 @@
 To run individual tests, execute the following:
 
     $ node node_modules/jasmine-node/lib/jasmine-node/cli.js spec/infinispan_local_spec.js --captureExceptions
 
+# Manual stress tests
+
+The test suite now contains manual stress tests that take several minutes to run.
+To run these tests, execute:
+
+    $ ./node_modules/.bin/jasmine-node spec-manual --captureExceptions
+
+
 # Debugging
 
 To debug tests with IDE:
diff --git a/lib/codec.js b/lib/codec.js
index 6d70103..7560be7 100644
--- a/lib/codec.js
+++ b/lib/codec.js
@@ -53,9 +53,16 @@
     return values[0];
   };
 
-  exports.allDecoded = function(values, state) {
-    return values;
-  };
+  exports.allDecoded = function(expectedNumEntries) {
+    return function(values, state) {
+      if (values.length < expectedNumEntries) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
+      return values;
+    };
+  }
 
   exports.bytesEncoded = function(values, state) {
     // If buffer is too big, slice it up so that it can be sent
@@ -315,6 +322,9 @@
 
   function doDecodeSignedInt(bytebuf) {
     var num = uncheckedReadVNum(bytebuf)();
+    if (!f.existy(num))
+      return undefined;
+
     return (num & 1) == 0 ? num >>> 1 : ~(num >>> 1);
   }
 
@@ -328,6 +338,12 @@
 
   function uncheckedReadUByte(bytebuf) {
    return function() {
+      if (1 > bytebuf.buf.length - bytebuf.offset) {
+        logger.tracef('Can not fully read unsigned byte (buffer size is %d, buffer offset %d)',
+                      bytebuf.buf.length, bytebuf.offset);
+        return undefined;
+      }
+
      return bytebuf.buf.readUInt8(bytebuf.offset++);
    }
   }
@@ -337,6 +353,12 @@
       var res = 0, shift = 0, b;
 
       do {
+        if (1 > bytebuf.buf.length - bytebuf.offset) {
+          logger.tracef('Can not fully read unsigned byte (buffer size is %d, buffer offset %d)',
+                        bytebuf.buf.length, bytebuf.offset);
+          return undefined;
+        }
+
         b = bytebuf.buf.readUInt8(bytebuf.offset++);
         res += shift < 28
             ? (b & REST) << shift
@@ -350,6 +372,12 @@
 
   function uncheckedReadLong(bytebuf) {
     return function() {
+      if (8 > bytebuf.buf.length - bytebuf.offset) {
+        logger.tracef('Can not fully read 8 bytes (buffer size is %d, buffer offset %d)',
+                      bytebuf.buf.length, bytebuf.offset);
+        return undefined;
+      }
+
       var low = bytebuf.buf.readInt32BE(bytebuf.offset + 4);
       var n = bytebuf.buf.readInt32BE(bytebuf.offset) * 4294967296.0 + low;
       if (low < 0) n += 4294967296;
@@ -376,6 +404,9 @@
   function uncheckedReadString(bytebuf) {
     return function() {
       var numBytes = uncheckedReadVNum(bytebuf)();
+      if (!f.existy(numBytes))
+        return undefined;
+
       if (numBytes > bytebuf.buf.length - bytebuf.offset) {
         logger.tracef('Can not fully read object with %d bytes (buffer size is %d, buffer offset %d)',
                       numBytes, bytebuf.buf.length, bytebuf.offset);
diff --git a/lib/io.js b/lib/io.js
index 1c9afff..89ebccd 100644
--- a/lib/io.js
+++ b/lib/io.js
@@ -90,31 +90,37 @@
       var protocol = transport.getProtocol();
       while (!replayable.isEmpty() && canDecodeMore) {
         var bytebuf = replayable.mark();
-        var header = protocol.decodeHeader(bytebuf);
-        var topology = header.hasNewTopology
-            ? transport.updateTopology(bytebuf)
-            : Promise.resolve();
+        var h = protocol.decodeHeader(bytebuf);
+        canDecodeMore = h.continue;
+        if (canDecodeMore) {
+          var header = h.result;
+          var topology = header.hasNewTopology
+              ? transport.updateTopology(bytebuf)
+              : Promise.resolve();
 
-        canDecodeMore = f.existy(topology);
+          canDecodeMore = f.existy(topology);
 
-        if (canDecodeMore) {
-          if (protocol.isEvent(header)) {
-            canDecodeMore = protocol.decodeEvent(header, bytebuf);
-          } else {
-            if (protocol.isError(header)) {
-              canDecodeMore = decodeError(header, bytebuf, topology);
+          if (canDecodeMore) {
+            if (protocol.isEvent(header)) {
+              canDecodeMore = protocol.decodeEvent(header, bytebuf);
             } else {
-              canDecodeMore = decodeRpcBody(header, bytebuf, topology);
+              if (protocol.isError(header)) {
+                canDecodeMore = decodeError(header, bytebuf, topology);
+              } else {
+                canDecodeMore = decodeRpcBody(header, bytebuf, topology);
+              }
             }
-          }
-        if (!canDecodeMore)
-          rewind(); // Incomplete event or body, rewind
-        else
-          trim(header, bytebuf); // Trim buffer after reading event or body
+            if (!canDecodeMore)
+              rewind(); // Incomplete event or body, rewind
+            else
+              trim(header, bytebuf); // Trim buffer after reading event or body
+          } else {
+            rewind(); // Incomplete topology, rewind
+          }
         } else {
-          rewind(); // Incomplete topology, rewind
+          rewind(); // Incomplete header, rewind
         }
       }
     }
diff --git a/lib/protocols.js b/lib/protocols.js
index a2208e7..42d2659 100644
--- a/lib/protocols.js
+++ b/lib/protocols.js
@@ -8,28 +8,53 @@
   var u = require('./utils');
   var codec = require('./codec');
 
+  var logger = u.logger('protocols');
+
   var INFINITE_LIFESPAN = 0x01, INFINITE_MAXIDLE = 0x02; // Duration flag masks
   var MAGIC = 0xA0;
 
   var DECODE_PAIR = f.actions(
-    [codec.decodeObject(), codec.decodeObject()],
-    function(values) { return {key: values[0], value: values[1]}; });
+    [codec.decodeObject(),
+     codec.decodeObject()
+    ], function(values) {
+      if (values.length < 2) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
+      return {key: values[0], value: values[1]};
+    });
   var DECODE_STRING_PAIR = f.actions(
     [codec.decodeString(), codec.decodeString()],
     function(values) {
+      if (values.length < 2) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
      var pair = {};
      pair[values[0]] = parseInt(values[1]);
      return pair;
    });
   var DECODE_STRING = f.actions([codec.decodeString()], codec.lastDecoded);
-  var DECODE_TIMESTAMP = f.actions([codec.decodeLong(), codec.decodeVInt()], codec.allDecoded);
+  var DECODE_TIMESTAMP = f.actions([codec.decodeLong(), codec.decodeVInt()], codec.allDecoded(2));
   var DECODE_UBYTE = f.actions([codec.decodeUByte()], codec.lastDecoded);
 
   function hasOpt(opts, name) { return _.has(opts, name) && f.truthy(opts[name]); }
   function hasOptPrev(opts) { return hasOpt(opts, 'previous'); }
 
   function decodeTimestamp(flags, mask, headers, bytebuf) {
-    var timestamp = ((flags & mask) != mask) ? DECODE_TIMESTAMP(bytebuf) : [-1, -1];
+    var timestamp;
+    if (((flags & mask) != mask)) {
+      var decoded = DECODE_TIMESTAMP(bytebuf);
+      if (decoded.length < 2)
+        return undefined;
+
+      timestamp = decoded;
+    } else {
+      timestamp = [-1, -1];
+    }
+
     return _.object(headers, timestamp);
   }
@@ -189,6 +214,11 @@
      codec.decodeUByte(), // status
      codec.decodeUByte()  // topology change marker
     ], function(values) {
+      if (values.length < 5) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
       return {
         msgId: values[1], opCode: values[2], status: values[3],
         hasNewTopology: values[4] == 1
@@ -199,10 +229,26 @@
     function(values) { return {version: values[0], value: values[1]}; });
   var DECODE_VINT = f.actions([codec.decodeVInt()], codec.lastDecoded);
-  var DECODE_TOPO_HEADER = f.actions([codec.decodeVInt(), codec.decodeVInt()],
-    function(values) { return {id: values[0], numServers: values[1]}; });
-  var DECODE_HASH_HEADER = f.actions([codec.decodeUByte(), codec.decodeVInt()],
-    function(values) { return {hashFunct: values[0], numSegments: values[1]}; });
+  var DECODE_TOPO_HEADER = f.actions(
+    [codec.decodeVInt(), codec.decodeVInt()],
+    function(values) {
+      if (values.length < 2) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
+      return {id: values[0], numServers: values[1]};
+    });
+  var DECODE_HASH_HEADER = f.actions(
+    [codec.decodeUByte(), codec.decodeVInt()],
+    function(values) {
+      if (values.length < 2) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
+      return {hashFunct: values[0], numSegments: values[1]};
+    });
   var DECODE_HOST_PORT = f.actions([codec.decodeString(), codec.decodeShort()],
     function(values) { return {host: values[0], port: values[1]}; });
@@ -247,9 +293,13 @@
       decodeHeader: function(bytebuf) {
         try {
           var header = DECODE_HEADER(bytebuf);
-          logger.tracef("Read header(msgId=%d): opCode=%s, status=%s, hasNewTopology=%d",
-                        header.msgId, header.opCode, header.status, header.hasNewTopology);
-          return header;
+          if (f.existy(header)) {
+            logger.tracef("Read header(msgId=%d): opCode=%s, status=%s, hasNewTopology=%d",
+                          header.msgId, header.opCode, header.status, header.hasNewTopology);
+            return {continue: true, result: header};
+          }
+
+          return {continue: false, result: undefined};
         } catch(ex) {
           logger.error('Error decoding header, message id unknown:', ex);
           throw ex;
@@ -272,8 +322,19 @@
       decodeWithMeta: function(header, bytebuf) {
         if (isSuccess(header.status)) {
           var flags = DECODE_UBYTE(bytebuf);
+          if (!f.existy(flags))
+            return {continue: false};
+
+          logger.tracef('Decode with metadata, flags are: %d', flags);
+
           var lifespan = decodeTimestamp(flags, INFINITE_LIFESPAN, ['created', 'lifespan'], bytebuf);
+          if (!f.existy(lifespan))
+            return {continue: false};
+
           var idle = decodeTimestamp(flags, INFINITE_MAXIDLE, ['lastUsed', 'maxIdle'], bytebuf);
+          if (!f.existy(idle))
+            return {continue: false};
+
           var versioned = DECODE_VERSIONED(bytebuf);
           return f.existy(versioned)
              ? {result: f.merge(versioned, lifespan, idle), continue: true}
@@ -321,6 +382,9 @@
       },
       decodeTopology: function(bytebuf) {
         var topologyHeader = DECODE_TOPO_HEADER(bytebuf);
+        if (!f.existy(topologyHeader))
+          return {done: false};
+
         var addrs = [], i = 0;
         while (i++ < topologyHeader.numServers) {
           var addr = DECODE_HOST_PORT(bytebuf);
@@ -329,6 +393,9 @@
         }
 
         var hashHeader = DECODE_HASH_HEADER(bytebuf);
+        if (!f.existy(hashHeader))
+          return {done: false};
+
         var segs = new Array(hashHeader.numSegments);
         for (var j = 0; j < hashHeader.numSegments; j++) {
           var numOwners = DECODE_UBYTE(bytebuf);
@@ -384,6 +451,11 @@
      codec.decodeUByte(),  // custom event marker
      codec.decodeUByte()], // event is retried
     function(values) {
+      if (values.length < 3) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
       return {listenerId: values[0], isCustom: values[1] == 1, isRetried: values[2] == 1}
     });
@@ -464,6 +536,9 @@
       },
       decodeEvent: function(header, bytebuf) {
         var common = DECODE_EVENT_COMMON(bytebuf);
+        if (!f.existy(common))
+          return false;
+
         var listenerId = common.listenerId;
         if (f.existy(listenerId)) {
           var dispatcher = f.dispatch(
@@ -485,7 +560,14 @@
   var DECODE_SEGMENTS_COUNT = f.actions(
     [codec.decodeVariableBytes(), // segments byte array
      codec.decodeVInt()],         // number of entries
-    function(values) { return {segments: values[0], count: values[1]} });
+    function(values) {
+      if (values.length < 2) {
+        logger.tracef("Not enough to read (not array): %s", values);
+        return undefined;
+      }
+
+      return {segments: values[0], count: values[1]}
+    });
   var DECODE_VERSION = f.actions(
     [codec.decodeFixedBytes(8)],
     function(values) { return {version: values[0]}; });
@@ -514,18 +596,38 @@
       },
       decodeNextEntries: function(header, bytebuf) {
         var segmentsAndCount = DECODE_SEGMENTS_COUNT(bytebuf);
+        if (!f.existy(segmentsAndCount))
+          return {continue: false};
+
         var count = segmentsAndCount.count;
         logger.tracef('Iterator next contains %d entries', count);
         if (count > 0) {
           var projectionSize = DECODE_VINT(bytebuf); // projections size
+          logger.tracef('Projection size is %d', projectionSize);
+          if (!f.existy(projectionSize))
+            return {continue: false};
+
           var entries = [], i = 0;
           while (i++ < count) {
-            var hasMeta = DECODE_UBYTE(bytebuf) == 1; // meta
+            var meta = DECODE_UBYTE(bytebuf);
+            if (!f.existy(meta))
+              return {continue: false};
+
+            var hasMeta = meta == 1; // meta
             var entry = {};
             if (hasMeta) {
               var flags = DECODE_UBYTE(bytebuf);
+              if (!f.existy(flags))
+                return {continue: false};
+
               var lifespan = decodeTimestamp(flags, INFINITE_LIFESPAN, ['created', 'lifespan'], bytebuf);
+              if (!f.existy(lifespan))
+                return {continue: false};
+
               var idle = decodeTimestamp(flags, INFINITE_MAXIDLE, ['lastUsed', 'maxIdle'], bytebuf);
+              if (!f.existy(idle))
+                return {continue: false};
+
               var version = DECODE_VERSION(bytebuf);
               entry = f.merge(lifespan, idle, version);
             }
diff --git a/package.json b/package.json
index 949fb07..9489c39 100644
--- a/package.json
+++ b/package.json
@@ -31,6 +31,6 @@
   },
   "devDependencies": {
     "jasmine-node": "^1.14.5",
-    "request": "^2.79.0"
+    "request": "2.81.0"
   }
 }
diff --git a/spec-manual/infinispan_manual_stress_get_spec.js b/spec-manual/infinispan_manual_stress_get_spec.js
new file mode 100644
index 0000000..21c5fce
--- /dev/null
+++ b/spec-manual/infinispan_manual_stress_get_spec.js
@@ -0,0 +1,48 @@
+var _ = require('underscore');
+var Promise = require('promise');
+
+var t = require('../spec/utils/testing'); // Testing dependency
+
+describe('Infinispan local client under read stress load', function () {
+  var client = t.client(t.local);
+
+  beforeEach(function (done) {
+    client
+      .then(t.assert(t.clear()))
+      .catch(t.failed(done)).finally(done);
+  });
+
+  it('can do many gets continuously', function (done) {
+    client.then(function (cl) {
+      var key = "stress-get";
+      var put = cl.put(key, "test");
+
+      var gets = _.map(_.range(100000), function(i) {
+        var get = put.then(
+          function() { return cl.get(key); }
+        );
+
+        // Print out the key that was retrieved
+        return get.then(
+          function(value) {
+            //console.log('get(key)=' + value);
+            expect(value).toBe("test");
+          }
+        );
+      });
+
+      return Promise.all(gets)
+        .catch(t.failed(done))
+        .finally(done);
+    })
+  }, 120000);
+
+  // Since Jasmine 1.3 does not have afterAll callback, this disconnect test must be last
+  it('disconnects client', function (done) {
+    client
+      .then(t.disconnect())
+      .catch(t.failed(done))
+      .finally(done);
+  });
+
+});
\ No newline at end of file
diff --git a/spec-manual/infinispan_manual_stress_iterate_spec.js b/spec-manual/infinispan_manual_stress_iterate_spec.js
new file mode 100644
index 0000000..03bf01e
--- /dev/null
+++ b/spec-manual/infinispan_manual_stress_iterate_spec.js
@@ -0,0 +1,84 @@
+var _ = require('underscore');
+var Promise = require('promise');
+
+var f = require('../lib/functional');
+var t = require('../spec/utils/testing'); // Testing dependency
+
+describe('Infinispan local client under iterate stress load', function () {
+  var client = t.client(t.local);
+
+  beforeEach(function (done) {
+    client
+      .then(t.assert(t.clear()))
+      .catch(t.failed(done)).finally(done);
+  });
+
+  it('can do many iterates continuously', function (done) {
+    client.then(function (cl) {
+      var singleInsert = cl.put("key0", JSON.stringify({ test: "test de prueba con un texto largo", token: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.' }));
+
+      var numEntries = 100000;
+
+      var multiInsert = _.map(_.range(numEntries), function(i) {
+        return singleInsert.then(function () {
+          cl.put("key" + i, JSON.stringify({ test: "test de prueba con un texto largo", token: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.' }))
+        });
+      });
+
+      var multiIterate = singleInsert.then(function () {
+        console.log("1");
+        return test(cl);
+      })
+        .then(function (result) {
+          console.log("2, entries iterated: " + result.length);
+          expect(result.length).toBe(numEntries + 1);
+          return test(cl);
+        })
+        .then(function (result) {
+          console.log("3, entries iterated: " + result.length);
+          expect(result.length).toBe(numEntries + 1);
+          return test(cl);
+        })
+        .then(function (result) {
+          console.log("4, entries iterated: " + result.length);
+          expect(result.length).toBe(numEntries + 1);
+        });
+
+      // TODO multi iterate should return same result
+
+      return Promise.all(f.cat([multiIterate], multiInsert))
+        .catch(t.failed(done))
+        .finally(done);
+    })
+  }, 180000);
+
+  function test (cl) {
+    var iterator = cl.iterator(100);
+    var entries = [];
+    return iterator
+      .then(function (it) {
+        function loop(promise, fn) {
+          return promise.then(fn).then(function (entry) {
+            return !entry.done ? loop(it.next(), fn) : entry.value;
+          });
+        }
+        return loop(it.next(), function (entry) {
+          entries.push(entry);
+          //console.log(`entry ${JSON.stringify(entry)}`)
+          return entry;
+        });
+      })
+      .then(function () {
+        return entries;
+      });
+  }
+
+  // Since Jasmine 1.3 does not have afterAll callback, this disconnect test must be last
+  it('disconnects client', function (done) {
+    client
+      .then(t.disconnect())
+      .catch(t.failed(done))
+      .finally(done);
+  });
+
+});
\ No newline at end of file
diff --git a/spec/codec_spec.js b/spec/codec_spec.js
index 12dea1b..d8d0ae5 100644
--- a/spec/codec_spec.js
+++ b/spec/codec_spec.js
@@ -1,4 +1,4 @@
-'use strict';
+//'use strict';
 
 var _ = require('underscore');
 var f = require('../lib/functional');
@@ -197,7 +197,7 @@ describe('Basic encode/decode', function() {
    var encoders = _.map(numbers, function (num) { return codec.encodeUByte(num); });
    var decoders = _.map(numbers, function (num) { return codec.decodeUByte(); });
    var encodeActions = f.actions(encoders, codec.bytesEncoded);
-   var decodeActions = f.actions(decoders, codec.allDecoded);
+   var decodeActions = f.actions(decoders, codec.allDecoded(10));
    var bytebuf = t.assertEncode(t.newByteBuf(1), encodeActions, 10);
    expect(decodeActions({buf: bytebuf.buf, offset: 0})).toEqual(numbers);
  });
@@ -240,7 +240,7 @@ function assert(expected, size, encoder, decoder, bufferSize) {
 
 function encodeDecode(size, encoder, decoder, bufferSize) {
   var enc = f.actions(_.isArray(encoder) ? encoder : [encoder], codec.bytesEncoded);
   var bytebuf = t.assertEncode(t.newByteBuf(bufferSize), enc, size);
-  var dec = f.actions(_.isArray(decoder) ? decoder : [decoder], codec.allDecoded);
+  var dec = f.actions(_.isArray(decoder) ? decoder : [decoder], codec.allDecoded(decoder.length));
   return dec({buf: bytebuf.buf, offset: 0});
 }
diff --git a/spec/configs/domain.xml b/spec/configs/domain.xml
index 97b568a..b7a12b6 100644
--- a/spec/configs/domain.xml
+++ b/spec/configs/domain.xml
@@ -712,7 +712,9 @@
-
+
+
+
@@ -1017,7 +1019,9 @@
-
+
+
+
diff --git a/spec/protocols_spec.js b/spec/protocols_spec.js
index 96e836c..d7673f9 100644
--- a/spec/protocols_spec.js
+++ b/spec/protocols_spec.js
@@ -6,8 +6,8 @@
 var codec = require('../lib/codec');
 var t = require('./utils/testing'); // Testing dependency
 
-var singleExpiryDecode = f.actions([codec.decodeUByte(), codec.decodeVLong()], codec.allDecoded);
-var constantExpiryDecode = f.actions([codec.decodeUByte()], codec.allDecoded);
+var singleExpiryDecode = f.actions([codec.decodeUByte(), codec.decodeVLong()], codec.allDecoded(2));
+var constantExpiryDecode = f.actions([codec.decodeUByte()], codec.allDecoded(1));
 
 function lifespan(unit) { return unit << 4 | 0x07; }
 function maxIdle(unit) { return 0x70 | unit; }
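
The same pattern runs through the whole patch: low-level reads in `lib/codec.js` check whether the buffer still holds enough bytes and return `undefined` when it does not, the `f.actions` callbacks in `lib/protocols.js` compare `values.length` against the expected count (a short array signals an incomplete read), and the loop in `lib/io.js` rewinds the replayable buffer so decoding can resume once more data arrives. The snippet below is a minimal, self-contained sketch of that partial-read idea in plain Node.js; it is not the client's actual API, and `readUByte` and `tryDecodeHeader` are illustrative names only:

    // Sketch of the partial-read pattern: a read returns `undefined` when the
    // buffer is too short, and the caller rewinds to a marked offset and retries.
    function readUByte(bytebuf) {
      if (bytebuf.buf.length - bytebuf.offset < 1)
        return undefined;                              // not enough bytes yet
      return bytebuf.buf.readUInt8(bytebuf.offset++);
    }

    function tryDecodeHeader(bytebuf) {
      var mark = bytebuf.offset;                       // remember where decoding started
      var magic = readUByte(bytebuf);
      var msgId = readUByte(bytebuf);
      if (magic === undefined || msgId === undefined) {
        bytebuf.offset = mark;                         // rewind and wait for more data
        return {continue: false, result: undefined};
      }
      return {continue: true, result: {magic: magic, msgId: msgId}};
    }

    // Feed the bytes in two chunks to see the rewind-and-retry behaviour.
    var bytebuf = {buf: Buffer.from([0xA0]), offset: 0};
    console.log(tryDecodeHeader(bytebuf)); // { continue: false, result: undefined }
    bytebuf.buf = Buffer.concat([bytebuf.buf, Buffer.from([0x01])]);
    console.log(tryDecodeHeader(bytebuf)); // { continue: true, result: { magic: 160, msgId: 1 } }

In the real client, `replayable.mark()` and `rewind()` in `lib/io.js` play the role of the manual offset bookkeeping above, and `trim(header, bytebuf)` discards the consumed bytes once a complete event or body has been decoded.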