diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e52c31..39d9b23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to the LaunchDarkly Node.js SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [4.0.2] = 2018-03-14 +### Fixed +- In the Redis feature store, fixed synchronization problems that could cause a feature flag update to be missed if several of them happened in rapid succession. + ## [4.0.1] - 2018-03-09 ### Fixed - Any Redis connection failure will now be logged and will trigger reconnection attempts transparently. Previously, it caused an uncaught exception. Note that during a Redis outage, flag evaluations will use the last known value from the in-memory cache if available (if this cache was enabled with the `cache_ttl` parameter to `RedisFeatureStore`), or otherwise the default value. diff --git a/package.json b/package.json index ccd7964..9bee3fe 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ldclient-node", - "version": "4.0.1", + "version": "4.0.2", "description": "LaunchDarkly SDK for Node.js", "main": "index.js", "scripts": { diff --git a/redis_feature_store.js b/redis_feature_store.js index 39fd6a6..e014cea 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -13,6 +13,7 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { store = {}, items_prefix = (prefix || "launchdarkly") + ":", cache = cache_ttl ? new NodeCache({ stdTTL: cache_ttl}) : null, + updateQueue = [], inited = false, checked_init = false; @@ -42,10 +43,10 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { } initialConnect = false; connected = true; - }) + }); client.on('end', function() { connected = false; - }) + }); // Allow driver programs to exit, even if the Redis socket is active client.unref(); @@ -88,6 +89,32 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { }); } + function executePendingUpdates() { + if (updateQueue.length > 0) { + const entry = updateQueue[0]; + const fn = entry[0]; + const args = entry[1]; + const cb = entry[2]; + const newCb = function() { + updateQueue.shift(); + if (updateQueue.length > 0) { + setImmediate(executePendingUpdates); + } + cb && cb(); + }; + fn.apply(store, args.concat([newCb])); + } + } + + // Places an update operation on the queue. + var serializeFn = function(updateFn, fnArgs, cb) { + updateQueue.push([updateFn, fnArgs, cb]); + if (updateQueue.length == 1) { + // if nothing else is in progress, we can start this one right away + executePendingUpdates(); + } + }; + store.get = function(kind, key, cb) { cb = cb || noop; do_get(kind, key, function(item) { @@ -129,8 +156,11 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { }; store.init = function(allData, cb) { + serializeFn(store._init, [allData], cb); + }; + + store._init = function(allData, cb) { var multi = client.multi(); - cb = cb || noop; if (cache_ttl) { cache.flushAll(); @@ -169,59 +199,61 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { }; store.delete = function(kind, key, version, cb) { - var multi; - var baseKey = items_key(kind); - cb = cb || noop; - client.watch(baseKey); - multi = client.multi(); - - do_get(kind, key, function(item) { - if (item && item.version >= version) { - multi.discard(); - cb(); - } else { - deletedItem = { version: version, deleted: true }; - multi.hset(baseKey, key, JSON.stringify(deletedItem)); - multi.exec(function(err, replies) { - if (err) { - logger.error("Error deleting key " + key + " in '" + kind.namespace + "'", err); - } else if (cache_ttl) { - cache.set(cache_key(kind, key), deletedItem); - } - cb(); - }); - } - }); + serializeFn(store._delete, [kind, key, version], cb); }; - store.upsert = function(kind, item, cb) { - var multi; - var baseKey = items_key(kind); - var key = item.key; - cb = cb || noop; - client.watch(baseKey); - multi = client.multi(); + store._delete = function(kind, key, version, cb) { + var deletedItem = { key: key, version: version, deleted: true }; + updateItemWithVersioning(kind, deletedItem, cb, + function(err) { + if (err) { + logger.error("Error deleting key " + key + " in '" + kind.namespace + "'", err); + } + }); + } - do_get(kind, key, function(original) { - if (original && original.version >= item.version) { - cb(); - return; - } + store.upsert = function(kind, item, cb) { + serializeFn(store._upsert, [kind, item], cb); + }; - multi.hset(baseKey, key, JSON.stringify(item)); - multi.exec(function(err, replies) { + store._upsert = function(kind, item, cb) { + updateItemWithVersioning(kind, item, cb, + function(err) { if (err) { logger.error("Error upserting key " + key + " in '" + kind.namespace + "'", err); - } else { - if (cache_ttl) { - cache.set(cache_key(kind, key), item); - } } - cb(); }); + } + function updateItemWithVersioning(kind, newItem, cb, resultFn) { + client.watch(items_key(kind)); + var multi = client.multi(); + // test_transaction_hook is instrumentation, set only by the unit tests + var prepare = store.test_transaction_hook || function(prepareCb) { prepareCb(); }; + prepare(function() { + do_get(kind, newItem.key, function(oldItem) { + if (oldItem && oldItem.version >= newItem.version) { + multi.discard(); + cb(); + } else { + multi.hset(items_key(kind), newItem.key, JSON.stringify(newItem)); + multi.exec(function(err, replies) { + if (!err && replies === null) { + // This means the EXEC failed because someone modified the watched key + logger.debug("Concurrent modification detected, retrying"); + updateItemWithVersioning(kind, newItem, cb, resultFn); + } else { + resultFn(err); + if (!err && cache_ttl) { + cache.set(cache_key(kind, newItem.key), newItem); + } + cb(); + } + }); + } + }); }); - }; + } store.initialized = function(cb) { cb = cb || noop; @@ -239,7 +271,7 @@ function RedisFeatureStore(redis_opts, cache_ttl, prefix, logger) { client.exists(items_key(dataKind.features), function(err, obj) { if (!err && obj) { inited = true; - } + } checked_init = true; cb(inited); }); diff --git a/test/feature_store_test_base.js b/test/feature_store_test_base.js index 981da51..5bb874e 100644 --- a/test/feature_store_test_base.js +++ b/test/feature_store_test_base.js @@ -97,6 +97,27 @@ function allFeatureStoreTests(makeStore) { }); }); + it('handles upsert race condition within same client correctly', function(done) { + var ver1 = { key: feature1.key, version: feature1.version + 1 }; + var ver2 = { key: feature1.key, version: feature1.version + 2 }; + initedStore(function(store) { + var counter = 0; + var combinedCallback = function() { + counter++; + if (counter == 2) { + store.get(dataKind.features, feature1.key, function(result) { + expect(result).toEqual(ver2); + done(); + }); + } + }; + // Deliberately do not wait for the first upsert to complete before starting the second, + // so their transactions will be interleaved unless we're correctly serializing updates + store.upsert(dataKind.features, ver2, combinedCallback); + store.upsert(dataKind.features, ver1, combinedCallback); + }); + }); + it('deletes with newer version', function(done) { initedStore(function(store) { store.delete(dataKind.features, feature1.key, feature1.version + 1, function(result) { diff --git a/test/redis_feature_store-test.js b/test/redis_feature_store-test.js index 5d16e0e..282d9dd 100644 --- a/test/redis_feature_store-test.js +++ b/test/redis_feature_store-test.js @@ -1,9 +1,54 @@ var RedisFeatureStore = require('../redis_feature_store'); var allFeatureStoreTests = require('./feature_store_test_base'); +var dataKind = require('../versioned_data_kind'); +var redis = require('redis'); describe('RedisFeatureStore', function() { - allFeatureStoreTests(function() { - redisOpts = { url: 'redis://localhost:6379' }; - return new RedisFeatureStore(redisOpts, 30000); - }) + var redisOpts = { url: 'redis://localhost:6379' }; + + function makeStore() { + return new RedisFeatureStore(redisOpts, 30000); + } + + allFeatureStoreTests(makeStore); + + it('handles upsert race condition against external client correctly', function(done) { + var store = makeStore(); + var otherClient = redis.createClient(redisOpts); + + var feature1 = { + key: 'foo', + version: 1 + }; + var intermediateVer = { key: feature1.key, version: feature1.version }; + var finalVer = { key: feature1.key, version: 10 }; + + var initData = {}; + initData[dataKind.features.namespace] = { + 'foo': feature1 + }; + + store.init(initData, function() { + var tries = 0; + // This function will be called in between the WATCH and the update transaction. + // We're testing that the store will detect this concurrent modification and will + // transparently retry the update. + store.test_transaction_hook = function(cb) { + if (tries < 3) { + tries++; + intermediateVer.version++; + otherClient.hset("launchdarkly:features", "foo", JSON.stringify(intermediateVer), cb); + } else { + cb(); + } + }; + store.upsert(dataKind.features, finalVer, function() { + store.get(dataKind.features, feature1.key, function(result) { + otherClient.quit(); + expect(result).toEqual(finalVer); + done(); + }); + }); + }); + }); });