Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Consolidating haskeys logic with getall

  • Loading branch information...
commit 83fcac4f47ee0513909114ff88beb932ff30cce8 1 parent be70d7a
Roshan Sumbaly authored September 21, 2012

Showing 21 changed files with 583 additions and 907 deletions. Show diff stats Hide diff stats

  1. 10  src/java/voldemort/client/ClientConfig.java
  2. 33  src/java/voldemort/client/TimeoutConfig.java
  3. 8  src/java/voldemort/server/VoldemortConfig.java
  4. 85  src/java/voldemort/store/routed/HasKeysPipelineData.java
  5. 13  src/java/voldemort/store/routed/{GetAllPipelineData.java → MultiKeysPipelineData.java}
  6. 47  src/java/voldemort/store/routed/PipelineRoutedStore.java
  7. 8  src/java/voldemort/store/routed/action/GetAllReadRepair.java
  8. 157  src/java/voldemort/store/routed/action/HasKeysConfigureNodes.java
  9. 26  src/java/voldemort/store/routed/action/{GetAllConfigureNodes.java → MultiKeysConfigureNodes.java}
  10. 152  src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java
  11. 145  src/java/voldemort/store/routed/action/PerformParallelHasKeysRequests.java
  12. 162  src/java/voldemort/store/routed/action/PerformParallelMultiKeysRequests.java
  13. 190  src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java
  14. 181  src/java/voldemort/store/routed/action/PerformSerialHasKeysRequests.java
  15. 197  src/java/voldemort/store/routed/action/PerformSerialMultiKeysRequests.java
  16. 1  test/integration/voldemort/performance/ClientConnectionStressTest.java
  17. 1  test/unit/voldemort/store/routed/GetallNodeReachTest.java
  18. 2  test/unit/voldemort/store/routed/HintedHandoffTest.java
  19. 1  test/unit/voldemort/store/routed/ReadRepairerTest.java
  20. 24  test/unit/voldemort/store/routed/RoutedStoreTest.java
  21. 47  test/unit/voldemort/store/routed/action/GetAllConfigureNodesTest.java
10  src/java/voldemort/client/ClientConfig.java
@@ -56,7 +56,7 @@
56 56
     private volatile boolean socketKeepAlive = false;
57 57
     private volatile int selectors = 8;
58 58
     private volatile long routingTimeoutMs = 15000;
59  
-    private volatile TimeoutConfig timeoutConfig = new TimeoutConfig(routingTimeoutMs, false);
  59
+    private volatile TimeoutConfig timeoutConfig = new TimeoutConfig(routingTimeoutMs, false, false);
60 60
     private volatile int socketBufferSize = 64 * 1024;
61 61
     private volatile SerializerFactory serializerFactory = new DefaultSerializerFactory();
62 62
     private volatile List<String> bootstrapUrls = null;
@@ -99,6 +99,7 @@ public ClientConfig() {}
99 99
     public static final String GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY = "getversions_routing_timeout_ms";
100 100
     public static final String DELETE_ROUTING_TIMEOUT_MS_PROPERTY = "delete_routing_timeout_ms";
101 101
     public static final String ALLOW_PARTIAL_GETALLS_PROPERTY = "allow_partial_getalls";
  102
+    public static final String ALLOW_PARTIAL_HASKEYS_PROPERTY = "allow_partial_haskeys";
102 103
     public static final String NODE_BANNAGE_MS_PROPERTY = "node_bannage_ms";
103 104
     public static final String SOCKET_BUFFER_SIZE_PROPERTY = "socket_buffer_size";
104 105
     public static final String SERIALIZER_FACTORY_CLASS_PROPERTY = "serializer_factory_class";
@@ -183,7 +184,7 @@ private void setProperties(Properties properties) {
183 184
             this.setRoutingTimeout(props.getInt(ROUTING_TIMEOUT_MS_PROPERTY), TimeUnit.MILLISECONDS);
184 185
 
185 186
         // By default, make all the timeouts equal to routing timeout
186  
-        timeoutConfig = new TimeoutConfig(routingTimeoutMs, false);
  187
+        timeoutConfig = new TimeoutConfig(routingTimeoutMs, false, false);
187 188
 
188 189
         if(props.containsKey(GETALL_ROUTING_TIMEOUT_MS_PROPERTY))
189 190
             timeoutConfig.setOperationTimeout(VoldemortOpCode.GET_ALL_OP_CODE,
@@ -210,7 +211,10 @@ private void setProperties(Properties properties) {
210 211
                                               props.getInt(DELETE_ROUTING_TIMEOUT_MS_PROPERTY));
211 212
 
212 213
         if(props.containsKey(ALLOW_PARTIAL_GETALLS_PROPERTY))
213  
-            timeoutConfig.setPartialGetAllAllowed(props.getBoolean(ALLOW_PARTIAL_GETALLS_PROPERTY));
  214
+            timeoutConfig.setPartialGetAllsAllowed(props.getBoolean(ALLOW_PARTIAL_GETALLS_PROPERTY));
  215
+
  216
+        if(props.containsKey(ALLOW_PARTIAL_HASKEYS_PROPERTY))
  217
+            timeoutConfig.setPartialHasKeysAllowed(props.getBoolean(ALLOW_PARTIAL_HASKEYS_PROPERTY));
214 218
 
215 219
         if(props.containsKey(SOCKET_BUFFER_SIZE_PROPERTY))
216 220
             this.setSocketBufferSize(props.getInt(SOCKET_BUFFER_SIZE_PROPERTY));
33  src/java/voldemort/client/TimeoutConfig.java
@@ -12,16 +12,21 @@
12 12
 
13 13
     private HashMap<Byte, Long> timeoutMap;
14 14
 
15  
-    private boolean partialGetAllAllowed;
  15
+    private boolean partialGetAllsAllowed;
16 16
 
17  
-    public TimeoutConfig(long globalTimeout, boolean allowPartialGetAlls) {
  17
+    private boolean partialHasKeysAllowed;
  18
+
  19
+    public TimeoutConfig(long globalTimeout,
  20
+                         boolean partialGetAllsAllowed,
  21
+                         boolean partialHasKeysAllowed) {
18 22
         this(globalTimeout,
19 23
              globalTimeout,
20 24
              globalTimeout,
21 25
              globalTimeout,
22 26
              globalTimeout,
23 27
              globalTimeout,
24  
-             allowPartialGetAlls);
  28
+             partialGetAllsAllowed,
  29
+             partialHasKeysAllowed);
25 30
     }
26 31
 
27 32
     public TimeoutConfig(long getTimeout,
@@ -30,7 +35,8 @@ public TimeoutConfig(long getTimeout,
30 35
                          long getAllTimeout,
31 36
                          long getVersionsTimeout,
32 37
                          long hasKeysTimeout,
33  
-                         boolean allowPartialGetAlls) {
  38
+                         boolean partialGetAllsAllowed,
  39
+                         boolean partialHasKeysAllowed) {
34 40
         timeoutMap = new HashMap<Byte, Long>();
35 41
         timeoutMap.put(VoldemortOpCode.GET_OP_CODE, getTimeout);
36 42
         timeoutMap.put(VoldemortOpCode.PUT_OP_CODE, putTimeout);
@@ -38,7 +44,8 @@ public TimeoutConfig(long getTimeout,
38 44
         timeoutMap.put(VoldemortOpCode.GET_ALL_OP_CODE, getAllTimeout);
39 45
         timeoutMap.put(VoldemortOpCode.GET_VERSION_OP_CODE, getVersionsTimeout);
40 46
         timeoutMap.put(VoldemortOpCode.HAS_KEYS_OP_CODE, hasKeysTimeout);
41  
-        setPartialGetAllAllowed(allowPartialGetAlls);
  47
+        setPartialGetAllsAllowed(partialGetAllsAllowed);
  48
+        setPartialHasKeysAllowed(partialHasKeysAllowed);
42 49
     }
43 50
 
44 51
     public long getOperationTimeout(Byte opCode) {
@@ -50,12 +57,20 @@ public void setOperationTimeout(Byte opCode, long timeoutMs) {
50 57
         timeoutMap.put(opCode, timeoutMs);
51 58
     }
52 59
 
53  
-    public boolean isPartialGetAllAllowed() {
54  
-        return partialGetAllAllowed;
  60
+    public boolean isPartialGetAllsAllowed() {
  61
+        return partialGetAllsAllowed;
  62
+    }
  63
+
  64
+    public void setPartialGetAllsAllowed(boolean partialGetAllsAllowed) {
  65
+        this.partialGetAllsAllowed = partialGetAllsAllowed;
  66
+    }
  67
+
  68
+    public boolean isPartialHasKeysAllowed() {
  69
+        return partialHasKeysAllowed;
55 70
     }
56 71
 
57  
-    public void setPartialGetAllAllowed(boolean allowPartialGetAlls) {
58  
-        this.partialGetAllAllowed = allowPartialGetAlls;
  72
+    public void setPartialHasKeysAllowed(boolean partialHasKeysAllowed) {
  73
+        this.partialHasKeysAllowed = partialHasKeysAllowed;
59 74
     }
60 75
 
61 76
 }
8  src/java/voldemort/server/VoldemortConfig.java
@@ -285,7 +285,7 @@ public VoldemortConfig(Props props) {
285 285
         this.clientMaxConnectionsPerNode = props.getInt("client.max.connections.per.node", 50);
286 286
         this.clientConnectionTimeoutMs = props.getInt("client.connection.timeout.ms", 500);
287 287
         this.clientRoutingTimeoutMs = props.getInt("client.routing.timeout.ms", 15000);
288  
-        this.clientTimeoutConfig = new TimeoutConfig(this.clientRoutingTimeoutMs, false);
  288
+        this.clientTimeoutConfig = new TimeoutConfig(this.clientRoutingTimeoutMs, false, false);
289 289
         this.clientTimeoutConfig.setOperationTimeout(VoldemortOpCode.GET_OP_CODE,
290 290
                                                      props.getInt("client.routing.get.timeout.ms",
291 291
                                                                   this.clientRoutingTimeoutMs));
@@ -301,8 +301,10 @@ public VoldemortConfig(Props props) {
301 301
         this.clientTimeoutConfig.setOperationTimeout(VoldemortOpCode.DELETE_OP_CODE,
302 302
                                                      props.getInt("client.routing.delete.timeout.ms",
303 303
                                                                   this.clientRoutingTimeoutMs));
304  
-        this.clientTimeoutConfig.setPartialGetAllAllowed(props.getBoolean("client.routing.allow.partial.getall",
305  
-                                                                          false));
  304
+        this.clientTimeoutConfig.setPartialGetAllsAllowed(props.getBoolean("client.routing.allow.partial.getall",
  305
+                                                                           false));
  306
+        this.clientTimeoutConfig.setPartialHasKeysAllowed(props.getBoolean("client.routing.allow.partial.haskeys",
  307
+                                                                           false));
306 308
         this.clientMaxThreads = props.getInt("client.max.threads", 500);
307 309
         this.clientThreadIdleMs = props.getInt("client.thread.idle.ms", 100000);
308 310
         this.clientMaxQueuedRequests = props.getInt("client.max.queued.requests", 1000);
85  src/java/voldemort/store/routed/HasKeysPipelineData.java
... ...
@@ -1,85 +0,0 @@
1  
-package voldemort.store.routed;
2  
-
3  
-import java.util.HashMap;
4  
-import java.util.HashSet;
5  
-import java.util.List;
6  
-import java.util.Map;
7  
-
8  
-import org.apache.commons.lang.mutable.MutableInt;
9  
-
10  
-import voldemort.cluster.Node;
11  
-import voldemort.utils.ByteArray;
12  
-
13  
-/**
14  
- * This is used only by the "get all" operation as it includes data specific
15  
- * only to that operation.
16  
- */
17  
-
18  
-public class HasKeysPipelineData extends PipelineData<Iterable<ByteArray>, Map<ByteArray, Boolean>> {
19  
-
20  
-    private final Map<ByteArray, Boolean> result;
21  
-
22  
-    // Keys for each node needed to satisfy storeDef.getPreferredReads() if
23  
-    // no failures.
24  
-    private Map<Node, List<ByteArray>> nodeToKeysMap;
25  
-
26  
-    // Keep track of nodes per key that might be needed if there are
27  
-    // failures during getAll
28  
-    private Map<ByteArray, List<Node>> keyToExtraNodesMap;
29  
-
30  
-    private final Map<ByteArray, MutableInt> keyToSuccessCount;
31  
-
32  
-    private final Map<ByteArray, HashSet<Integer>> keyToZoneResponses;
33  
-
34  
-    private Integer zonesRequired;
35  
-
36  
-    public HasKeysPipelineData() {
37  
-        this.result = new HashMap<ByteArray, Boolean>();
38  
-        this.keyToSuccessCount = new HashMap<ByteArray, MutableInt>();
39  
-        this.keyToZoneResponses = new HashMap<ByteArray, HashSet<Integer>>();
40  
-    }
41  
-
42  
-    public Map<ByteArray, HashSet<Integer>> getKeyToZoneResponse() {
43  
-        return this.keyToZoneResponses;
44  
-    }
45  
-
46  
-    public Map<ByteArray, Boolean> getResult() {
47  
-        return result;
48  
-    }
49  
-
50  
-    public Map<Node, List<ByteArray>> getNodeToKeysMap() {
51  
-        return nodeToKeysMap;
52  
-    }
53  
-
54  
-    public void setNodeToKeysMap(Map<Node, List<ByteArray>> nodeToKeysMap) {
55  
-        this.nodeToKeysMap = nodeToKeysMap;
56  
-    }
57  
-
58  
-    public Map<ByteArray, List<Node>> getKeyToExtraNodesMap() {
59  
-        return keyToExtraNodesMap;
60  
-    }
61  
-
62  
-    public void setKeyToExtraNodesMap(Map<ByteArray, List<Node>> keyToExtraNodesMap) {
63  
-        this.keyToExtraNodesMap = keyToExtraNodesMap;
64  
-    }
65  
-
66  
-    public MutableInt getSuccessCount(ByteArray key) {
67  
-        MutableInt value = keyToSuccessCount.get(key);
68  
-
69  
-        if(value == null) {
70  
-            value = new MutableInt(0);
71  
-            keyToSuccessCount.put(key, value);
72  
-        }
73  
-
74  
-        return value;
75  
-    }
76  
-
77  
-    public void setZonesRequired(Integer zonesRequired) {
78  
-        this.zonesRequired = zonesRequired;
79  
-    }
80  
-
81  
-    public Integer getZonesRequired() {
82  
-        return this.zonesRequired;
83  
-    }
84  
-
85  
-}
13  ...va/voldemort/store/routed/GetAllPipelineData.java → ...voldemort/store/routed/MultiKeysPipelineData.java
@@ -25,17 +25,14 @@
25 25
 
26 26
 import voldemort.cluster.Node;
27 27
 import voldemort.utils.ByteArray;
28  
-import voldemort.versioning.Versioned;
29 28
 
30 29
 /**
31 30
  * This is used only by the "get all" operation as it includes data specific
32 31
  * only to that operation.
33 32
  */
  33
+public class MultiKeysPipelineData<T> extends PipelineData<Iterable<ByteArray>, Map<ByteArray, T>> {
34 34
 
35  
-public class GetAllPipelineData extends
36  
-        PipelineData<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>> {
37  
-
38  
-    private final Map<ByteArray, List<Versioned<byte[]>>> result;
  35
+    private final Map<ByteArray, T> result;
39 36
 
40 37
     // Keys for each node needed to satisfy storeDef.getPreferredReads() if
41 38
     // no failures.
@@ -53,8 +50,8 @@
53 50
 
54 51
     private Integer zonesRequired;
55 52
 
56  
-    public GetAllPipelineData() {
57  
-        this.result = new HashMap<ByteArray, List<Versioned<byte[]>>>();
  53
+    public MultiKeysPipelineData() {
  54
+        this.result = new HashMap<ByteArray, T>();
58 55
         this.keyToSuccessCount = new HashMap<ByteArray, MutableInt>();
59 56
         this.keyToZoneResponses = new HashMap<ByteArray, HashSet<Integer>>();
60 57
     }
@@ -63,7 +60,7 @@ public GetAllPipelineData() {
63 60
         return this.keyToZoneResponses;
64 61
     }
65 62
 
66  
-    public Map<ByteArray, List<Versioned<byte[]>>> getResult() {
  63
+    public Map<ByteArray, T> getResult() {
67 64
         return result;
68 65
     }
69 66
 
47  src/java/voldemort/store/routed/PipelineRoutedStore.java
@@ -37,10 +37,9 @@
37 37
 import voldemort.store.routed.Pipeline.Event;
38 38
 import voldemort.store.routed.Pipeline.Operation;
39 39
 import voldemort.store.routed.action.ConfigureNodes;
40  
-import voldemort.store.routed.action.GetAllConfigureNodes;
41 40
 import voldemort.store.routed.action.GetAllReadRepair;
42  
-import voldemort.store.routed.action.HasKeysConfigureNodes;
43 41
 import voldemort.store.routed.action.IncrementClock;
  42
+import voldemort.store.routed.action.MultiKeysConfigureNodes;
44 43
 import voldemort.store.routed.action.PerformDeleteHintedHandoff;
45 44
 import voldemort.store.routed.action.PerformParallelDeleteRequests;
46 45
 import voldemort.store.routed.action.PerformParallelGetAllRequests;
@@ -295,7 +294,7 @@ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<b
295 294
 
296 295
         boolean allowReadRepair = repairReads && (transforms == null || transforms.size() == 0);
297 296
 
298  
-        GetAllPipelineData pipelineData = new GetAllPipelineData();
  297
+        MultiKeysPipelineData<List<Versioned<byte[]>>> pipelineData = new MultiKeysPipelineData<List<Versioned<byte[]>>>();
299 298
         if(zoneRoutingEnabled)
300 299
             pipelineData.setZonesRequired(storeDef.getZoneCountReads());
301 300
         else
@@ -306,15 +305,15 @@ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<b
306 305
                                          timeoutConfig.getOperationTimeout(VoldemortOpCode.GET_ALL_OP_CODE),
307 306
                                          TimeUnit.MILLISECONDS);
308 307
         pipeline.addEventAction(Event.STARTED,
309  
-                                new GetAllConfigureNodes(pipelineData,
310  
-                                                         Event.CONFIGURED,
311  
-                                                         failureDetector,
312  
-                                                         storeDef.getPreferredReads(),
313  
-                                                         storeDef.getRequiredReads(),
314  
-                                                         routingStrategy,
315  
-                                                         keys,
316  
-                                                         transforms,
317  
-                                                         clientZone));
  308
+                                new MultiKeysConfigureNodes<List<Versioned<byte[]>>>(pipelineData,
  309
+                                                                                     Event.CONFIGURED,
  310
+                                                                                     failureDetector,
  311
+                                                                                     storeDef.getPreferredReads(),
  312
+                                                                                     storeDef.getRequiredReads(),
  313
+                                                                                     routingStrategy,
  314
+                                                                                     keys,
  315
+                                                                                     transforms,
  316
+                                                                                     clientZone));
318 317
         pipeline.addEventAction(Event.CONFIGURED,
319 318
                                 new PerformParallelGetAllRequests(pipelineData,
320 319
                                                                   Event.INSUFFICIENT_SUCCESSES,
@@ -330,7 +329,7 @@ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<b
330 329
                                                                 innerStores,
331 330
                                                                 storeDef.getPreferredReads(),
332 331
                                                                 storeDef.getRequiredReads(),
333  
-                                                                timeoutConfig.isPartialGetAllAllowed()));
  332
+                                                                timeoutConfig.isPartialGetAllsAllowed()));
334 333
 
335 334
         if(allowReadRepair)
336 335
             pipeline.addEventAction(Event.RESPONSES_RECEIVED,
@@ -384,7 +383,7 @@ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<b
384 383
             startTimeNs = System.nanoTime();
385 384
         }
386 385
 
387  
-        HasKeysPipelineData pipelineData = new HasKeysPipelineData();
  386
+        MultiKeysPipelineData<Boolean> pipelineData = new MultiKeysPipelineData<Boolean>();
388 387
         if(zoneRoutingEnabled)
389 388
             pipelineData.setZonesRequired(storeDef.getZoneCountReads());
390 389
         else
@@ -395,14 +394,15 @@ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<b
395 394
                                          timeoutConfig.getOperationTimeout(VoldemortOpCode.HAS_KEYS_OP_CODE),
396 395
                                          TimeUnit.MILLISECONDS);
397 396
         pipeline.addEventAction(Event.STARTED,
398  
-                                new HasKeysConfigureNodes(pipelineData,
399  
-                                                          Event.CONFIGURED,
400  
-                                                          failureDetector,
401  
-                                                          storeDef.getPreferredReads(),
402  
-                                                          storeDef.getRequiredReads(),
403  
-                                                          routingStrategy,
404  
-                                                          keys,
405  
-                                                          clientZone));
  397
+                                new MultiKeysConfigureNodes<Boolean>(pipelineData,
  398
+                                                                     Event.CONFIGURED,
  399
+                                                                     failureDetector,
  400
+                                                                     storeDef.getPreferredReads(),
  401
+                                                                     storeDef.getRequiredReads(),
  402
+                                                                     routingStrategy,
  403
+                                                                     keys,
  404
+                                                                     null,
  405
+                                                                     clientZone));
406 406
         pipeline.addEventAction(Event.CONFIGURED,
407 407
                                 new PerformParallelHasKeysRequests(pipelineData,
408 408
                                                                    Event.INSUFFICIENT_SUCCESSES,
@@ -418,7 +418,8 @@ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<b
418 418
                                                                  failureDetector,
419 419
                                                                  innerStores,
420 420
                                                                  storeDef.getPreferredReads(),
421  
-                                                                 storeDef.getRequiredReads()));
  421
+                                                                 storeDef.getRequiredReads(),
  422
+                                                                 timeoutConfig.isPartialHasKeysAllowed()));
422 423
 
423 424
         pipeline.addEvent(Event.STARTED);
424 425
 
8  src/java/voldemort/store/routed/action/GetAllReadRepair.java
@@ -21,18 +21,18 @@
21 21
 import java.util.Map;
22 22
 
23 23
 import voldemort.store.nonblockingstore.NonblockingStore;
24  
-import voldemort.store.routed.GetAllPipelineData;
  24
+import voldemort.store.routed.MultiKeysPipelineData;
  25
+import voldemort.store.routed.Pipeline.Event;
25 26
 import voldemort.store.routed.ReadRepairer;
26 27
 import voldemort.store.routed.Response;
27  
-import voldemort.store.routed.Pipeline.Event;
28 28
 import voldemort.utils.ByteArray;
29 29
 import voldemort.versioning.Versioned;
30 30
 
31 31
 public class GetAllReadRepair
32 32
         extends
33  
-        AbstractReadRepair<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>, GetAllPipelineData> {
  33
+        AbstractReadRepair<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>, MultiKeysPipelineData<List<Versioned<byte[]>>>> {
34 34
 
35  
-    public GetAllReadRepair(GetAllPipelineData pipelineData,
  35
+    public GetAllReadRepair(MultiKeysPipelineData<List<Versioned<byte[]>>> pipelineData,
36 36
                             Event completeEvent,
37 37
                             int preferred,
38 38
                             long timeoutMs,
157  src/java/voldemort/store/routed/action/HasKeysConfigureNodes.java
... ...
@@ -1,157 +0,0 @@
1  
-/*
2  
- * Copyright 2012 LinkedIn, Inc
3  
- * 
4  
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  
- * use this file except in compliance with the License. You may obtain a copy of
6  
- * the License at
7  
- * 
8  
- * http://www.apache.org/licenses/LICENSE-2.0
9  
- * 
10  
- * Unless required by applicable law or agreed to in writing, software
11  
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  
- * License for the specific language governing permissions and limitations under
14  
- * the License.
15  
- */
16  
-
17  
-package voldemort.store.routed.action;
18  
-
19  
-import java.util.ArrayList;
20  
-import java.util.HashMap;
21  
-import java.util.LinkedList;
22  
-import java.util.List;
23  
-import java.util.Map;
24  
-
25  
-import voldemort.VoldemortException;
26  
-import voldemort.cluster.Node;
27  
-import voldemort.cluster.Zone;
28  
-import voldemort.cluster.failuredetector.FailureDetector;
29  
-import voldemort.routing.RoutingStrategy;
30  
-import voldemort.store.routed.HasKeysPipelineData;
31  
-import voldemort.store.routed.Pipeline;
32  
-import voldemort.store.routed.Pipeline.Event;
33  
-import voldemort.utils.ByteArray;
34  
-
35  
-import com.google.common.collect.Lists;
36  
-import com.google.common.collect.Maps;
37  
-
38  
-public class HasKeysConfigureNodes extends
39  
-        AbstractConfigureNodes<Iterable<ByteArray>, Map<ByteArray, Boolean>, HasKeysPipelineData> {
40  
-
41  
-    private final int preferred;
42  
-
43  
-    private final Iterable<ByteArray> keys;
44  
-
45  
-    private final Zone clientZone;
46  
-
47  
-    public HasKeysConfigureNodes(HasKeysPipelineData pipelineData,
48  
-                                 Event completeEvent,
49  
-                                 FailureDetector failureDetector,
50  
-                                 int preferred,
51  
-                                 int required,
52  
-                                 RoutingStrategy routingStrategy,
53  
-                                 Iterable<ByteArray> keys,
54  
-                                 Zone clientZone) {
55  
-        super(pipelineData, completeEvent, failureDetector, required, routingStrategy);
56  
-        this.preferred = preferred;
57  
-        this.keys = keys;
58  
-        this.clientZone = clientZone;
59  
-    }
60  
-
61  
-    public void execute(Pipeline pipeline) {
62  
-        Map<Node, List<ByteArray>> nodeToKeysMap = Maps.newHashMap();
63  
-        Map<ByteArray, List<Node>> keyToExtraNodesMap = Maps.newHashMap();
64  
-
65  
-        for(ByteArray key: keys) {
66  
-            List<Node> nodes = null;
67  
-
68  
-            try {
69  
-                nodes = getNodes(key);
70  
-            } catch(VoldemortException e) {
71  
-                pipelineData.setFatalError(e);
72  
-                pipeline.addEvent(Event.ERROR);
73  
-                return;
74  
-            }
75  
-
76  
-            List<Node> preferredNodes = Lists.newArrayListWithCapacity(preferred);
77  
-            List<Node> extraNodes = Lists.newArrayListWithCapacity(3);
78  
-
79  
-            if(pipelineData.getZonesRequired() != null) {
80  
-
81  
-                if(pipelineData.getZonesRequired() > this.clientZone.getProximityList().size()) {
82  
-                    throw new VoldemortException("Number of zones required should be less than the total number of zones");
83  
-                }
84  
-
85  
-                if(pipelineData.getZonesRequired() > required) {
86  
-                    throw new VoldemortException("Number of zones required should be less than the required number of "
87  
-                                                 + pipeline.getOperation().getSimpleName() + "s");
88  
-                }
89  
-
90  
-                // Create zone id to node mapping
91  
-                Map<Integer, List<Node>> zoneIdToNode = new HashMap<Integer, List<Node>>();
92  
-                for(Node node: nodes) {
93  
-                    List<Node> nodesList = null;
94  
-                    if(zoneIdToNode.containsKey(node.getZoneId())) {
95  
-                        nodesList = zoneIdToNode.get(node.getZoneId());
96  
-                    } else {
97  
-                        nodesList = new ArrayList<Node>();
98  
-                        zoneIdToNode.put(node.getZoneId(), nodesList);
99  
-                    }
100  
-                    nodesList.add(node);
101  
-                }
102  
-
103  
-                nodes = new ArrayList<Node>();
104  
-                LinkedList<Integer> proximityList = this.clientZone.getProximityList();
105  
-                // Add a node from every zone
106  
-                for(int index = 0; index < pipelineData.getZonesRequired(); index++) {
107  
-                    List<Node> zoneNodes = zoneIdToNode.get(proximityList.get(index));
108  
-                    if(zoneNodes != null) {
109  
-                        nodes.add(zoneNodes.remove(0));
110  
-                    }
111  
-                }
112  
-
113  
-                // Add the rest
114  
-                nodes.addAll(zoneIdToNode.get(this.clientZone.getId()));
115  
-                for(int index = 0; index < proximityList.size(); index++) {
116  
-                    List<Node> zoneNodes = zoneIdToNode.get(proximityList.get(index));
117  
-                    if(zoneNodes != null)
118  
-                        nodes.addAll(zoneNodes);
119  
-                }
120  
-
121  
-            }
122  
-
123  
-            for(Node node: nodes) {
124  
-                if(preferredNodes.size() < preferred)
125  
-                    preferredNodes.add(node);
126  
-                else
127  
-                    extraNodes.add(node);
128  
-            }
129  
-
130  
-            for(Node node: preferredNodes) {
131  
-                List<ByteArray> nodeKeys = nodeToKeysMap.get(node);
132  
-
133  
-                if(nodeKeys == null) {
134  
-                    nodeKeys = Lists.newArrayList();
135  
-                    nodeToKeysMap.put(node, nodeKeys);
136  
-                }
137  
-
138  
-                nodeKeys.add(key);
139  
-            }
140  
-
141  
-            if(!extraNodes.isEmpty()) {
142  
-                List<Node> list = keyToExtraNodesMap.get(key);
143  
-
144  
-                if(list == null)
145  
-                    keyToExtraNodesMap.put(key, extraNodes);
146  
-                else
147  
-                    list.addAll(extraNodes);
148  
-            }
149  
-        }
150  
-
151  
-        pipelineData.setKeyToExtraNodesMap(keyToExtraNodesMap);
152  
-        pipelineData.setNodeToKeysMap(nodeToKeysMap);
153  
-
154  
-        pipeline.addEvent(completeEvent);
155  
-    }
156  
-
157  
-}
26  ...ort/store/routed/action/GetAllConfigureNodes.java → .../store/routed/action/MultiKeysConfigureNodes.java
@@ -27,18 +27,16 @@
27 27
 import voldemort.cluster.Zone;
28 28
 import voldemort.cluster.failuredetector.FailureDetector;
29 29
 import voldemort.routing.RoutingStrategy;
30  
-import voldemort.store.routed.GetAllPipelineData;
  30
+import voldemort.store.routed.MultiKeysPipelineData;
31 31
 import voldemort.store.routed.Pipeline;
32 32
 import voldemort.store.routed.Pipeline.Event;
33 33
 import voldemort.utils.ByteArray;
34  
-import voldemort.versioning.Versioned;
35 34
 
36 35
 import com.google.common.collect.Lists;
37 36
 import com.google.common.collect.Maps;
38 37
 
39  
-public class GetAllConfigureNodes
40  
-        extends
41  
-        AbstractConfigureNodes<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>, GetAllPipelineData> {
  38
+public class MultiKeysConfigureNodes<T> extends
  39
+        AbstractConfigureNodes<Iterable<ByteArray>, Map<ByteArray, T>, MultiKeysPipelineData<T>> {
42 40
 
43 41
     private final int preferred;
44 42
 
@@ -48,15 +46,15 @@
48 46
 
49 47
     private final Map<ByteArray, byte[]> transforms;
50 48
 
51  
-    public GetAllConfigureNodes(GetAllPipelineData pipelineData,
52  
-                                Event completeEvent,
53  
-                                FailureDetector failureDetector,
54  
-                                int preferred,
55  
-                                int required,
56  
-                                RoutingStrategy routingStrategy,
57  
-                                Iterable<ByteArray> keys,
58  
-                                Map<ByteArray, byte[]> transforms,
59  
-                                Zone clientZone) {
  49
+    public MultiKeysConfigureNodes(MultiKeysPipelineData<T> pipelineData,
  50
+                                   Event completeEvent,
  51
+                                   FailureDetector failureDetector,
  52
+                                   int preferred,
  53
+                                   int required,
  54
+                                   RoutingStrategy routingStrategy,
  55
+                                   Iterable<ByteArray> keys,
  56
+                                   Map<ByteArray, byte[]> transforms,
  57
+                                   Zone clientZone) {
60 58
         super(pipelineData, completeEvent, failureDetector, required, routingStrategy);
61 59
         this.preferred = preferred;
62 60
         this.keys = keys;
152  src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java
@@ -18,159 +18,49 @@
18 18
 
19 19
 import java.util.ArrayList;
20 20
 import java.util.Collection;
21  
-import java.util.HashSet;
22 21
 import java.util.List;
23 22
 import java.util.Map;
24  
-import java.util.concurrent.ConcurrentHashMap;
25  
-import java.util.concurrent.CountDownLatch;
26  
-import java.util.concurrent.TimeUnit;
27 23
 
28  
-import org.apache.commons.lang.mutable.MutableInt;
29  
-import org.apache.log4j.Level;
30  
-
31  
-import voldemort.cluster.Node;
32 24
 import voldemort.cluster.failuredetector.FailureDetector;
33  
-import voldemort.store.InvalidMetadataException;
34 25
 import voldemort.store.nonblockingstore.NonblockingStore;
35 26
 import voldemort.store.nonblockingstore.NonblockingStoreCallback;
36  
-import voldemort.store.routed.GetAllPipelineData;
37  
-import voldemort.store.routed.Pipeline;
  27
+import voldemort.store.routed.MultiKeysPipelineData;
38 28
 import voldemort.store.routed.Pipeline.Event;
39  
-import voldemort.store.routed.Response;
40 29
 import voldemort.utils.ByteArray;
41 30
 import voldemort.versioning.Versioned;
42 31
 
43 32
 import com.google.common.collect.Lists;
44 33
 
45  
-public class PerformParallelGetAllRequests
46  
-        extends
47  
-        AbstractAction<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>, GetAllPipelineData> {
48  
-
49  
-    private final long timeoutMs;
50  
-
51  
-    private final Map<Integer, NonblockingStore> nonblockingStores;
  34
+public class PerformParallelGetAllRequests extends
  35
+        PerformParallelMultiKeysRequests<List<Versioned<byte[]>>> {
52 36
 
53  
-    private final FailureDetector failureDetector;
54  
-
55  
-    public PerformParallelGetAllRequests(GetAllPipelineData pipelineData,
  37
+    public PerformParallelGetAllRequests(MultiKeysPipelineData<List<Versioned<byte[]>>> pipelineData,
56 38
                                          Event completeEvent,
57 39
                                          FailureDetector failureDetector,
58 40
                                          long timeoutMs,
59 41
                                          Map<Integer, NonblockingStore> nonblockingStores) {
60  
-        super(pipelineData, completeEvent);
61  
-        this.failureDetector = failureDetector;
62  
-        this.timeoutMs = timeoutMs;
63  
-        this.nonblockingStores = nonblockingStores;
  42
+        super(pipelineData, completeEvent, failureDetector, timeoutMs, nonblockingStores);
64 43
     }
65 44
 
66  
-    @SuppressWarnings("unchecked")
67  
-    public void execute(final Pipeline pipeline) {
68  
-        int attempts = pipelineData.getNodeToKeysMap().size();
69  
-        final Map<Integer, Response<Iterable<ByteArray>, Object>> responses = new ConcurrentHashMap<Integer, Response<Iterable<ByteArray>, Object>>();
70  
-        final CountDownLatch latch = new CountDownLatch(attempts);
71  
-
72  
-        if(logger.isTraceEnabled())
73  
-            logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
74  
-                         + " operations in parallel");
75  
-
76  
-        Map<ByteArray, byte[]> transforms = pipelineData.getTransforms();
77  
-
78  
-        for(Map.Entry<Node, List<ByteArray>> entry: pipelineData.getNodeToKeysMap().entrySet()) {
79  
-            final Node node = entry.getKey();
80  
-            final Collection<ByteArray> keys = entry.getValue();
81  
-
82  
-            NonblockingStoreCallback callback = new NonblockingStoreCallback() {
83  
-
84  
-                public void requestComplete(Object result, long requestTime) {
85  
-                    if(logger.isTraceEnabled())
86  
-                        logger.trace(pipeline.getOperation().getSimpleName()
87  
-                                     + " response received (" + requestTime + " ms.) from node "
88  
-                                     + node.getId());
89  
-
90  
-                    Response<Iterable<ByteArray>, Object> response = new Response<Iterable<ByteArray>, Object>(node,
91  
-                                                                                                               keys,
92  
-                                                                                                               result,
93  
-                                                                                                               requestTime);
94  
-                    responses.put(node.getId(), response);
95  
-                    latch.countDown();
96  
-
97  
-                    // Note errors that come in after the pipeline has finished.
98  
-                    // These will *not* get a chance to be called in the loop of
99  
-                    // responses below.
100  
-                    if(pipeline.isFinished() && response.getValue() instanceof Exception)
101  
-                        if(response.getValue() instanceof InvalidMetadataException) {
102  
-                            pipelineData.reportException((InvalidMetadataException) response.getValue());
103  
-                            logger.warn("Received invalid metadata problem after a successful "
104  
-                                        + pipeline.getOperation().getSimpleName()
105  
-                                        + " call on node " + node.getId() + ", store '"
106  
-                                        + pipelineData.getStoreName() + "'");
107  
-                        } else {
108  
-                            handleResponseError(response, pipeline, failureDetector);
109  
-                        }
110  
-                }
111  
-
112  
-            };
113  
-
114  
-            if(logger.isTraceEnabled())
115  
-                logger.trace("Submitting " + pipeline.getOperation().getSimpleName()
116  
-                             + " request on node " + node.getId());
117  
-
118  
-            NonblockingStore store = nonblockingStores.get(node.getId());
119  
-            store.submitGetAllRequest(keys, transforms, callback, timeoutMs);
120  
-        }
  45
+    @Override
  46
+    public void submitRequest(NonblockingStore store,
  47
+                              NonblockingStoreCallback callback,
  48
+                              Collection<ByteArray> keys) {
  49
+        store.submitGetAllRequest(keys, pipelineData.getTransforms(), callback, timeoutMs);
  50
+    }
121 51
 
122  
-        try {
123  
-            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
124  
-        } catch(InterruptedException e) {
125  
-            if(logger.isEnabledFor(Level.WARN))
126  
-                logger.warn(e, e);
  52
+    @Override
  53
+    public void transform(ByteArray key, Map<ByteArray, List<Versioned<byte[]>>> values) {
  54
+        List<Versioned<byte[]>> retrieved = values.get(key);
  55
+        if(retrieved == null) {
  56
+            retrieved = new ArrayList<Versioned<byte[]>>();
127 57
         }
128 58
 
129  
-        for(Response<Iterable<ByteArray>, Object> response: responses.values()) {
130  
-            if(response.getValue() instanceof Exception) {
131  
-                if(handleResponseError(response, pipeline, failureDetector))
132  
-                    return;
133  
-            } else {
134  
-                Map<ByteArray, List<Versioned<byte[]>>> values = (Map<ByteArray, List<Versioned<byte[]>>>) response.getValue();
135  
-
136  
-                for(ByteArray key: response.getKey()) {
137  
-                    MutableInt successCount = pipelineData.getSuccessCount(key);
138  
-                    successCount.increment();
139  
-
140  
-                    List<Versioned<byte[]>> retrieved = values.get(key);
141  
-                    if(retrieved == null) {
142  
-                        retrieved = new ArrayList<Versioned<byte[]>>();
143  
-                    }
144  
-                    /*
145  
-                     * retrieved can be null if there are no values for the key
146  
-                     * provided
147  
-                     */
148  
-                    List<Versioned<byte[]>> existing = pipelineData.getResult().get(key);
149  
-
150  
-                    if(existing == null)
151  
-                        pipelineData.getResult().put(key, Lists.newArrayList(retrieved));
152  
-                    else
153  
-                        existing.addAll(retrieved);
154  
-
155  
-                    HashSet<Integer> zoneResponses = null;
156  
-                    if(pipelineData.getKeyToZoneResponse().containsKey(key)) {
157  
-                        zoneResponses = pipelineData.getKeyToZoneResponse().get(key);
158  
-                    } else {
159  
-                        zoneResponses = new HashSet<Integer>();
160  
-                        pipelineData.getKeyToZoneResponse().put(key, zoneResponses);
161  
-                    }
162  
-                    zoneResponses.add(response.getNode().getZoneId());
163  
-                }
164  
-
165  
-                pipelineData.getResponses()
166  
-                            .add(new Response<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>>(response.getNode(),
167  
-                                                                                                            response.getKey(),
168  
-                                                                                                            values,
169  
-                                                                                                            response.getRequestTime()));
170  
-                failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
171  
-            }
172  
-        }
  59
+        List<Versioned<byte[]>> existing = pipelineData.getResult().get(key);
173 60
 
174  
-        pipeline.addEvent(completeEvent);
  61
+        if(existing == null)
  62
+            pipelineData.getResult().put(key, Lists.newArrayList(retrieved));
  63
+        else
  64
+            existing.addAll(retrieved);
175 65
     }
176 66
 }
145  src/java/voldemort/store/routed/action/PerformParallelHasKeysRequests.java
@@ -17,152 +17,47 @@
17 17
 package voldemort.store.routed.action;
18 18
 
19 19
 import java.util.Collection;
20  
-import java.util.HashSet;
21  
-import java.util.List;
22 20
 import java.util.Map;
23  
-import java.util.concurrent.ConcurrentHashMap;
24  
-import java.util.concurrent.CountDownLatch;
25  
-import java.util.concurrent.TimeUnit;
26 21
 
27  
-import org.apache.commons.lang.mutable.MutableInt;
28  
-import org.apache.log4j.Level;
29  
-
30  
-import voldemort.cluster.Node;
31 22
 import voldemort.cluster.failuredetector.FailureDetector;
32  
-import voldemort.store.InvalidMetadataException;
33 23
 import voldemort.store.nonblockingstore.NonblockingStore;
34 24
 import voldemort.store.nonblockingstore.NonblockingStoreCallback;
35  
-import voldemort.store.routed.HasKeysPipelineData;
36  
-import voldemort.store.routed.Pipeline;
  25
+import voldemort.store.routed.MultiKeysPipelineData;
37 26
 import voldemort.store.routed.Pipeline.Event;
38  
-import voldemort.store.routed.Response;
39 27
 import voldemort.utils.ByteArray;
40 28
 
41  
-public class PerformParallelHasKeysRequests extends
42  
-        AbstractAction<Iterable<ByteArray>, Map<ByteArray, Boolean>, HasKeysPipelineData> {
43  
-
44  
-    private final long timeoutMs;
45  
-
46  
-    private final Map<Integer, NonblockingStore> nonblockingStores;
47  
-
48  
-    private final FailureDetector failureDetector;
  29
+public class PerformParallelHasKeysRequests extends PerformParallelMultiKeysRequests<Boolean> {
49 30
 
50 31
     private final boolean exact;
51 32
 
52  
-    public PerformParallelHasKeysRequests(HasKeysPipelineData pipelineData,
  33
+    public PerformParallelHasKeysRequests(MultiKeysPipelineData<Boolean> pipelineData,
53 34
                                           Event completeEvent,
54 35
                                           FailureDetector failureDetector,
55 36
                                           long timeoutMs,
56 37
                                           Map<Integer, NonblockingStore> nonblockingStores,
57 38
                                           boolean exact) {
58  
-        super(pipelineData, completeEvent);
59  
-        this.failureDetector = failureDetector;
60  
-        this.timeoutMs = timeoutMs;
61  
-        this.nonblockingStores = nonblockingStores;
  39
+        super(pipelineData, completeEvent, failureDetector, timeoutMs, nonblockingStores);
62 40
         this.exact = exact;
63 41
     }
64 42
 
65  
-    @SuppressWarnings("unchecked")
66  
-    public void execute(final Pipeline pipeline) {
67  
-        int attempts = pipelineData.getNodeToKeysMap().size();
68  
-        final Map<Integer, Response<Iterable<ByteArray>, Object>> responses = new ConcurrentHashMap<Integer, Response<Iterable<ByteArray>, Object>>();
69  
-        final CountDownLatch latch = new CountDownLatch(attempts);
70  
-
71  
-        if(logger.isTraceEnabled())
72  
-            logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
73  
-                         + " operations in parallel");
74  
-
75  
-        for(Map.Entry<Node, List<ByteArray>> entry: pipelineData.getNodeToKeysMap().entrySet()) {
76  
-            final Node node = entry.getKey();
77  
-            final Collection<ByteArray> keys = entry.getValue();
78  
-
79  
-            NonblockingStoreCallback callback = new NonblockingStoreCallback() {
80  
-
81  
-                public void requestComplete(Object result, long requestTime) {
82  
-                    if(logger.isTraceEnabled())
83  
-                        logger.trace(pipeline.getOperation().getSimpleName()
84  
-                                     + " response received (" + requestTime + " ms.) from node "
85  
-                                     + node.getId());
86  
-
87  
-                    Response<Iterable<ByteArray>, Object> response = new Response<Iterable<ByteArray>, Object>(node,
88  
-                                                                                                               keys,
89  
-                                                                                                               result,
90  
-                                                                                                               requestTime);
91  
-                    responses.put(node.getId(), response);
92  
-                    latch.countDown();
93  
-
94  
-                    // Note errors that come in after the pipeline has finished.
95  
-                    // These will *not* get a chance to be called in the loop of
96  
-                    // responses below.
97  
-                    if(pipeline.isFinished() && response.getValue() instanceof Exception)
98  
-                        if(response.getValue() instanceof InvalidMetadataException) {
99  
-                            pipelineData.reportException((InvalidMetadataException) response.getValue());
100  
-                            logger.warn("Received invalid metadata problem after a successful "
101  
-                                        + pipeline.getOperation().getSimpleName()
102  
-                                        + " call on node " + node.getId() + ", store '"
103  
-                                        + pipelineData.getStoreName() + "'");
104  
-                        } else {
105  
-                            handleResponseError(response, pipeline, failureDetector);
106  
-                        }
107  
-                }
  43
+    @Override
  44
+    public void transform(ByteArray key, Map<ByteArray, Boolean> values) {
  45
+        Boolean retrieved = values.get(key);
  46
+        if(retrieved == null)
  47
+            retrieved = new Boolean(false);
108 48
 
109  
-            };
  49
+        Boolean existing = pipelineData.getResult().get(key);
  50
+        if(existing == null)
  51
+            pipelineData.getResult().put(key, retrieved);
  52
+        else
  53
+            pipelineData.getResult().put(key, existing | retrieved);
110 54
 
111  
-            if(logger.isTraceEnabled())
112  
-                logger.trace("Submitting " + pipeline.getOperation().getSimpleName()
113  
-                             + " request on node " + node.getId());
114  
-
115  
-            NonblockingStore store = nonblockingStores.get(node.getId());
116  
-            store.submitHasKeysRequest(keys, exact, callback, timeoutMs);
117  
-        }
118  
-
119  
-        try {
120  
-            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
121  
-        } catch(InterruptedException e) {
122  
-            if(logger.isEnabledFor(Level.WARN))
123  
-                logger.warn(e, e);
124  
-        }
125  
-
126  
-        for(Response<Iterable<ByteArray>, Object> response: responses.values()) {
127  
-            if(response.getValue() instanceof Exception) {
128  
-                if(handleResponseError(response, pipeline, failureDetector))
129  
-                    return;
130  
-            } else {
131  
-                Map<ByteArray, Boolean> values = (Map<ByteArray, Boolean>) response.getValue();
132  
-
133  
-                for(ByteArray key: response.getKey()) {
134  
-                    MutableInt successCount = pipelineData.getSuccessCount(key);
135  
-                    successCount.increment();
136  
-
137  
-                    Boolean retrieved = values.get(key);
138  
-                    if(retrieved == null)
139  
-                        retrieved = false;
140  
-
141  
-                    Boolean existing = pipelineData.getResult().get(key);
142  
-                    if(existing == null)
143  
-                        pipelineData.getResult().put(key, retrieved);
144  
-                    else
145  
-                        pipelineData.getResult().put(key, existing | retrieved);
146  
-
147  
-                    HashSet<Integer> zoneResponses = null;
148  
-                    if(pipelineData.getKeyToZoneResponse().containsKey(key)) {
149  
-                        zoneResponses = pipelineData.getKeyToZoneResponse().get(key);
150  
-                    } else {
151  
-                        zoneResponses = new HashSet<Integer>();
152  
-                        pipelineData.getKeyToZoneResponse().put(key, zoneResponses);
153  
-                    }
154  
-                    zoneResponses.add(response.getNode().getZoneId());
155  
-                }
156  
-
157  
-                pipelineData.getResponses()
158  
-                            .add(new Response<Iterable<ByteArray>, Map<ByteArray, Boolean>>(response.getNode(),
159  
-                                                                                            response.getKey(),
160  
-                                                                                            values,
161  
-                                                                                            response.getRequestTime()));
162  
-                failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
163  
-            }
164  
-        }
  55
+    }
165 56
 
166  
-        pipeline.addEvent(completeEvent);
  57
+    @Override
  58
+    public void submitRequest(NonblockingStore store,
  59
+                              NonblockingStoreCallback callback,
  60
+                              Collection<ByteArray> keys) {
  61
+        store.submitHasKeysRequest(keys, exact, callback, timeoutMs);
167 62
     }
168 63
 }
162  src/java/voldemort/store/routed/action/PerformParallelMultiKeysRequests.java
... ...
@@ -0,0 +1,162 @@
  1
+/*
  2
+ * Copyright 2010 LinkedIn, Inc
  3
+ * 
  4
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5
+ * use this file except in compliance with the License. You may obtain a copy of
  6
+ * the License at
  7
+ * 
  8
+ * http://www.apache.org/licenses/LICENSE-2.0
  9
+ * 
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13
+ * License for the specific language governing permissions and limitations under
  14
+ * the License.
  15
+ */
  16
+
  17
+package voldemort.store.routed.action;
  18
+
  19
+import java.util.Collection;
  20
+import java.util.HashSet;
  21
+import java.util.List;
  22
+import java.util.Map;
  23
+import java.util.concurrent.ConcurrentHashMap;
  24
+import java.util.concurrent.CountDownLatch;
  25
+import java.util.concurrent.TimeUnit;
  26
+
  27
+import org.apache.commons.lang.mutable.MutableInt;
  28
+import org.apache.log4j.Level;
  29
+
  30
+import voldemort.cluster.Node;
  31
+import voldemort.cluster.failuredetector.FailureDetector;
  32
+import voldemort.store.InvalidMetadataException;
  33
+import voldemort.store.nonblockingstore.NonblockingStore;
  34
+import voldemort.store.nonblockingstore.NonblockingStoreCallback;
  35
+import voldemort.store.routed.MultiKeysPipelineData;
  36
+import voldemort.store.routed.Pipeline;
  37
+import voldemort.store.routed.Pipeline.Event;
  38
+import voldemort.store.routed.Response;
  39
+import voldemort.utils.ByteArray;
  40
+
  41
+public abstract class PerformParallelMultiKeysRequests<T> extends
  42
+        AbstractAction<Iterable<ByteArray>, Map<ByteArray, T>, MultiKeysPipelineData<T>> {
  43
+
  44
+    protected final long timeoutMs;
  45
+
  46
+    private final Map<Integer, NonblockingStore> nonblockingStores;
  47
+
  48
+    private final FailureDetector failureDetector;
  49
+
  50
+    public PerformParallelMultiKeysRequests(MultiKeysPipelineData<T> pipelineData,
  51
+                                            Event completeEvent,
  52