Skip to content

Commit

Permalink
Fix RedisAdvancedClusterAsyncCommandsImpl.msetnx return value #376
Browse files Browse the repository at this point in the history
MSETNX on RedisAdvancedClusterAsyncCommandsImpl returns now only true if all distributed MSETNX operations returned true. MSETNX has still a caveat when run on multiple cluster nodes:
MSETNX executed on a single node with a mixed set of existing and non-existing keys will not set the non-existing keys if a key of the operation already exists. That's different with the MSETNX cluster operation. It can't guarantee atomicity so some keys might get set on one node while another node does not set any keys at all.
  • Loading branch information
mp911de committed Oct 18, 2016
1 parent 4b3f980 commit be21fb5
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,15 @@ public RedisFuture<Boolean> msetnx(Map<K, V> map) {
}

return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {

for (RedisFuture<Boolean> listRedisFuture : executions.values()) {
Boolean b = MultiNodeExecution.execute(() -> listRedisFuture.get());
if (b != null && b) {
return true;
if (b == null || !b) {
return false;
}
}

return false;
return !executions.isEmpty();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Mono<Long> mget(KeyValueStreamingChannel<K, V> channel, Iterable<K> keys)
public Mono<Boolean> msetnx(Map<K, V> map) {

return pipeliningWithMap(map, kvMap -> RedisAdvancedClusterReactiveCommandsImpl.super.msetnx(kvMap).flux(),
booleanFlux -> booleanFlux).reduce((accu, next) -> accu || next);
booleanFlux -> booleanFlux).reduce((accu, next) -> accu && next);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,23 +149,16 @@ public void msetCrossSlot() throws Exception {
}
}

protected Map<String, String> prepareMset() {
Map<String, String> mset = new HashMap<>();
for (char c = 'a'; c < 'z'; c++) {
String key = new String(new char[] { c, c, c });
mset.put(key, "value-" + key);
}
return mset;
}

@Test
public void msetnxCrossSlot() throws Exception {

Map<String, String> mset = prepareMset();

RedisFuture<Boolean> result = commands.msetnx(mset);

assertThat(result.get()).isTrue();
String key = mset.keySet().iterator().next();
Map<String, String> submap = Collections.singletonMap(key, mset.get(key));

assertThat(commands.msetnx(submap).get()).isTrue();
assertThat(commands.msetnx(mset).get()).isFalse();

for (String mykey : mset.keySet()) {
String s1 = commands.get(mykey).get();
Expand Down Expand Up @@ -585,5 +578,14 @@ private void writeKeysToTwoNodes() {
syncCommands.set(KEY_ON_NODE_1, value);
syncCommands.set(KEY_ON_NODE_2, value);
}

protected Map<String, String> prepareMset() {
Map<String, String> mset = new HashMap<>();
for (char c = 'a'; c < 'z'; c++) {
String key = new String(new char[] { c, c, c });
mset.put(key, "value-" + key);
}
return mset;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -77,11 +74,17 @@ public void msetCrossSlot() throws Exception {
@Test
public void msetnxCrossSlot() throws Exception {

assertThat(block(commands.msetnx(RandomKeys.MAP))).isTrue();
Map<String, String> mset = prepareMset();

for (String mykey : RandomKeys.KEYS) {
String key = mset.keySet().iterator().next();
Map<String, String> submap = Collections.singletonMap(key, mset.get(key));

assertThat(block(commands.msetnx(submap))).isTrue();
assertThat(block(commands.msetnx(mset))).isFalse();

for (String mykey : mset.keySet()) {
String s1 = syncCommands.get(mykey);
assertThat(s1).isEqualTo(RandomKeys.MAP.get(mykey));
assertThat(s1).isEqualTo(mset.get(mykey));
}
}

Expand Down Expand Up @@ -365,4 +368,13 @@ private void writeKeysToTwoNodes() {
syncCommands.set(KEY_ON_NODE_1, value);
syncCommands.set(KEY_ON_NODE_2, value);
}

protected Map<String, String> prepareMset() {
Map<String, String> mset = new HashMap<>();
for (char c = 'a'; c < 'z'; c++) {
String key = new String(new char[] { c, c, c });
mset.put(key, "value-" + key);
}
return mset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public void msetnx() throws Exception {
Map<String, String> map = new LinkedHashMap<>();
map.put("one", "1");
map.put("two", "2");
assertThat(redis.msetnx(map)).isTrue();
assertThat(redis.msetnx(map)).isFalse();
redis.del("one");
redis.del("two"); // probably set on a different node
assertThat(redis.msetnx(map)).isTrue();
assertThat(redis.get("two")).isEqualTo("2");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public void msetnx() throws Exception {
Map<String, String> map = new LinkedHashMap<>();
map.put("one", "1");
map.put("two", "2");
assertThat(redis.msetnx(map)).isTrue();
assertThat(redis.msetnx(map)).isFalse();
redis.del("one");
redis.del("two"); // probably set on a different node
assertThat(redis.msetnx(map)).isTrue();
assertThat(redis.get("two")).isEqualTo("2");
}
Expand Down

0 comments on commit be21fb5

Please sign in to comment.