diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF new file mode 100644 index 0000000000..445b9656e5 --- /dev/null +++ b/META-INF/MANIFEST.MF @@ -0,0 +1,7 @@ +Manifest-Version: 1.0 +Ant-Version: Apache Ant 1.7.1 +Created-By: 20.8-b03-424 (Apple Inc.) +Implementation-Title: Voldemort +Implementation-Version: 0.90.1 +Implementation-Vendor: LinkedIn + diff --git a/build.xml b/build.xml index 149472494d..df20f6f28a 100644 --- a/build.xml +++ b/build.xml @@ -5,7 +5,7 @@ - + @@ -76,6 +76,12 @@ + + + + + + @@ -141,6 +147,9 @@ + + + diff --git a/src/java/voldemort/client/ClientConfig.java b/src/java/voldemort/client/ClientConfig.java index 20ca113355..9bda12e4cb 100644 --- a/src/java/voldemort/client/ClientConfig.java +++ b/src/java/voldemort/client/ClientConfig.java @@ -80,6 +80,8 @@ public class ClientConfig { private volatile int maxBootstrapRetries = 2; private volatile String clientContextName = "default"; private volatile long asyncCheckMetadataInterval = 5000; + /* 12 hr refresh internval */ + private volatile long clientRegistryRefreshInterval = 3600 * 1000 * 12; public ClientConfig() {} @@ -721,4 +723,13 @@ public ClientConfig setAsyncCheckMetadataInterval(long asyncCheckMetadataInterva this.asyncCheckMetadataInterval = asyncCheckMetadataInterval; return this; } + + public long getClientRegistryRefreshInterval() { + return this.clientRegistryRefreshInterval; + } + + public ClientConfig setClientRegistryRefreshInterval(long clientRegistryRefrshInterval) { + this.clientRegistryRefreshInterval = clientRegistryRefrshInterval; + return this; + } } diff --git a/src/java/voldemort/client/ClientInfo.java b/src/java/voldemort/client/ClientInfo.java index 08886ad7d0..19958bfa58 100644 --- a/src/java/voldemort/client/ClientInfo.java +++ b/src/java/voldemort/client/ClientInfo.java @@ -23,14 +23,26 @@ public class ClientInfo implements Serializable { private int sequence; private String localHostName; private String deploymentPath; - - public ClientInfo(String storeName, String clientContext, int clientSequence, long bootstrapTime) { + private long updateTime; + private String releaseVersion; + + public ClientInfo(String storeName, + String clientContext, + int clientSequence, + long bootstrapTime, + String version) { this.bootstrapTime = bootstrapTime; this.storeName = storeName; this.context = clientContext; this.sequence = clientSequence; this.localHostName = createHostName(); this.deploymentPath = createDeploymentPath(); + this.updateTime = bootstrapTime; + this.releaseVersion = version; + + if(logger.isDebugEnabled()) { + logger.debug(this.toString()); + } } private String createDeploymentPath() { @@ -103,4 +115,54 @@ public void setLocalHostName(String localHostName) { public String getLocalHostName() { return localHostName; } + + public void setUpdateTime(long updateTime) { + this.updateTime = updateTime; + } + + public long getUpdateTime() { + return this.updateTime; + } + + public void setReleaseVersion(String version) { + this.releaseVersion = version; + } + + public String getReleaseVersion() { + return this.releaseVersion; + } + + @Override + public boolean equals(Object object) { + if(this == object) + return true; + if(object == null) + return false; + if(!object.getClass().equals(ClientInfo.class)) + return false; + ClientInfo clientInfo = (ClientInfo) object; + return (this.bootstrapTime == clientInfo.bootstrapTime) + && (this.context.equals(clientInfo.context)) + && (this.deploymentPath.equals(clientInfo.deploymentPath)) + && (this.localHostName.equals(clientInfo.localHostName)) + && (this.sequence == clientInfo.sequence) + && (this.storeName.equals(clientInfo.storeName)) + && (this.updateTime == clientInfo.updateTime) + && (this.releaseVersion == clientInfo.releaseVersion); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("boostrapTime[").append(bootstrapTime).append("], "); + builder.append("context[").append(context).append("], "); + builder.append("deploymentPath[").append(deploymentPath).append("], "); + builder.append("localHostName[").append(localHostName).append("], "); + builder.append("sequence[").append(sequence).append("], "); + builder.append("storeName[").append(storeName).append("], "); + builder.append("updateTime[").append(updateTime).append("], "); + builder.append("releaseVersion[").append(releaseVersion).append("]"); + return builder.toString(); + } + } diff --git a/src/java/voldemort/client/DefaultStoreClient.java b/src/java/voldemort/client/DefaultStoreClient.java index 4aac50d8ee..d82b614aff 100644 --- a/src/java/voldemort/client/DefaultStoreClient.java +++ b/src/java/voldemort/client/DefaultStoreClient.java @@ -37,6 +37,7 @@ import voldemort.store.Store; import voldemort.store.StoreCapabilityType; import voldemort.utils.JmxUtils; +import voldemort.utils.ManifestFileReader; import voldemort.utils.Utils; import voldemort.versioning.InconsistencyResolver; import voldemort.versioning.InconsistentDataException; @@ -98,7 +99,8 @@ public DefaultStoreClient(String storeName, this.clientInfo = new ClientInfo(storeName, clientContext, clientSequence, - System.currentTimeMillis()); + System.currentTimeMillis(), + ManifestFileReader.getReleaseVersion()); this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo); this.config = config; @@ -120,8 +122,7 @@ public DefaultStoreClient(String storeName, } registerClient(); - logger.info("Voldemort client created: clientContext=" + clientContext + " clientSequence=" - + clientSequence + " clientId=" + clientId.toString()); + logger.info("Voldemort client created: " + clientId.toString() + "\n" + clientInfo); } private void registerClient() { diff --git a/src/java/voldemort/server/SystemStoreConstants.java b/src/java/voldemort/server/SystemStoreConstants.java deleted file mode 100644 index a7c546681c..0000000000 --- a/src/java/voldemort/server/SystemStoreConstants.java +++ /dev/null @@ -1,77 +0,0 @@ -package voldemort.server; - -/** - * The various system stores - */ -public class SystemStoreConstants { - - public static final String NAME_PREFIX = "voldsys$_"; - - public static enum SystemStoreName { - voldsys$_client_registry, - voldsys$_client_store_definition; - } - - public static final String SYSTEM_STORE_SCHEMA = "" - + " " - + " voldsys$_client_registry" - + " zone-routing" - + " proximity-handoff" - + " memory" - + " client" - + " 4" - + " " - + " 2" - + " 2" - + " " - + " 1" - + " 1" - + " " - + " string" - + " " - + " " - + " string" - + " " - + " 7" - + " " - - + " " - + " voldsys$_client_store_definition" - + " zone-routing" - + " proximity-handoff" - + " memory" - + " client" - + " 1" - + " 1" - + " 1" - + " " - + " string" - + " " - + " " - + " string" - + " " - + " 7" - + " " - - + " " - + " voldsys$_metadata_version" - + " local-pref-all-routing" - + " proximity-handoff" - + " memory" - + " client" - + " 1" - + " 1" - + " 1" - + " " - + " string" - + " " - + " " - + " java-serialization" - + " " + " " - - + ""; - - public static boolean isSystemStore(String storeName) { - return (null == storeName ? false : storeName.startsWith(NAME_PREFIX)); - } -} \ No newline at end of file diff --git a/src/java/voldemort/store/system/SystemStoreConstants.java b/src/java/voldemort/store/system/SystemStoreConstants.java index 32a1d2e5be..c818ed51fd 100644 --- a/src/java/voldemort/store/system/SystemStoreConstants.java +++ b/src/java/voldemort/store/system/SystemStoreConstants.java @@ -4,6 +4,7 @@ import java.util.List; import voldemort.store.StoreDefinition; +import voldemort.utils.RebalanceUtils; import voldemort.xml.StoreDefinitionsMapper; /** @@ -22,7 +23,7 @@ public static enum SystemStoreName { public static final String SYSTEM_STORE_SCHEMA = "" + " " + " voldsys$_client_registry" - + " zone-routing" + + " all-routing" + " proximity-handoff" + " memory" + " client" @@ -92,13 +93,7 @@ public static List getAllSystemStoreDefs() { } public static StoreDefinition getSystemStoreDef(String name) { - StoreDefinition storeDef = null; List allDefs = getAllSystemStoreDefs(); - for(StoreDefinition def: allDefs) { - if(name.equals(def.getName())) { - storeDef = def; - } - } - return storeDef; + return RebalanceUtils.getStoreDefinitionWithName(allDefs, name); } } \ No newline at end of file diff --git a/src/java/voldemort/utils/ManifestFileReader.java b/src/java/voldemort/utils/ManifestFileReader.java new file mode 100644 index 0000000000..a3287a3681 --- /dev/null +++ b/src/java/voldemort/utils/ManifestFileReader.java @@ -0,0 +1,27 @@ +package voldemort.utils; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +import org.apache.log4j.Logger; + +public class ManifestFileReader { + + protected static final Logger logger = Logger.getLogger(ManifestFileReader.class); + + private static String MANIFEST_FILE = "META-INF/MANIFEST.MF"; + private static String RELEASE_VERSION_KEY = "Implementation-Version"; + + public static String getReleaseVersion() { + String version = null; + Properties properties = new Properties(); + try { + properties.load(new FileInputStream(MANIFEST_FILE)); + version = properties.getProperty(RELEASE_VERSION_KEY); + } catch(IOException e) { + logger.warn("Unable to load voldemort release version due to the following error:", e); + } + return version; + } +} diff --git a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java index dbfb2a8df8..26cd4d6b44 100644 --- a/src/java/voldemort/utils/RebalanceUtils.java +++ b/src/java/voldemort/utils/RebalanceUtils.java @@ -50,6 +50,7 @@ import voldemort.cluster.Node; import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; +import voldemort.routing.RoutingStrategyType; import voldemort.server.VoldemortConfig; import voldemort.server.rebalance.VoldemortRebalancingException; import voldemort.store.StoreDefinition; @@ -486,11 +487,21 @@ public static boolean checkKeyBelongsToPartition(int nodeId, HashMap> replicaToPartitionList, Cluster cluster, StoreDefinition storeDef) { - List keyPartitions = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, - cluster) - .getPartitionList(key); - List nodePartitions = cluster.getNodeById(nodeId).getPartitionIds(); - return checkKeyBelongsToPartition(keyPartitions, nodePartitions, replicaToPartitionList); + boolean checkResult = false; + if(storeDef.getRoutingStrategyType().equals(RoutingStrategyType.TO_ALL_STRATEGY) + || storeDef.getRoutingStrategyType() + .equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY)) { + checkResult = true; + } else { + List keyPartitions = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, + cluster) + .getPartitionList(key); + List nodePartitions = cluster.getNodeById(nodeId).getPartitionIds(); + checkResult = checkKeyBelongsToPartition(keyPartitions, + nodePartitions, + replicaToPartitionList); + } + return checkResult; } /** @@ -1395,6 +1406,7 @@ public static StoreDefinition getStoreDefinitionWithName(List s for(StoreDefinition storeDef: storeDefs) { if(storeDef.getName().compareTo(storeName) == 0) { def = storeDef; + break; } } diff --git a/test/common/voldemort/config/stores.xml b/test/common/voldemort/config/stores.xml index f25f33efa1..116726bfd4 100644 --- a/test/common/voldemort/config/stores.xml +++ b/test/common/voldemort/config/stores.xml @@ -185,4 +185,34 @@ all-handoff 10 + + test-store-eventual-1 + bdb + client + 2 + 1 + 1 + + string + UTF-8 + + + java-serialization + + + + test-store-eventual-2 + bdb + client + 2 + 1 + 1 + + string + UTF-8 + + + java-serialization + + diff --git a/test/unit/voldemort/client/ClientRegistryTest.java b/test/unit/voldemort/client/ClientRegistryTest.java new file mode 100644 index 0000000000..ccb254a29c --- /dev/null +++ b/test/unit/voldemort/client/ClientRegistryTest.java @@ -0,0 +1,699 @@ +package voldemort.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import junit.framework.Assert; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import voldemort.ServerTestUtils; +import voldemort.TestUtils; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.cluster.Cluster; +import voldemort.serialization.DefaultSerializerFactory; +import voldemort.serialization.Serializer; +import voldemort.serialization.SerializerFactory; +import voldemort.server.VoldemortServer; +import voldemort.store.socket.SocketStoreFactory; +import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; +import voldemort.store.system.SystemStoreConstants; +import voldemort.utils.ByteArray; +import voldemort.utils.Pair; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; + +@SuppressWarnings( { "unchecked" }) +public class ClientRegistryTest { + + public static final String SERVER_LOCAL_URL = "tcp://localhost:"; + public static final String TEST_STORE_NAME = "test-store-eventual-1"; + public static final String TEST_STORE_NAME2 = "test-store-eventual-2"; + public static final String STORES_XML_FILE = "test/common/voldemort/config/stores.xml"; + public static final String CLIENT_CONTEXT_NAME = "testClientRegistryHappyPath"; + public static final String CLIENT_CONTEXT_NAME2 = "testClientRegistryUnhappyPath"; + public static final long CLIENT_REGISTRY_REFRSH_INTERVAL = 1000; + public static final int TOTAL_SERVERS = 2; + + private SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(TOTAL_SERVERS, + 10000, + 100000, + 32 * 1024); + private static VoldemortServer[] servers = null; + private static int[] serverPorts = null; + private Cluster cluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3 }, + { 4, 5, 6, 7 } }); + private static AdminClient adminClient; + + private SerializerFactory serializerFactory = new DefaultSerializerFactory(); + private Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(SystemStoreConstants.getSystemStoreDef(SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name()) + .getValueSerializer()); + private long startTime; + + @Before + public void setUp() throws Exception { + + if(null == servers) { + servers = new VoldemortServer[TOTAL_SERVERS]; + serverPorts = new int[TOTAL_SERVERS]; + + for(int i = 0; i < TOTAL_SERVERS; i++) { + servers[i] = ServerTestUtils.startVoldemortServer(socketStoreFactory, + ServerTestUtils.createServerConfig(true, + i, + TestUtils.createTempDir() + .getAbsolutePath(), + null, + STORES_XML_FILE, + new Properties()), + cluster); + serverPorts[i] = servers[i].getIdentityNode().getSocketPort(); + } + adminClient = ServerTestUtils.getAdminClient(cluster); + } + + startTime = System.currentTimeMillis(); + } + + @After + public void tearDown() throws Exception { + this.clearRegistryContent(); + } + + @Test + public void testHappyPath() { + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[0]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); + StoreClient client = socketFactory.getStoreClient(TEST_STORE_NAME); + client.put("k", "v"); + Iterator>> it = adminClient.fetchEntries(0, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + assertEquals(TEST_STORE_NAME, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + assertEquals(1, infoList.size()); + + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertEquals(TEST_STORE_NAME, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + assertEquals(1, infoList.size()); + + try { + Thread.sleep(CLIENT_REGISTRY_REFRSH_INTERVAL * 2); + } catch(InterruptedException e) {} + // now the periodical update has gone through, it shall be higher than + // the bootstrap time + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertTrue("Client registry not updated.", + infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime()); + } + + @Test + public void testTwoClients() { + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[0]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); + StoreClient client1 = socketFactory.getStoreClient(TEST_STORE_NAME); + StoreClient client2 = socketFactory.getStoreClient(TEST_STORE_NAME); + + client1.put("k1", "v1"); + client2.put("k2", "v2"); + + Iterator>> it = adminClient.fetchEntries(0, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + assertEquals(TEST_STORE_NAME, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertTrue("Client registry sequence number incorrect", 1 >= infoList.get(0) + .getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertTrue("Client registry sequence number incorrect", 1 >= infoList.get(1) + .getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + assertEquals(infoList.size(), 2); + + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertEquals(TEST_STORE_NAME, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertTrue("Client registry sequence number incorrect", 1 >= infoList.get(0) + .getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertTrue("Client registry sequence number incorrect", 1 >= infoList.get(1) + .getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + assertEquals(infoList.size(), 2); + + try { + Thread.sleep(CLIENT_REGISTRY_REFRSH_INTERVAL * 2); + } catch(InterruptedException e) {} + // now the periodical update has gone through, it shall be higher than + // the bootstrap time + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertTrue("Client registry not updated.", + infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime()); + assertTrue("Client registry not updated.", + infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); + } + + @Test + public void testTwoStores() { + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[0]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); + StoreClient client1 = socketFactory.getStoreClient(TEST_STORE_NAME); + StoreClient client2 = socketFactory.getStoreClient(TEST_STORE_NAME2); + + client1.put("k1", "v1"); + client2.put("k2", "v2"); + + Iterator>> it = adminClient.fetchEntries(0, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + if(infoList.get(0).getStoreName().equals(TEST_STORE_NAME)) { + assertEquals(0, infoList.get(0).getClientSequence()); + assertEquals(TEST_STORE_NAME2, infoList.get(1).getStoreName()); + assertEquals(1, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + infoList.get(1).getBootstrapTime() >= infoList.get(0).getBootstrapTime()); + } else { + assertEquals(TEST_STORE_NAME2, infoList.get(0).getStoreName()); + assertEquals(1, infoList.get(0).getClientSequence()); + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + infoList.get(0).getBootstrapTime() >= infoList.get(1).getBootstrapTime()); + } + + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + if(infoList.get(0).getStoreName().equals(TEST_STORE_NAME)) { + assertEquals(0, infoList.get(0).getClientSequence()); + assertEquals(TEST_STORE_NAME2, infoList.get(1).getStoreName()); + assertEquals(1, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + infoList.get(1).getBootstrapTime() >= infoList.get(0).getBootstrapTime()); + } else { + assertEquals(TEST_STORE_NAME2, infoList.get(0).getStoreName()); + assertEquals(1, infoList.get(0).getClientSequence()); + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + infoList.get(0).getBootstrapTime() >= infoList.get(1).getBootstrapTime()); + } + + try { + Thread.sleep(CLIENT_REGISTRY_REFRSH_INTERVAL * 2); + } catch(InterruptedException e) {} + // now the periodical update has gone through, it shall be higher than + // the bootstrap time + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertTrue("Client registry not updated.", + infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime()); + assertTrue("Client registry not updated.", + infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); + } + + @Test + public void testTwoFactories() { + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[0]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); + + ClientConfig clientConfig2 = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[0]) + .setClientContextName(CLIENT_CONTEXT_NAME2); + SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); + + StoreClient client1 = socketFactory1.getStoreClient(TEST_STORE_NAME); + StoreClient client2 = socketFactory2.getStoreClient(TEST_STORE_NAME2); + + client1.put("k1", "v1"); + client2.put("k2", "v2"); + + Iterator>> it = adminClient.fetchEntries(0, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + if(infoList.get(0).getStoreName().equals(TEST_STORE_NAME)) { + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME2, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(1).getBootstrapTime() >= infoList.get(0).getBootstrapTime()); + + } else { + assertEquals(TEST_STORE_NAME2, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(0).getBootstrapTime() >= infoList.get(1).getBootstrapTime()); + } + + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + if(infoList.get(0).getStoreName().equals(TEST_STORE_NAME)) { + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME2, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(1).getBootstrapTime() >= infoList.get(0).getBootstrapTime()); + + } else { + assertEquals(TEST_STORE_NAME2, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(0).getBootstrapTime() >= infoList.get(1).getBootstrapTime()); + } + + try { + Thread.sleep(CLIENT_REGISTRY_REFRSH_INTERVAL * 2); + } catch(InterruptedException e) {} + // now the periodical update has gone through, it shall be higher than + // the bootstrap time + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertTrue("Client registry not updated.", + infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime()); + assertTrue("Client registry not updated.", + infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); + } + + @Test + public void testOneServerFailre() { + // bring down one server before starting up the clients + servers[0].stop(); + + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[1]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); + + ClientConfig clientConfig2 = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[1]) + .setClientContextName(CLIENT_CONTEXT_NAME2); + SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); + + StoreClient client1 = socketFactory1.getStoreClient(TEST_STORE_NAME); + StoreClient client2 = socketFactory2.getStoreClient(TEST_STORE_NAME2); + + client1.put("k1", "v1"); + client2.put("k2", "v2"); + + Iterator>> it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + if(infoList.get(0).getStoreName().equals(TEST_STORE_NAME)) { + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME2, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(1).getBootstrapTime() >= infoList.get(0).getBootstrapTime()); + + } else { + assertEquals(TEST_STORE_NAME2, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(0).getBootstrapTime() >= infoList.get(1).getBootstrapTime()); + } + + try { + Thread.sleep(CLIENT_REGISTRY_REFRSH_INTERVAL * 2); + } catch(InterruptedException e) {} + // now the periodical update has gone through, it shall be higher than + // the bootstrap time + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + assertTrue("Client registry not updated.", + infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime()); + assertTrue("Client registry not updated.", + infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); + } + + @Test + public void testRepeatRegistrationSameFactory() { + + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[1]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); + + ClientConfig clientConfig2 = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[1]) + .setClientContextName(CLIENT_CONTEXT_NAME2); + SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); + + for(int i = 0; i < 3; i++) { + + StoreClient client1 = socketFactory1.getStoreClient(TEST_STORE_NAME); + StoreClient client2 = socketFactory2.getStoreClient(TEST_STORE_NAME2); + + client1.put("k1", "v1"); + client2.put("k2", "v2"); + } + + Iterator>> it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + assertEquals("Incrrect # of entries created in client registry", 6, infoList.size()); + } + + @Test + public void testRepeatRegistrationDifferentFactories() { + long client1LastBootstrapTime = 0; + long client2LastBootstrapTime = 0; + for(int i = 0; i < 3; i++) { + + List emptyPartitionList = Lists.newArrayList(); + ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[1]) + .setClientContextName(CLIENT_CONTEXT_NAME) + .setClientRegistryRefreshInterval(CLIENT_REGISTRY_REFRSH_INTERVAL); + SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); + + ClientConfig clientConfig2 = new ClientConfig().setMaxThreads(4) + .setMaxTotalConnections(4) + .setMaxConnectionsPerNode(4) + .setBootstrapUrls(SERVER_LOCAL_URL + + serverPorts[1]) + .setClientContextName(CLIENT_CONTEXT_NAME2); + SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); + + StoreClient client1 = socketFactory1.getStoreClient(TEST_STORE_NAME); + StoreClient client2 = socketFactory2.getStoreClient(TEST_STORE_NAME2); + + client1.put("k1", "v1"); + client2.put("k2", "v2"); + + Iterator>> it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + ArrayList infoList = getClientRegistryContent(it); + + assertEquals("Incrrect # of entries created in client registry", 2, infoList.size()); + + Assert.assertNotNull("Client version is null", infoList.get(0).getReleaseVersion()); + Assert.assertNotNull("Client version is null", infoList.get(1).getReleaseVersion()); + + if(infoList.get(0).getStoreName().equals(TEST_STORE_NAME)) { + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME2, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(1).getBootstrapTime() >= infoList.get(0).getBootstrapTime()); + + } else { + assertEquals(TEST_STORE_NAME2, infoList.get(0).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME2, infoList.get(0).getContext()); + assertEquals(0, infoList.get(0).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(0).getBootstrapTime()); + + assertEquals(TEST_STORE_NAME, infoList.get(1).getStoreName()); + assertEquals(CLIENT_CONTEXT_NAME, infoList.get(1).getContext()); + assertEquals(0, infoList.get(1).getClientSequence()); + assertTrue("Client registry bootstrap time incorrect", + startTime <= infoList.get(1).getBootstrapTime()); + + assertTrue("Client registry bootstrap time incorrect", + infoList.get(0).getBootstrapTime() >= infoList.get(1).getBootstrapTime()); + } + + try { + Thread.sleep(CLIENT_REGISTRY_REFRSH_INTERVAL * 2); + } catch(InterruptedException e) {} + // now the periodical update has gone through, it shall be higher + // than + // the bootstrap time + it = adminClient.fetchEntries(1, + SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name(), + emptyPartitionList, + null, + false); + infoList = getClientRegistryContent(it); + /* + * assertTrue("Client registry not updated.", + * infoList.get(0).getBootstrapTime() < + * infoList.get(0).getUpdateTime()); + * assertTrue("Client registry not updated.", + * infoList.get(1).getBootstrapTime() < + * infoList.get(1).getUpdateTime()); + */ + assertTrue("Bootstrap time does not increase client bounces", + infoList.get(0).getBootstrapTime() > client1LastBootstrapTime); + assertTrue("Bootstrap time does not increase client bounces", + infoList.get(1).getBootstrapTime() > client2LastBootstrapTime); + + client1LastBootstrapTime = infoList.get(0).getBootstrapTime(); + client2LastBootstrapTime = infoList.get(0).getBootstrapTime(); + } + } + + private ArrayList getClientRegistryContent(Iterator>> it) { + ArrayList infoList = Lists.newArrayList(); + while(it.hasNext()) { + infoList.add((ClientInfo) valueSerializer.toObject(it.next().getSecond().getValue())); + } + return infoList; + } + + private void clearRegistryContent() { + for(int i = 0; i < TOTAL_SERVERS; i++) { + servers[i].getStoreRepository() + .getStorageEngine(SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name()) + .truncate(); + } + } +}