From 0dad492e7c694e6502dc69b895e5cb015abbddf0 Mon Sep 17 00:00:00 2001 From: Michael Musgrove Date: Mon, 17 Apr 2023 17:11:03 +0100 Subject: [PATCH] JBTM-3762 cluster support --- .../arjuna/objectstore/slot/CloudId.java | 6 +-- .../arjuna/objectstore/slot/RedisSlots.java | 42 ++++++++++++++----- .../ts/arjuna/objectstore/RedisStoreTest.java | 35 ++++++++++++---- .../resources/redis-jbossts-properties.xml | 12 ++++++ 4 files changed, 74 insertions(+), 21 deletions(-) create mode 100644 rts/lra/coordinator/src/test/resources/redis-jbossts-properties.xml diff --git a/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/CloudId.java b/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/CloudId.java index 7b72565e41..882b1b48c6 100644 --- a/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/CloudId.java +++ b/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/CloudId.java @@ -21,11 +21,11 @@ public CloudId(String nodeId, String failoverGroupId) { } public CloudId(String nodeId, String failoverGroupId, String description) { - this.nodeId = nodeId; this.failoverGroupId = failoverGroupId; - this.id = String.format("%s:%s", nodeId, failoverGroupId); + this.nodeId = nodeId; + this.id = String.format("%s:%s", failoverGroupId, nodeId); this.description = description; - this.keyPattern = String.format("%s:*", id); + this.keyPattern = String.format("{%s}:%s:*", failoverGroupId, nodeId); // matches all keys if this failover group } public String allKeysPattern() { diff --git a/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/RedisSlots.java b/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/RedisSlots.java index 6c5e4c65fe..a81bcfc8b2 100644 --- a/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/RedisSlots.java +++ b/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/objectstore/slot/RedisSlots.java @@ -14,7 +14,7 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; -import redis.clients.jedis.Transaction; +import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.params.ScanParams; import redis.clients.jedis.resps.ScanResult; @@ -24,6 +24,8 @@ import java.util.List; import java.util.Set; +import static redis.clients.jedis.params.ScanParams.SCAN_POINTER_START; + /** * Redis backed implementation of the SlotStore backend. * Ensure that your Redis installation is configured for @@ -86,7 +88,13 @@ private Set loadClustered() { for (ConnectionPool node : jedisCluster.getClusterNodes().values()) { try (Jedis j = new Jedis(node.getResource())) { - keys.addAll(j.keys(cloudId.keyPattern)); + // load keys matching this recovery manager +// String pattern = String.format("{%s}:%s:*", cloudId.failoverGroupId, cloudId.nodeId); + Set candidates = j.keys(cloudId.allKeysPattern()); //cloudId.keyPattern); + // filter out candidates that don't match this managers node id +//"{0}:migration-node:6" +// Collection actuals = candidates.stream().filter(s -> s.matches(pattern)).collect(Collectors.toList()); + keys.addAll(candidates); } } @@ -114,9 +122,12 @@ private void load(Set keys) { // initialise the remaining slots while (i < slots.length) { - // prefix the slot key with the cloudId and force keys for nodeId into the same hash slot + // prefix the slot key with the cloudId and force keys for nodeId + failoverId into the same hash slot // (using the curly brace notation) so that they will be stored on the same redis node - slots[i] = String.format("{%s}:%d", cloudId.id, i).getBytes(StandardCharsets.UTF_8); + // In this way we can perform multikey operations on a slot + // see https://redis.io/docs/reference/cluster-spec/ section "Key distribution model" for more info +// slots[i] = String.format("{%s}:%d", cloudId.id, i).getBytes(StandardCharsets.UTF_8); + slots[i] = String.format("{%s}:%s:%d", cloudId.failoverGroupId, cloudId.nodeId, i).getBytes(StandardCharsets.UTF_8); i += 1; } } @@ -163,7 +174,11 @@ public void write(int slot, byte[] data, boolean sync) throws IOException { public byte[] read(int slot) throws IOException { if (clustered) { try (JedisCluster jedis = new JedisCluster(hostAndPort)) { - return jedis.get(slots[slot]); + try { + return jedis.get(slots[slot]); + } catch (Exception e) { + throw new IOException(e); + } } } else { try (Jedis jedis = jedisPool.getResource()) { @@ -207,13 +222,18 @@ public boolean migrate(CloudId from, CloudId to) { } String keyPattern = from.allKeysPattern(); - int prefixLength = from.nodeId.length(); try (JedisCluster jedis = new JedisCluster(hostAndPort)) { for (String key : getKeys(keyPattern)) { - String newKey = to.nodeId + key.substring(prefixLength); - - jedis.rename(key, newKey); + String newKey = key.replace(from.nodeId, to.nodeId); + + try { + String res = jedis.rename(key, newKey); + System.out.printf("%s%n", res); + } catch (JedisException e) { + System.out.printf("%s%n", e.getMessage()); + return false; + } } } @@ -222,14 +242,14 @@ public boolean migrate(CloudId from, CloudId to) { private void getKeys(Jedis node, String keyPattern, Set keySet) { ScanParams scanParams = new ScanParams().count(100).match(keyPattern); - String cursor = ScanParams.SCAN_POINTER_START; + String cursor = SCAN_POINTER_START; do { ScanResult scanResult = node.scan(cursor, scanParams); List keys = scanResult.getResult(); keySet.addAll(keys); cursor = scanResult.getCursor(); - } while (!cursor.equals(ScanParams.SCAN_POINTER_START)); + } while (!cursor.equals(SCAN_POINTER_START)); } private Set getKeys(String keyPattern) { diff --git a/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/objectstore/RedisStoreTest.java b/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/objectstore/RedisStoreTest.java index 2f9e700118..013d3879ee 100644 --- a/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/objectstore/RedisStoreTest.java +++ b/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/objectstore/RedisStoreTest.java @@ -52,7 +52,6 @@ public class RedisStoreTest { private static RedisSlots redisSlots; private static RecoveryStore recoveryStore; private static boolean clustered; - private static Boolean redisAvailable = null; private static boolean isRedisRunning() { @@ -60,13 +59,33 @@ private static boolean isRedisRunning() { return redisAvailable; } - String uri = redisConfig.getRedisURI(); + if (clustered) { + try (JedisCluster jedisCluster = new JedisCluster(hostAndPort)) { + for (ConnectionPool node : jedisCluster.getClusterNodes().values()) { + try (Jedis ignore = new Jedis(node.getResource())) { + redisAvailable = true; + break; + } + } + } catch (Exception e) { + redisAvailable = false; + log.warnf("Skipping RedisStoreTests because Redis is not running on the configured endpoint %s:%d", + redisHost, redisPort); + } + } else { + try (JedisPool jedisPool = new JedisPool(redisHost, redisPort)) { + try (Jedis ignore = jedisPool.getResource()) { + redisAvailable = true; + } + } + } - try (Jedis ignored = new Jedis(uri)) { + try (Jedis ignored = new Jedis(redisConfig.getRedisHost(), redisConfig.getRedisPort())) { redisAvailable = true; } catch (Exception e) { redisAvailable = false; - log.warnf("Skipping RedisStoreTests because Redis is not running on the configured endpoint %s", uri); + log.warnf("Skipping RedisStoreTests because Redis is not running on the configured endpoint %s:%d", + redisHost, redisPort); } return redisAvailable; @@ -82,7 +101,9 @@ public static void before() throws CoreEnvironmentBeanException { redisConfig.setBackingSlots(redisSlots); redisConfig.setClustered(true); - clustered = true; + clustered = redisConfig.isClustered(); + redisHost = redisConfig.getRedisHost(); + redisPort = redisConfig.getRedisPort(); BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class). setObjectStoreType(SlotStoreAdaptor.class.getName()); @@ -292,8 +313,8 @@ public void testMove() throws ObjectStoreException, IOException { // only loads keys corresponding to this nodeId, ignoring ones corresponding to toNodeId) RecoveryStore finalRecoveryStore = recoveryStore; Assertions.assertThrows(ObjectStoreException.class, () -> { - InputObjectState buff = finalRecoveryStore.read_committed(uid, typeName); - assertEquals(value, buff.unpackString()); + InputObjectState buff = finalRecoveryStore.read_committed(uid, typeName); // should throw an exception + assertEquals(value, buff.unpackString()); // should not be reached }); keysAfter = delKeys(toNodeId + ":*"); // clean up diff --git a/rts/lra/coordinator/src/test/resources/redis-jbossts-properties.xml b/rts/lra/coordinator/src/test/resources/redis-jbossts-properties.xml new file mode 100644 index 0000000000..0181efd686 --- /dev/null +++ b/rts/lra/coordinator/src/test/resources/redis-jbossts-properties.xml @@ -0,0 +1,12 @@ + + + from-node + io.narayana.lra.coordinator.internal.LRARecoveryModule + + com.arjuna.ats.internal.arjuna.objectstore.slot.SlotStoreAdaptor + com.arjuna.ats.internal.arjuna.objectstore.slot.SlotStoreAdaptor + + com.arjuna.ats.internal.arjuna.objectstore.slot.RedisSlots + + false +