diff --git a/src/java/voldemort/VoldemortAdminTool.java b/src/java/voldemort/VoldemortAdminTool.java index 4f4a249c2d..13dc8daca0 100644 --- a/src/java/voldemort/VoldemortAdminTool.java +++ b/src/java/voldemort/VoldemortAdminTool.java @@ -1363,7 +1363,7 @@ private static void executeUpdateEntries(Integer nodeId, for(String storeName: storeNames) { Iterator>> iterator = readEntriesBinary(inputDir, storeName); - adminClient.storeOps.updateEntries(nodeId, storeName, iterator, null); + adminClient.streamingOps.updateEntries(nodeId, storeName, iterator, null); } } @@ -1623,7 +1623,7 @@ private static void executeQueryKeys(final Integer nodeId, listKeys.add(new ByteArray(serializer.toBytes(key))); } for(final String storeName: storeNames) { - final Iterator iterator = adminClient.storeOps.queryKeys(nodeId.intValue(), + final Iterator iterator = adminClient.streamingOps.queryKeys(nodeId.intValue(), storeName, listKeys.iterator()); List storeDefinitionList = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId) diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 4609c57e49..f19b5a448d 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -158,7 +158,8 @@ public class AdminClient { final public AdminClient.StoreManagementOperations storeMgmtOps; final public AdminClient.StoreMaintenanceOperations storeMntOps; final public AdminClient.BulkStreamingFetchOperations bulkFetchOps; - final public AdminClient.StreamingStoreOperations storeOps; + final public AdminClient.StreamingOperations streamingOps; + final public AdminClient.StoreOperations storeOps; final public AdminClient.RestoreOperations restoreOps; final public AdminClient.RebalancingOperations rebalanceOps; final public AdminClient.ReadOnlySpecificOperations readonlyOps; @@ -175,7 +176,8 @@ private AdminClient(AdminClientConfig adminClientConfig) { this.storeMgmtOps = this.new StoreManagementOperations(); this.storeMntOps = this.new StoreMaintenanceOperations(); this.bulkFetchOps = this.new BulkStreamingFetchOperations(); - this.storeOps = this.new StreamingStoreOperations(); + this.streamingOps = this.new StreamingOperations(); + this.storeOps = this.new StoreOperations(); this.restoreOps = this.new RestoreOperations(); this.rebalanceOps = this.new RebalancingOperations(); this.readonlyOps = this.new ReadOnlySpecificOperations(); @@ -1913,8 +1915,8 @@ private class NodeStore { private final ConcurrentMap nodeStoreSocketCache; - // TODO: Pass in a ClientConfig or a AdminClientConfig? AdminStoreClient() { + // TODO: Pass in a ClientConfig or a AdminClientConfig? this.clientConfig = new ClientConfig(); clientPool = new ClientRequestExecutorPool(clientConfig.getSelectors(), clientConfig.getMaxConnectionsPerNode(), @@ -1956,15 +1958,47 @@ public void stop() { } } - // TODO: Rename StreamingStoreOperations to StoreOperations? Or pull out the - // query/update/repair stuff that operates on individual keys into new inner - // class StoreOperations. + public class StoreOperations { + + /** + * This method updates exactly one key/value for a specific store on a + * specific node. + * + * @param storeName Name of the store + * @param nodeKeyValue A specific key/value to update on a specific + * node. + * @return RepairEntryResult with success/exception details. + */ + public void putNodeKeyValue(String storeName, NodeValue nodeKeyValue) { + SocketStore socketStore = adminStoreClient.getSocketStore(nodeKeyValue.getNodeId(), + storeName); + + socketStore.put(nodeKeyValue.getKey(), nodeKeyValue.getVersioned(), null); + } + + /** + * Fetch key/value tuple for given key for a specific store on specified + * node. + * + * @param storeName Name of the store + * @param nodeId Id of the node to query from + * @param key for which to query + * @return List> of values for the specified NodeKey. + */ + public List> getNodeKey(String storeName, int nodeId, ByteArray key) { + SocketStore socketStore = adminStoreClient.getSocketStore(nodeId, storeName); + return socketStore.get(key, null); + } + + // As needed, add 'getall', 'delete', and so on interfaces... + } + /** * Encapsulates all steaming operations that actually read and write * key-value pairs into the cluster * */ - public class StreamingStoreOperations { + public class StreamingOperations { /** * Update a stream of key/value entries at the given node. The iterator @@ -2054,50 +2088,8 @@ public void updateEntries(int nodeId, } } - /** - * This method updates exactly one key/value for a specific store on a - * specific node. - * - * @param storeName Name of the store - * @param nodeKeyValue A specific key/value to update on a specific - * node. - * @return RepairEntryResult with success/exception details. - */ - public RepairEntryResult repairEntry(String storeName, - NodeValue nodeKeyValue) { - SocketStore socketStore = adminStoreClient.getSocketStore(nodeKeyValue.getNodeId(), - storeName); - - try { - socketStore.put(nodeKeyValue.getKey(), nodeKeyValue.getVersioned(), null); - return new RepairEntryResult(); - } catch(VoldemortException ve) { - return new RepairEntryResult(ve); - } - } - - /** - * Fetch key/value tuple for given key on specified node - * - * @param storeName Name of the store - * @param nodeId Id of the node to query from - * @param key for which to query - * @return QueryKeyResult with key & value or key & exception, depending - * on result. - */ - public QueryKeyResult queryKey(String storeName, int nodeId, ByteArray key) { - SocketStore socketStore = adminStoreClient.getSocketStore(nodeId, storeName); - - List> value = null; - try { - value = socketStore.get(key, null); - return new QueryKeyResult(key, value); - } catch(VoldemortException ve) { - return new QueryKeyResult(key, ve); - } - } - - // TODO: Use queryKey method (and so adminStoreClient too)? + // TODO: Use storeOperation.getNodeKey()? Or some other way of using + // adminStoreClient? /** * Fetch key/value tuples belonging to a node with given key values * @@ -2760,10 +2752,10 @@ public void run() { partitionIdList, null, false); - currentAdminClient.storeOps.updateEntries(nodeId, - storeName, - iterator, - null); + currentAdminClient.streamingOps.updateEntries(nodeId, + storeName, + iterator, + null); logger.info("Mirroring data for store:" + storeName + " from node " + nodeIdToMirrorFrom + " completed."); diff --git a/src/java/voldemort/client/protocol/admin/QueryKeyResult.java b/src/java/voldemort/client/protocol/admin/QueryKeyResult.java index b70c35f286..dfca2dad3a 100644 --- a/src/java/voldemort/client/protocol/admin/QueryKeyResult.java +++ b/src/java/voldemort/client/protocol/admin/QueryKeyResult.java @@ -15,13 +15,13 @@ public class QueryKeyResult { private final List> values; private final Exception exception; - QueryKeyResult(ByteArray key, List> values) { + public QueryKeyResult(ByteArray key, List> values) { this.key = key; this.values = values; this.exception = null; } - QueryKeyResult(ByteArray key, Exception exception) { + public QueryKeyResult(ByteArray key, Exception exception) { this.key = key; this.values = null; this.exception = exception; diff --git a/src/java/voldemort/client/protocol/admin/RepairEntryResult.java b/src/java/voldemort/client/protocol/admin/RepairEntryResult.java deleted file mode 100644 index f783a88d23..0000000000 --- a/src/java/voldemort/client/protocol/admin/RepairEntryResult.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2013 LinkedIn, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package voldemort.client.protocol.admin; - -import voldemort.VoldemortException; -import voldemort.versioning.ObsoleteVersionException; - -public class RepairEntryResult { - - private final VoldemortException voldemortException; - private final boolean success; - - RepairEntryResult() { - this.voldemortException = null; - success = true; - } - - RepairEntryResult(VoldemortException e) { - if(e instanceof ObsoleteVersionException) { - this.voldemortException = null; - this.success = true; - } else { - this.voldemortException = e; - this.success = false; - } - } - - public boolean isSuccess() { - return success; - } - - public boolean hasException() { - return (voldemortException != null); - } - - public VoldemortException getException() { - return voldemortException; - } - -} \ No newline at end of file diff --git a/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java b/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java index b5d9cfd67a..312a1f83c8 100644 --- a/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java +++ b/src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java @@ -44,7 +44,7 @@ public void run() throws VoldemortException { while(!nodeIterator.done) { try { nodeIterator.reset(); - adminClient.storeOps.updateEntries(nodeId, storeName, nodeIterator, null); + adminClient.streamingOps.updateEntries(nodeId, storeName, nodeIterator, null); nodeIterator.purge(); } catch(VoldemortException e) { if(e.getCause() instanceof IOException) { diff --git a/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java b/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java index f344688fa3..68e04b3444 100644 --- a/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java +++ b/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java @@ -422,7 +422,7 @@ public void run() { } this.startTime = System.currentTimeMillis(); iterator = new SlopIterator(slopQueue, current); - adminClient.storeOps.updateSlopEntries(nodeId, iterator); + adminClient.streamingOps.updateSlopEntries(nodeId, iterator); } while(!iterator.isComplete()); // Clear up both previous and current diff --git a/src/java/voldemort/store/routed/NodeValue.java b/src/java/voldemort/store/routed/NodeValue.java index 01dda95a6a..8b225e3b07 100644 --- a/src/java/voldemort/store/routed/NodeValue.java +++ b/src/java/voldemort/store/routed/NodeValue.java @@ -35,6 +35,8 @@ */ public final class NodeValue implements Serializable, Cloneable { + // TODO: Rename NodeValue to NodeKeyValue + private static final long serialVersionUID = 1; private final int nodeId; diff --git a/src/java/voldemort/utils/ConsistencyFix.java b/src/java/voldemort/utils/ConsistencyFix.java index fe52ec429c..9cbea4143f 100644 --- a/src/java/voldemort/utils/ConsistencyFix.java +++ b/src/java/voldemort/utils/ConsistencyFix.java @@ -36,11 +36,11 @@ import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.client.protocol.admin.QueryKeyResult; -import voldemort.client.protocol.admin.RepairEntryResult; import voldemort.cluster.Cluster; import voldemort.store.StoreDefinition; import voldemort.store.routed.NodeValue; import voldemort.store.routed.ReadRepairer; +import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; @@ -48,45 +48,36 @@ public class ConsistencyFix { - // TODO: Move ConsistencyFixContext into its own file? Or break this apart. - // I.e., explicitly pass adminClient and storeInstance around? - private static class ConsistencyFixContext { + private static AdminClient adminClient = null; + private static StoreInstance storeInstance = null; - private final AdminClient adminClient; - private final StoreInstance storeInstance; + private static void init(Options options) { + System.out.println("Connecting to bootstrap server: " + options.url); + adminClient = new AdminClient(options.url, new AdminClientConfig(), 0); + Cluster cluster = adminClient.getAdminClientCluster(); + System.out.println("Cluster determined to be: " + cluster.getName()); - public ConsistencyFixContext(String url, String storeName) throws Exception { - System.out.println("Connecting to bootstrap server: " + url); - adminClient = new AdminClient(url, new AdminClientConfig(), 0); - Cluster cluster = adminClient.getAdminClientCluster(); - System.out.println("Cluster determined to be: " + cluster.getName()); + System.out.println("Determining store definition for store: " + options.storeName); + Versioned> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0); + List storeDefs = storeDefinitions.getValue(); + StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs, + options.storeName); + System.out.println("Store definition determined."); - System.out.println("Determining store definition for store: " + storeName); - Versioned> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0); - List storeDefs = storeDefinitions.getValue(); - StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs, - storeName); - System.out.println("Store definition determined."); - - storeInstance = new StoreInstance(cluster, storeDefinition); - } - - public AdminClient getAdminClient() { - return adminClient; - } + storeInstance = new StoreInstance(cluster, storeDefinition); + } - public StoreInstance getStoreInstance() { - return storeInstance; - } + private static void stop() { + adminClient.stop(); + } - public String getStoreName() { - return storeInstance.getStoreDefinition().getName(); - } + public static String getStoreName() { + return storeInstance.getStoreDefinition().getName(); + } - public int getMasterPartitionId(String keyInHexFormat) throws DecoderException { - byte[] key = Hex.decodeHex(keyInHexFormat.toCharArray()); - return storeInstance.getMasterPartitionId(key); - } + public static int getMasterPartitionId(String keyInHexFormat) throws DecoderException { + byte[] key = Hex.decodeHex(keyInHexFormat.toCharArray()); + return storeInstance.getMasterPartitionId(key); } public static void printUsage() { @@ -191,6 +182,9 @@ private static ConsistencyFix.Options parseArgs(String[] args) { List valuesOf = (List) optionSet.valuesOf("keys"); options.keysInHexFormat = valuesOf; } + // TODO: Should I do something more iterator like for reading files of + // keys? I suspect that we should not read millions(?) of keys into + // memory before doing any actual work. if(optionSet.has("key-file")) { String keyFile = (String) optionSet.valueOf("key-file"); System.err.println("Key file: " + keyFile); @@ -222,11 +216,10 @@ public static void main(String[] args) throws Exception { Utils.croak("Failure to create BufferedWriter for ouput file '" + options.outFile + "'"); } - // TODO: Need something more iterator like for reading files of keys. - // Don't want to read millions of keys into memory at startup. - ConsistencyFixContext vInstance = new ConsistencyFixContext(options.url, options.storeName); + init(options); + for(String keyInHexFormat: options.keysInHexFormat) { - FixKeyResult fixKeyResult = fixKey(vInstance, keyInHexFormat, options.verbose); + FixKeyResult fixKeyResult = fixKey(keyInHexFormat, options.verbose); if(fixKeyResult == FixKeyResult.SUCCESS) { System.out.println("Successfully processed " + keyInHexFormat); } else { @@ -234,7 +227,7 @@ public static void main(String[] args) throws Exception { } } - vInstance.getAdminClient().stop(); + stop(); } public enum FixKeyResult { @@ -265,8 +258,7 @@ public String toString() { * in a non-null object to be populated by this method. * @return FixKeyResult */ - private static ConsistencyFix.FixKeyResult doRead(final ConsistencyFixContext vInstance, - final List nodeIdList, + private static ConsistencyFix.FixKeyResult doRead(final List nodeIdList, final byte[] keyInBytes, final String keyInHexFormat, boolean verbose, @@ -281,13 +273,17 @@ private static ConsistencyFix.FixKeyResult doRead(final ConsistencyFixContext vI if(verbose) { System.out.println("Reading key-values for specified key: " + keyInHexFormat); } + ByteArray key = new ByteArray(keyInBytes); // TODO: Do this asynchronously so that all requests are outstanding in // parallel. Will need to make nodeIdToKeyValues thread safe. for(int nodeId: nodeIdList) { - QueryKeyResult queryKeyResult = vInstance.getAdminClient().storeOps.queryKey(vInstance.getStoreName(), - nodeId, - new ByteArray(keyInBytes)); - nodeIdToKeyValues.put(nodeId, queryKeyResult); + List> values = null; + try { + values = adminClient.storeOps.getNodeKey(getStoreName(), nodeId, key); + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); + } catch(VoldemortException ve) { + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); + } } return FixKeyResult.SUCCESS; @@ -427,8 +423,7 @@ private static List> resolveReadConflicts(boolean v * non-null object to be populated by this method. * @return */ - private static ConsistencyFix.FixKeyResult doWriteBack(final ConsistencyFixContext vInstance, - boolean verbose, + private static ConsistencyFix.FixKeyResult doWriteBack(boolean verbose, final List> toReadRepair) { if(verbose) { System.out.println("Performing repair work:"); @@ -441,19 +436,16 @@ private static ConsistencyFix.FixKeyResult doWriteBack(final ConsistencyFixConte } // TODO: Do this asynchronously so that all requests are outstanding // in parallel. - RepairEntryResult repairEntryResult = vInstance.getAdminClient().storeOps.repairEntry(vInstance.getStoreName(), - nodeKeyValue); - if(!repairEntryResult.isSuccess()) { - if(verbose) { - if(repairEntryResult.hasException()) { - VoldemortException ve = repairEntryResult.getException(); - System.out.println("\t... Repair of key " + nodeKeyValue.getKey() - + "on node with id " + nodeKeyValue.getNodeId() - + " for version " + nodeKeyValue.getVersion() - + " failed because of exception : " + ve.getMessage()); - } - } + try { + adminClient.storeOps.putNodeKeyValue(getStoreName(), nodeKeyValue); + } catch(ObsoleteVersionException ove) { + // NOOP. Treat OVE as success. + } catch(VoldemortException ve) { allRepairsSuccessful = false; + System.out.println("\t... Repair of key " + nodeKeyValue.getKey() + + "on node with id " + nodeKeyValue.getNodeId() + + " for version " + nodeKeyValue.getVersion() + + " failed because of exception : " + ve.getMessage()); } } if(!allRepairsSuccessful) { @@ -466,9 +458,7 @@ private static ConsistencyFix.FixKeyResult doWriteBack(final ConsistencyFixConte return FixKeyResult.SUCCESS; } - public static ConsistencyFix.FixKeyResult fixKey(ConsistencyFixContext vInstance, - String keyInHexFormat, - boolean verbose) { + public static ConsistencyFix.FixKeyResult fixKey(String keyInHexFormat, boolean verbose) { if(verbose) { System.out.println("Performing consistency fix of key: " + keyInHexFormat); } @@ -479,8 +469,8 @@ public static ConsistencyFix.FixKeyResult fixKey(ConsistencyFixContext vInstance int masterPartitionId = -1; try { keyInBytes = ByteUtils.fromHexString(keyInHexFormat); - masterPartitionId = vInstance.getMasterPartitionId(keyInHexFormat); - nodeIdList = vInstance.getStoreInstance().getReplicationNodeList(masterPartitionId); + masterPartitionId = getMasterPartitionId(keyInHexFormat); + nodeIdList = storeInstance.getReplicationNodeList(masterPartitionId); } catch(Exception exception) { if(verbose) { System.out.println("Aborting fixKey due to bad init."); @@ -492,8 +482,7 @@ public static ConsistencyFix.FixKeyResult fixKey(ConsistencyFixContext vInstance // Read Map nodeIdToKeyValues = new HashMap(); - FixKeyResult fixKeyResult = ConsistencyFix.doRead(vInstance, - nodeIdList, + FixKeyResult fixKeyResult = ConsistencyFix.doRead(nodeIdList, keyInBytes, keyInHexFormat, verbose, @@ -519,7 +508,7 @@ public static ConsistencyFix.FixKeyResult fixKey(ConsistencyFixContext vInstance nodeValues); // Write back (if necessary) - fixKeyResult = ConsistencyFix.doWriteBack(vInstance, verbose, toReadRepair); + fixKeyResult = ConsistencyFix.doWriteBack(verbose, toReadRepair); if(fixKeyResult != FixKeyResult.SUCCESS) { return fixKeyResult; } diff --git a/test/unit/voldemort/client/AbstractAdminServiceFilterTest.java b/test/unit/voldemort/client/AbstractAdminServiceFilterTest.java index 7f9b94df25..ad9053a3d6 100644 --- a/test/unit/voldemort/client/AbstractAdminServiceFilterTest.java +++ b/test/unit/voldemort/client/AbstractAdminServiceFilterTest.java @@ -119,7 +119,7 @@ public void testUpdateAsStreamWithFilter() { Set>> entrySet = createEntries(); // make update stream call with filter - getAdminClient().storeOps.updateEntries(0, testStoreName, entrySet.iterator(), filter); + getAdminClient().streamingOps.updateEntries(0, testStoreName, entrySet.iterator(), filter); // assert none of the filtered entries are updated. // user store should be present diff --git a/test/unit/voldemort/client/AdminServiceBasicTest.java b/test/unit/voldemort/client/AdminServiceBasicTest.java index cb456c5d8f..a8fd7d9adb 100644 --- a/test/unit/voldemort/client/AdminServiceBasicTest.java +++ b/test/unit/voldemort/client/AdminServiceBasicTest.java @@ -1415,7 +1415,7 @@ public void testQuery() { // test one key on store 0 queryKeys = new ArrayList(); queryKeys.add(belongToAndInsideServer0Keys.get(0)); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); entry = results.next(); assertEquals(queryKeys.get(0), entry.getKey()); @@ -1430,7 +1430,7 @@ public void testQuery() { // test one key belongs to but not exists in server 0 queryKeys = new ArrayList(); queryKeys.add(belongToServer0ButOutsideBothKeys.get(0)); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); entry = results.next(); assertFalse("There should not be more results", results.hasNext()); @@ -1442,7 +1442,7 @@ public void testQuery() { // test one key not exist and does not belong to server 0 queryKeys = new ArrayList(); queryKeys.add(notBelongToServer0AndOutsideBothKeys.get(0)); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); entry = results.next(); assertFalse("There should not be more results", results.hasNext()); @@ -1454,7 +1454,7 @@ public void testQuery() { // test one key that exists on server 0 but does not belong to server 0 queryKeys = new ArrayList(); queryKeys.add(notBelongServer0ButInsideServer0Keys.get(0)); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); entry = results.next(); assertFalse("There should not be more results", results.hasNext()); @@ -1467,7 +1467,7 @@ public void testQuery() { store0.delete(belongToAndInsideServer0Keys.get(4), null); queryKeys = new ArrayList(); queryKeys.add(belongToAndInsideServer0Keys.get(4)); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); entry = results.next(); assertFalse("There should not be more results", results.hasNext()); @@ -1477,14 +1477,14 @@ public void testQuery() { // test empty request queryKeys = new ArrayList(); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertFalse("Results should be empty", results.hasNext()); // test null key queryKeys = new ArrayList(); queryKeys.add(null); assertEquals(1, queryKeys.size()); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); entry = results.next(); assertFalse("There should not be more results", results.hasNext()); @@ -1497,7 +1497,7 @@ public void testQuery() { queryKeys.add(belongToAndInsideServer1Keys.get(0)); queryKeys.add(belongToAndInsideServer1Keys.get(1)); queryKeys.add(belongToAndInsideServer1Keys.get(2)); - results = getAdminClient().storeOps.queryKeys(1, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(1, testStoreName, queryKeys.iterator()); assertTrue("Results should not be empty", results.hasNext()); Map>> entries = new HashMap>>(); int resultCount = 0; @@ -1535,7 +1535,7 @@ public void testQuery() { queryKeys.add(belongToAndInsideServer0Keys.get(3)); queryKeys.add(belongToAndInsideServer0Keys.get(5)); queryKeys.add(notBelongServer0ButInsideServer0Keys.get(2)); - results = getAdminClient().storeOps.queryKeys(0, testStoreName, queryKeys.iterator()); + results = getAdminClient().streamingOps.queryKeys(0, testStoreName, queryKeys.iterator()); // key 0 entry = results.next(); assertEquals(0, ByteUtils.compare(queryKeys.get(0).get(), entry.getKey().get())); @@ -1596,7 +1596,7 @@ protected Pair> computeNext() { } }; - getAdminClient().storeOps.updateEntries(0, testStoreName, iterator, null); + getAdminClient().streamingOps.updateEntries(0, testStoreName, iterator, null); // check updated values Store store = getStore(0, testStoreName); @@ -1621,7 +1621,7 @@ public void testUpdateSlops() { "test-consistent-with-pref-list"); Iterator> slopIterator = entrySet.iterator(); - getAdminClient().storeOps.updateSlopEntries(0, slopIterator); + getAdminClient().streamingOps.updateSlopEntries(0, slopIterator); // check updated values Iterator> entrysetItr = entrySet.iterator(); diff --git a/test/unit/voldemort/client/AdminServiceFailureTest.java b/test/unit/voldemort/client/AdminServiceFailureTest.java index eaa7613811..2fce111b4e 100644 --- a/test/unit/voldemort/client/AdminServiceFailureTest.java +++ b/test/unit/voldemort/client/AdminServiceFailureTest.java @@ -191,7 +191,7 @@ private void doOperation(StreamOperations e, false)); return; case UPDATE_ENTRIES: - getAdminClient().storeOps.updateEntries(nodeId, + getAdminClient().streamingOps.updateEntries(nodeId, storeName, getRandomlyFailingIterator(ServerTestUtils.createRandomKeyValuePairs(TEST_KEYS)), null);