Skip to content
This repository was archived by the owner on May 30, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to the LaunchDarkly Node.js SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org).

## [4.0.2] - 2018-03-14
### Fixed
- In the Redis feature store, fixed synchronization problems that could cause a feature flag update to be missed if several of them happened in rapid succession.

## [4.0.1] - 2018-03-09
### Fixed
- Any Redis connection failure will now be logged and will trigger reconnection attempts transparently. Previously, it caused an uncaught exception. Note that during a Redis outage, flag evaluations will use the last known value from the in-memory cache if available (if this cache was enabled with the `cache_ttl` parameter to `RedisFeatureStore`), or otherwise the default value.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ldclient-node",
"version": "4.0.1",
"version": "4.0.2",
"description": "LaunchDarkly SDK for Node.js",
"main": "index.js",
"scripts": {
Expand Down
126 changes: 79 additions & 47 deletions redis_feature_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) {
store = {},
items_prefix = (prefix || "launchdarkly") + ":",
cache = cache_ttl ? new NodeCache({ stdTTL: cache_ttl}) : null,
updateQueue = [],
inited = false,
checked_init = false;

Expand Down Expand Up @@ -42,10 +43,10 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) {
}
initialConnect = false;
connected = true;
})
});
client.on('end', function() {
connected = false;
})
});

// Allow driver programs to exit, even if the Redis socket is active
client.unref();
Expand Down Expand Up @@ -88,6 +89,32 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) {
});
}

// Runs the oldest queued update operation, then schedules the next one.
// Each operation's completion callback is wrapped so the queue advances
// exactly once per operation before the caller's own callback fires;
// the next operation is started on a fresh tick via setImmediate.
function executePendingUpdates() {
  if (updateQueue.length === 0) {
    return;
  }
  const [fn, args, cb] = updateQueue[0];
  const advanceQueue = function() {
    updateQueue.shift();
    if (updateQueue.length > 0) {
      setImmediate(executePendingUpdates);
    }
    cb && cb();
  };
  fn.apply(store, args.concat([advanceQueue]));
}

// Places an update operation on the queue. If the queue was empty, the
// operation is started immediately; otherwise executePendingUpdates will
// pick it up when the in-flight operations finish. This serialization is
// what prevents interleaved WATCH/MULTI transactions from the same client.
var serializeFn = function(updateFn, fnArgs, cb) {
  updateQueue.push([updateFn, fnArgs, cb]);
  // Strict equality: length is always a number, and === is the file-wide idiom.
  if (updateQueue.length === 1) {
    // if nothing else is in progress, we can start this one right away
    executePendingUpdates();
  }
};

store.get = function(kind, key, cb) {
cb = cb || noop;
do_get(kind, key, function(item) {
Expand Down Expand Up @@ -129,8 +156,11 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) {
};

// Initializes (or re-initializes) the store with a complete data set.
// The write is routed through serializeFn so it cannot interleave with
// any other pending update; the real work happens in store._init.
store.init = function(allData, cb) {
  serializeFn(store._init, [allData], cb);
};

store._init = function(allData, cb) {
var multi = client.multi();
cb = cb || noop;

if (cache_ttl) {
cache.flushAll();
Expand Down Expand Up @@ -169,59 +199,61 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) {
};

// Deletes an item by writing a versioned tombstone (see store._delete).
// The operation is queued through serializeFn so it is serialized with all
// other writes from this client.
// NOTE(review): the scraped diff interleaved the removed pre-4.0.2 delete
// body with the new delegation line; this is the post-merge implementation,
// parallel to store.upsert below.
store.delete = function(kind, key, version, cb) {
  serializeFn(store._delete, [kind, key, version], cb);
};

store.upsert = function(kind, item, cb) {
var multi;
var baseKey = items_key(kind);
var key = item.key;
cb = cb || noop;
client.watch(baseKey);
multi = client.multi();
// Internal delete implementation; must only be invoked via serializeFn
// (through store.delete) so updates are applied one at a time. Writes a
// tombstone item { deleted: true } carrying the new version, unless the
// stored item already has an equal or newer version.
store._delete = function(kind, key, version, cb) {
  var deletedItem = { key: key, version: version, deleted: true };
  updateItemWithVersioning(kind, deletedItem, cb,
    function(err) {
      if (err) {
        logger.error("Error deleting key " + key + " in '" + kind.namespace + "'", err);
      }
    });
};

do_get(kind, key, function(original) {
if (original && original.version >= item.version) {
cb();
return;
}
// Inserts or updates an item, respecting version ordering. The operation
// is queued through serializeFn so it is serialized with all other writes;
// the real work happens in store._upsert.
store.upsert = function(kind, item, cb) {
  serializeFn(store._upsert, [kind, item], cb);
};

multi.hset(baseKey, key, JSON.stringify(item));
multi.exec(function(err, replies) {
// Internal upsert implementation; must only be invoked via serializeFn
// (through store.upsert) so updates are applied one at a time.
store._upsert = function(kind, item, cb) {
  updateItemWithVersioning(kind, item, cb,
    function(err) {
      if (err) {
        // Bug fix: this previously referenced an undefined variable "key"
        // (the old upsert's "var key = item.key" was removed), which would
        // throw a ReferenceError while trying to log a Redis error.
        logger.error("Error upserting key " + item.key + " in '" + kind.namespace + "'", err);
      }
    });
};

// Core versioned-write primitive shared by _upsert and _delete. Implements
// Redis optimistic locking with WATCH/MULTI/EXEC: if another client modifies
// the watched hash between WATCH and EXEC, EXEC yields a null reply and the
// whole operation is retried from scratch. The write is skipped entirely if
// the stored item already has an equal or newer version.
// cb: caller's completion callback, always invoked exactly once per attempt
//     chain. resultFn: receives the Redis error (or falsy) once a transaction
//     actually executed, so callers can log appropriately.
function updateItemWithVersioning(kind, newItem, cb, resultFn) {
  client.watch(items_key(kind));
  var multi = client.multi();
  // test_transaction_hook is instrumentation, set only by the unit tests
  var prepare = store.test_transaction_hook || function(prepareCb) { prepareCb(); };
  prepare(function() {
    do_get(kind, newItem.key, function(oldItem) {
      if (oldItem && oldItem.version >= newItem.version) {
        // Stored item is same or newer; abandon the transaction.
        // DISCARD also releases the WATCH taken above.
        multi.discard();
        cb();
      } else {
        multi.hset(items_key(kind), newItem.key, JSON.stringify(newItem));
        multi.exec(function(err, replies) {
          if (!err && replies === null) {
            // This means the EXEC failed because someone modified the watched key
            logger.debug("Concurrent modification detected, retrying");
            updateItemWithVersioning(kind, newItem, cb, resultFn);
          } else {
            resultFn(err);
            if (!err && cache_ttl) {
              // Keep the in-memory cache in sync with the value just written.
              cache.set(cache_key(kind, newItem.key), newItem);
            }
            cb();
          }
        });
      }
    });
  });
};
}

store.initialized = function(cb) {
cb = cb || noop;
Expand All @@ -239,7 +271,7 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) {
client.exists(items_key(dataKind.features), function(err, obj) {
if (!err && obj) {
inited = true;
}
}
checked_init = true;
cb(inited);
});
Expand Down
21 changes: 21 additions & 0 deletions test/feature_store_test_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,27 @@ function allFeatureStoreTests(makeStore) {
});
});

it('handles upsert race condition within same client correctly', function(done) {
  var ver1 = { key: feature1.key, version: feature1.version + 1 };
  var ver2 = { key: feature1.key, version: feature1.version + 2 };
  initedStore(function(store) {
    var counter = 0;
    var combinedCallback = function() {
      counter++;
      // Strict equality (===) instead of loose ==, per JS best practice.
      if (counter === 2) {
        store.get(dataKind.features, feature1.key, function(result) {
          // ver2 must win regardless of completion order, since it has
          // the higher version number.
          expect(result).toEqual(ver2);
          done();
        });
      }
    };
    // Deliberately do not wait for the first upsert to complete before starting the second,
    // so their transactions will be interleaved unless we're correctly serializing updates
    store.upsert(dataKind.features, ver2, combinedCallback);
    store.upsert(dataKind.features, ver1, combinedCallback);
  });
});

it('deletes with newer version', function(done) {
initedStore(function(store) {
store.delete(dataKind.features, feature1.key, feature1.version + 1, function(result) {
Expand Down
53 changes: 49 additions & 4 deletions test/redis_feature_store-test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,54 @@
var RedisFeatureStore = require('../redis_feature_store');
var allFeatureStoreTests = require('./feature_store_test_base');
var dataKind = require('../versioned_data_kind');
var redis = require('redis');

// NOTE(review): the scraped diff interleaved the removed pre-4.0.2 body
// (an inline allFeatureStoreTests call using an implicit-global redisOpts)
// with the new code; this is the reconstructed post-merge block.
describe('RedisFeatureStore', function() {
  // Shared connection options for both the store under test and the
  // independent client used to simulate an external writer.
  var redisOpts = { url: 'redis://localhost:6379' };

  function makeStore() {
    return new RedisFeatureStore(redisOpts, 30000);
  }

  // Run the standard feature-store contract tests against the Redis store.
  allFeatureStoreTests(makeStore);

  it('handles upsert race condition against external client correctly', function(done) {
    var store = makeStore();
    var otherClient = redis.createClient(redisOpts);

    var feature1 = {
      key: 'foo',
      version: 1
    };
    var intermediateVer = { key: feature1.key, version: feature1.version };
    var finalVer = { key: feature1.key, version: 10 };

    var initData = {};
    initData[dataKind.features.namespace] = {
      'foo': feature1
    };

    store.init(initData, function() {
      var tries = 0;
      // This function will be called in between the WATCH and the update transaction.
      // We're testing that the store will detect this concurrent modification and will
      // transparently retry the update.
      store.test_transaction_hook = function(cb) {
        if (tries < 3) {
          tries++;
          intermediateVer.version++;
          otherClient.hset("launchdarkly:features", "foo", JSON.stringify(intermediateVer), cb);
        } else {
          cb();
        }
      };
      store.upsert(dataKind.features, finalVer, function() {
        store.get(dataKind.features, feature1.key, function(result) {
          otherClient.quit();
          expect(result).toEqual(finalVer);
          done();
        });
      });
    });
  });
});