Skip to content
This repository
Browse code

Refactor all PerformParallel*Request classes.

- got rid of anonymous call back classes
- factored out waitForResponses logic and processResponses logic for most of these classes. GetAll stands out as being fairly different from the others.
- did not refactor to the point of sharing common code across classes, just refactored within each class.
- added many TODOs to the code for further refactoring.
  • Loading branch information...
commit 3162ba2db1151df0cb56ce8f48cfe731d5e69b31 1 parent 551b96d
Jay Wylie authored November 29, 2012
273  src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java
@@ -71,11 +71,14 @@ public PerformParallelDeleteRequests(PD pipelineData,
71 71
         this.hintedHandoff = hintedHandoff;
72 72
     }
73 73
 
  74
+    // TODO: This is almost identical to PerformParallelPutRequests.execute
  75
+    @Override
74 76
     public void execute(final Pipeline pipeline) {
75 77
         List<Node> nodes = pipelineData.getNodes();
76  
-        final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
77 78
         int attempts = nodes.size();
78 79
         int blocks = Math.min(preferred, attempts);
  80
+
  81
+        final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
79 82
         final CountDownLatch attemptsLatch = new CountDownLatch(attempts);
80 83
         final CountDownLatch blocksLatch = new CountDownLatch(blocks);
81 84
 
@@ -83,62 +86,17 @@ public void execute(final Pipeline pipeline) {
83 86
             logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
84 87
                          + " operations in parallel");
85 88
 
86  
-        long beginTime = System.nanoTime();
  89
+        long startTimeNs = System.nanoTime();
87 90
 
88 91
         for(int i = 0; i < attempts; i++) {
89 92
             final Node node = nodes.get(i);
90 93
             pipelineData.incrementNodeIndex();
91 94
 
92  
-            NonblockingStoreCallback callback = new NonblockingStoreCallback() {
93  
-
94  
-                public void requestComplete(Object result, long requestTime) {
95  
-                    if(logger.isTraceEnabled())
96  
-                        logger.trace(pipeline.getOperation().getSimpleName()
97  
-                                     + " response received (" + requestTime + " ms.) from node "
98  
-                                     + node.getId());
99  
-
100  
-                    Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
101  
-                                                                                           key,
102  
-                                                                                           result,
103  
-                                                                                           requestTime);
104  
-                    responses.put(node.getId(), response);
105  
-                    if(enableHintedHandoff && pipeline.isFinished()
106  
-                       && response.getValue() instanceof UnreachableStoreException) {
107  
-                        Slop slop = new Slop(pipelineData.getStoreName(),
108  
-                                             Slop.Operation.DELETE,
109  
-                                             key,
110  
-                                             null,
111  
-                                             null,
112  
-                                             node.getId(),
113  
-                                             new Date());
114  
-                        pipelineData.addFailedNode(node);
115  
-                        hintedHandoff.sendHintSerial(node, version, slop);
116  
-                    }
117  
-
118  
-                    attemptsLatch.countDown();
119  
-                    blocksLatch.countDown();
120  
-
121  
-                    if(logger.isTraceEnabled())
122  
-                        logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
123  
-                                     + " for " + blocksLatch.getCount() + " more ");
124  
-
125  
-                    // Note errors that come in after the pipeline has finished.
126  
-                    // These will *not* get a chance to be called in the loop of
127  
-                    // responses below.
128  
-                    if(pipeline.isFinished() && response.getValue() instanceof Exception
129  
-                       && !(response.getValue() instanceof ObsoleteVersionException)) {
130  
-                        if(response.getValue() instanceof InvalidMetadataException) {
131  
-                            pipelineData.reportException((InvalidMetadataException) response.getValue());
132  
-                            logger.warn("Received invalid metadata problem after a successful "
133  
-                                        + pipeline.getOperation().getSimpleName()
134  
-                                        + " call on node " + node.getId() + ", store '"
135  
-                                        + pipelineData.getStoreName() + "'");
136  
-                        } else {
137  
-                            handleResponseError(response, pipeline, failureDetector);
138  
-                        }
139  
-                    }
140  
-                }
141  
-            };
  95
+            NonblockingStoreCallback callback = new Callback(pipeline,
  96
+                                                             node,
  97
+                                                             responses,
  98
+                                                             attemptsLatch,
  99
+                                                             blocksLatch);
142 100
 
143 101
             if(logger.isTraceEnabled())
144 102
                 logger.info("Submitting " + pipeline.getOperation().getSimpleName()
@@ -148,74 +106,13 @@ public void requestComplete(Object result, long requestTime) {
148 106
             store.submitDeleteRequest(key, version, callback, timeoutMs);
149 107
         }
150 108
 
151  
-        try {
152  
-            long ellapsedNs = System.nanoTime() - beginTime;
153  
-            long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
154  
-            if(remainingNs > 0) {
155  
-                blocksLatch.await(remainingNs, TimeUnit.NANOSECONDS);
156  
-            }
157  
-        } catch(InterruptedException e) {
158  
-            if(logger.isEnabledFor(Level.WARN))
159  
-                logger.warn(e, e);
160  
-        }
161  
-
162  
-        for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
163  
-            Response<ByteArray, Object> response = responseEntry.getValue();
164  
-            if(response.getValue() instanceof Exception) {
165  
-                if(response.getValue() instanceof ObsoleteVersionException) {
166  
-                    // ignore this completely here
167  
-                    // this means that a higher version was able
168  
-                    // to write on this node and should be termed as
169  
-                    // clean success.
170  
-                    responses.remove(responseEntry.getKey());
171  
-                } else if(handleResponseError(response, pipeline, failureDetector)) {
172  
-                    return;
173  
-                }
174  
-            } else {
175  
-                pipelineData.incrementSuccesses();
176  
-                failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
177  
-                pipelineData.getZoneResponses().add(response.getNode().getZoneId());
178  
-                Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
179  
-                pipelineData.getResponses().add(rCast);
180  
-                responses.remove(responseEntry.getKey());
181  
-            }
182  
-        }
  109
+        waitForResponses(startTimeNs, blocksLatch, responses, pipeline);
183 110
 
184 111
         // wait for more responses in case we did not have enough successful
185 112
         // response to achieve the required count
186 113
         boolean quorumSatisfied = true;
187 114
         if(pipelineData.getSuccesses() < required) {
188  
-            long ellapsedNs = System.nanoTime() - beginTime;
189  
-            long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
190  
-            if(remainingNs > 0) {
191  
-                try {
192  
-                    attemptsLatch.await(remainingNs, TimeUnit.NANOSECONDS);
193  
-                } catch(InterruptedException e) {
194  
-                    if(logger.isEnabledFor(Level.WARN))
195  
-                        logger.warn(e, e);
196  
-                }
197  
-
198  
-                for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
199  
-                    Response<ByteArray, Object> response = responseEntry.getValue();
200  
-                    if(response.getValue() instanceof Exception) {
201  
-                        if(response.getValue() instanceof ObsoleteVersionException) {
202  
-                            // ignore this completely here
203  
-                            // this means that a higher version was able
204  
-                            // to write on this node and should be termed as
205  
-                            // clean success.
206  
-                            responses.remove(responseEntry.getKey());
207  
-                        } else if(handleResponseError(response, pipeline, failureDetector))
208  
-                            return;
209  
-                    } else {
210  
-                        pipelineData.incrementSuccesses();
211  
-                        failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
212  
-                        pipelineData.getZoneResponses().add(response.getNode().getZoneId());
213  
-                        Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
214  
-                        pipelineData.getResponses().add(rCast);
215  
-                        responses.remove(responseEntry.getKey());
216  
-                    }
217  
-                }
218  
-            }
  115
+            waitForResponses(startTimeNs, attemptsLatch, responses, pipeline);
219 116
 
220 117
             if(pipelineData.getSuccesses() < required) {
221 118
                 pipelineData.setFatalError(new InsufficientOperationalNodesException(required
@@ -240,32 +137,7 @@ public void requestComplete(Object result, long requestTime) {
240 137
                 if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) {
241 138
                     pipeline.addEvent(completeEvent);
242 139
                 } else {
243  
-                    long timeMs = (System.nanoTime() - beginTime) / Time.NS_PER_MS;
244  
-
245  
-                    if((timeoutMs - timeMs) > 0) {
246  
-                        try {
247  
-                            attemptsLatch.await(timeoutMs - timeMs, TimeUnit.MILLISECONDS);
248  
-                        } catch(InterruptedException e) {
249  
-                            if(logger.isEnabledFor(Level.WARN))
250  
-                                logger.warn(e, e);
251  
-                        }
252  
-
253  
-                        for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
254  
-                            Response<ByteArray, Object> response = responseEntry.getValue();
255  
-                            if(response.getValue() instanceof Exception) {
256  
-                                if(handleResponseError(response, pipeline, failureDetector))
257  
-                                    return;
258  
-                            } else {
259  
-                                pipelineData.incrementSuccesses();
260  
-                                failureDetector.recordSuccess(response.getNode(),
261  
-                                                              response.getRequestTime());
262  
-                                pipelineData.getZoneResponses().add(response.getNode().getZoneId());
263  
-                                Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
264  
-                                pipelineData.getResponses().add(rCast);
265  
-                                responses.remove(responseEntry.getKey());
266  
-                            }
267  
-                        }
268  
-                    }
  140
+                    waitForResponses(startTimeNs, attemptsLatch, responses, pipeline);
269 141
 
270 142
                     if(pipelineData.getZoneResponses().size() >= (pipelineData.getZonesRequired() + 1)) {
271 143
                         pipeline.addEvent(completeEvent);
@@ -287,4 +159,123 @@ public void requestComplete(Object result, long requestTime) {
287 159
         }
288 160
     }
289 161
 
  162
+    // TODO: except for start time, this is same as
  163
+    // PerformParallelPutRequests.waitForResponses
  164
+    private void waitForResponses(long startTimeNs,
  165
+                                  CountDownLatch latch,
  166
+                                  final Map<Integer, Response<ByteArray, Object>> responses,
  167
+                                  final Pipeline pipeline) {
  168
+        long elapsedNs = System.nanoTime() - startTimeNs;
  169
+        long remainingNs = (timeoutMs * Time.NS_PER_MS) - elapsedNs;
  170
+        if(remainingNs > 0) {
  171
+            try {
  172
+                latch.await(remainingNs, TimeUnit.NANOSECONDS);
  173
+            } catch(InterruptedException e) {
  174
+                if(logger.isEnabledFor(Level.WARN))
  175
+                    logger.warn(e, e);
  176
+            }
  177
+
  178
+            processResponses(responses, pipeline);
  179
+        }
  180
+    }
  181
+
  182
+    // TODO: except for two lines, this is same as
  183
+    // PerformParallelPutRequests.processResponses
  184
+    private void processResponses(final Map<Integer, Response<ByteArray, Object>> responses,
  185
+                                  final Pipeline pipeline) {
  186
+        for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
  187
+            Response<ByteArray, Object> response = responseEntry.getValue();
  188
+            // Treat ObsoleteVersionExceptions as success since such an
  189
+            // exception means that a higher version was able to write on the
  190
+            // node.
  191
+            if(response.getValue() instanceof Exception
  192
+               && !(response.getValue() instanceof ObsoleteVersionException)) {
  193
+                if(handleResponseError(response, pipeline, failureDetector))
  194
+                    return;
  195
+            } else {
  196
+                pipelineData.incrementSuccesses();
  197
+                failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
  198
+                pipelineData.getZoneResponses().add(response.getNode().getZoneId());
  199
+
  200
+                // TODO: Are the next two lines necessary!?!?!?
  201
+                Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
  202
+                pipelineData.getResponses().add(rCast);
  203
+
  204
+                responses.remove(responseEntry.getKey());
  205
+            }
  206
+        }
  207
+    }
  208
+
  209
+    // TODO: Almost identical to PerformParallelPutRequests.Callback. Anyway to
  210
+    // refactor into common code?
  211
+    public class Callback implements NonblockingStoreCallback {
  212
+
  213
+        final Pipeline pipeline;
  214
+        final Node node;
  215
+        final Map<Integer, Response<ByteArray, Object>> responses;
  216
+        final CountDownLatch attemptsLatch;
  217
+        final CountDownLatch blocksLatch;
  218
+
  219
+        Callback(Pipeline pipeline,
  220
+                 Node node,
  221
+                 Map<Integer, Response<ByteArray, Object>> responses,
  222
+                 CountDownLatch attemptsLatch,
  223
+                 CountDownLatch blocksLatch) {
  224
+            this.pipeline = pipeline;
  225
+            this.node = node;
  226
+            this.responses = responses;
  227
+            this.attemptsLatch = attemptsLatch;
  228
+            this.blocksLatch = blocksLatch;
  229
+        }
  230
+
  231
+        @Override
  232
+        public void requestComplete(Object result, long requestTime) {
  233
+            if(logger.isTraceEnabled())
  234
+                logger.trace(pipeline.getOperation().getSimpleName() + " response received ("
  235
+                             + requestTime + " ms.) from node " + node.getId());
  236
+
  237
+            Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
  238
+                                                                                   key,
  239
+                                                                                   result,
  240
+                                                                                   requestTime);
  241
+            responses.put(node.getId(), response);
  242
+
  243
+            // TODO: Must move heavy-weight ops out of callback
  244
+            if(enableHintedHandoff && pipeline.isFinished()
  245
+               && response.getValue() instanceof UnreachableStoreException) {
  246
+                Slop slop = new Slop(pipelineData.getStoreName(),
  247
+                                     Slop.Operation.DELETE,
  248
+                                     key,
  249
+                                     null,
  250
+                                     null,
  251
+                                     node.getId(),
  252
+                                     new Date());
  253
+                pipelineData.addFailedNode(node);
  254
+                hintedHandoff.sendHintSerial(node, version, slop);
  255
+            }
  256
+
  257
+            attemptsLatch.countDown();
  258
+            blocksLatch.countDown();
  259
+
  260
+            if(logger.isTraceEnabled())
  261
+                logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
  262
+                             + " for " + blocksLatch.getCount() + " more ");
  263
+
  264
+            // TODO: Must move heavy-weight ops out of callback
  265
+            // Note errors that come in after the pipeline has finished.
  266
+            // These will *not* get a chance to be called in the loop of
  267
+            // responses below.
  268
+            if(pipeline.isFinished() && response.getValue() instanceof Exception
  269
+               && !(response.getValue() instanceof ObsoleteVersionException)) {
  270
+                if(response.getValue() instanceof InvalidMetadataException) {
  271
+                    pipelineData.reportException((InvalidMetadataException) response.getValue());
  272
+                    logger.warn("Received invalid metadata problem after a successful "
  273
+                                + pipeline.getOperation().getSimpleName() + " call on node "
  274
+                                + node.getId() + ", store '" + pipelineData.getStoreName() + "'");
  275
+                } else {
  276
+                    handleResponseError(response, pipeline, failureDetector);
  277
+                }
  278
+            }
  279
+        }
  280
+    }
290 281
 }
96  src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java
@@ -37,6 +37,7 @@
37 37
 import voldemort.store.routed.Pipeline.Event;
38 38
 import voldemort.store.routed.Response;
39 39
 import voldemort.utils.ByteArray;
  40
+import voldemort.utils.Utils;
40 41
 import voldemort.versioning.Versioned;
41 42
 
42 43
 import com.google.common.collect.Lists;
@@ -62,11 +63,12 @@ public PerformParallelGetAllRequests(GetAllPipelineData pipelineData,
62 63
         this.nonblockingStores = nonblockingStores;
63 64
     }
64 65
 
65  
-    @SuppressWarnings("unchecked")
  66
+    @Override
66 67
     public void execute(final Pipeline pipeline) {
67 68
         int attempts = pipelineData.getNodeToKeysMap().size();
  69
+
68 70
         final Map<Integer, Response<Iterable<ByteArray>, Object>> responses = new ConcurrentHashMap<Integer, Response<Iterable<ByteArray>, Object>>();
69  
-        final CountDownLatch latch = new CountDownLatch(attempts);
  71
+        final CountDownLatch attemptsLatch = new CountDownLatch(attempts);
70 72
 
71 73
         if(logger.isTraceEnabled())
72 74
             logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
@@ -78,37 +80,11 @@ public void execute(final Pipeline pipeline) {
78 80
             final Node node = entry.getKey();
79 81
             final Collection<ByteArray> keys = entry.getValue();
80 82
 
81  
-            NonblockingStoreCallback callback = new NonblockingStoreCallback() {
82  
-
83  
-                public void requestComplete(Object result, long requestTime) {
84  
-                    if(logger.isTraceEnabled())
85  
-                        logger.trace(pipeline.getOperation().getSimpleName()
86  
-                                     + " response received (" + requestTime + " ms.) from node "
87  
-                                     + node.getId());
88  
-
89  
-                    Response<Iterable<ByteArray>, Object> response = new Response<Iterable<ByteArray>, Object>(node,
90  
-                                                                                                               keys,
91  
-                                                                                                               result,
92  
-                                                                                                               requestTime);
93  
-                    responses.put(node.getId(), response);
94  
-                    latch.countDown();
95  
-
96  
-                    // Note errors that come in after the pipeline has finished.
97  
-                    // These will *not* get a chance to be called in the loop of
98  
-                    // responses below.
99  
-                    if(pipeline.isFinished() && response.getValue() instanceof Exception)
100  
-                        if(response.getValue() instanceof InvalidMetadataException) {
101  
-                            pipelineData.reportException((InvalidMetadataException) response.getValue());
102  
-                            logger.warn("Received invalid metadata problem after a successful "
103  
-                                        + pipeline.getOperation().getSimpleName()
104  
-                                        + " call on node " + node.getId() + ", store '"
105  
-                                        + pipelineData.getStoreName() + "'");
106  
-                        } else {
107  
-                            handleResponseError(response, pipeline, failureDetector);
108  
-                        }
109  
-                }
110  
-
111  
-            };
  83
+            NonblockingStoreCallback callback = new Callback(pipeline,
  84
+                                                             node,
  85
+                                                             keys,
  86
+                                                             responses,
  87
+                                                             attemptsLatch);
112 88
 
113 89
             if(logger.isTraceEnabled())
114 90
                 logger.trace("Submitting " + pipeline.getOperation().getSimpleName()
@@ -119,7 +95,7 @@ public void requestComplete(Object result, long requestTime) {
119 95
         }
120 96
 
121 97
         try {
122  
-            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
  98
+            attemptsLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
123 99
         } catch(InterruptedException e) {
124 100
             if(logger.isEnabledFor(Level.WARN))
125 101
                 logger.warn(e, e);
@@ -130,7 +106,7 @@ public void requestComplete(Object result, long requestTime) {
130 106
                 if(handleResponseError(response, pipeline, failureDetector))
131 107
                     return;
132 108
             } else {
133  
-                Map<ByteArray, List<Versioned<byte[]>>> values = (Map<ByteArray, List<Versioned<byte[]>>>) response.getValue();
  109
+                Map<ByteArray, List<Versioned<byte[]>>> values = Utils.uncheckedCast(response.getValue());
134 110
 
135 111
                 for(ByteArray key: response.getKey()) {
136 112
                     MutableInt successCount = pipelineData.getSuccessCount(key);
@@ -171,4 +147,54 @@ public void requestComplete(Object result, long requestTime) {
171 147
 
172 148
         pipeline.addEvent(completeEvent);
173 149
     }
  150
+
  151
+    public class Callback implements NonblockingStoreCallback {
  152
+
  153
+        final Pipeline pipeline;
  154
+        final Node node;
  155
+        final Collection<ByteArray> keys;
  156
+        final Map<Integer, Response<Iterable<ByteArray>, Object>> responses;
  157
+        final CountDownLatch attemptsLatch;
  158
+
  159
+        Callback(Pipeline pipeline,
  160
+                 Node node,
  161
+                 Collection<ByteArray> keys,
  162
+                 Map<Integer, Response<Iterable<ByteArray>, Object>> responses,
  163
+                 CountDownLatch attemptsLatch) {
  164
+            this.pipeline = pipeline;
  165
+            this.node = node;
  166
+            this.keys = keys;
  167
+            this.responses = responses;
  168
+            this.attemptsLatch = attemptsLatch;
  169
+        }
  170
+
  171
+        @Override
  172
+        public void requestComplete(Object result, long requestTime) {
  173
+            if(logger.isTraceEnabled())
  174
+                logger.trace(pipeline.getOperation().getSimpleName() + " response received ("
  175
+                             + requestTime + " ms.) from node " + node.getId());
  176
+
  177
+            Response<Iterable<ByteArray>, Object> response = new Response<Iterable<ByteArray>, Object>(node,
  178
+                                                                                                       keys,
  179
+                                                                                                       result,
  180
+                                                                                                       requestTime);
  181
+            responses.put(node.getId(), response);
  182
+            attemptsLatch.countDown();
  183
+
  184
+            // TODO: Must move heavy-weight ops out of callback
  185
+            // Note errors that come in after the pipeline has finished.
  186
+            // These will *not* get a chance to be called in the loop of
  187
+            // responses below.
  188
+            if(pipeline.isFinished() && response.getValue() instanceof Exception)
  189
+                if(response.getValue() instanceof InvalidMetadataException) {
  190
+                    pipelineData.reportException((InvalidMetadataException) response.getValue());
  191
+                    logger.warn("Received invalid metadata problem after a successful "
  192
+                                + pipeline.getOperation().getSimpleName() + " call on node "
  193
+                                + node.getId() + ", store '" + pipelineData.getStoreName() + "'");
  194
+                } else {
  195
+                    handleResponseError(response, pipeline, failureDetector);
  196
+                }
  197
+
  198
+        }
  199
+    }
174 200
 }
269  src/java/voldemort/store/routed/action/PerformParallelPutRequests.java
@@ -91,6 +91,7 @@ public boolean isHintedHandoffEnabled() {
91 91
         return enableHintedHandoff;
92 92
     }
93 93
 
  94
+    @Override
94 95
     public void execute(final Pipeline pipeline) {
95 96
         Node master = pipelineData.getMaster();
96 97
         final Versioned<byte[]> versionedCopy = pipelineData.getVersionedCopy();
@@ -116,65 +117,12 @@ public void execute(final Pipeline pipeline) {
116 117
             final Node node = nodes.get(i);
117 118
             pipelineData.incrementNodeIndex();
118 119
 
119  
-            NonblockingStoreCallback callback = new NonblockingStoreCallback() {
120  
-
121  
-                public void requestComplete(Object result, long requestTime) {
122  
-                    if(logger.isTraceEnabled())
123  
-                        logger.trace(pipeline.getOperation().getSimpleName()
124  
-                                     + " response received (" + requestTime + " ms.) from node "
125  
-                                     + node.getId());
126  
-
127  
-                    Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
128  
-                                                                                           key,
129  
-                                                                                           result,
130  
-                                                                                           requestTime);
131  
-                    responses.put(node.getId(), response);
132  
-
133  
-                    if(logger.isDebugEnabled())
134  
-                        logger.debug("Finished secondary PUT for key "
135  
-                                     + ByteUtils.toHexString(key.get()) + " (keyRef: "
136  
-                                     + System.identityHashCode(key) + "); took " + requestTime
137  
-                                     + " ms on node " + node.getId() + "(" + node.getHost() + ")");
138  
-
139  
-                    if(isHintedHandoffEnabled() && pipeline.isFinished()) {
140  
-                        if(response.getValue() instanceof UnreachableStoreException) {
141  
-                            Slop slop = new Slop(pipelineData.getStoreName(),
142  
-                                                 Slop.Operation.PUT,
143  
-                                                 key,
144  
-                                                 versionedCopy.getValue(),
145  
-                                                 transforms,
146  
-                                                 node.getId(),
147  
-                                                 new Date());
148  
-                            pipelineData.addFailedNode(node);
149  
-                            hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop);
150  
-                        }
151  
-                    }
152  
-
153  
-                    attemptsLatch.countDown();
154  
-                    blocksLatch.countDown();
155  
-
156  
-                    if(logger.isTraceEnabled())
157  
-                        logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
158  
-                                     + " for " + blocksLatch.getCount() + " more ");
159  
-
160  
-                    // Note errors that come in after the pipeline has finished.
161  
-                    // These will *not* get a chance to be called in the loop of
162  
-                    // responses below.
163  
-                    if(pipeline.isFinished() && response.getValue() instanceof Exception
164  
-                       && !(response.getValue() instanceof ObsoleteVersionException)) {
165  
-                        if(response.getValue() instanceof InvalidMetadataException) {
166  
-                            pipelineData.reportException((InvalidMetadataException) response.getValue());
167  
-                            logger.warn("Received invalid metadata problem after a successful "
168  
-                                        + pipeline.getOperation().getSimpleName()
169  
-                                        + " call on node " + node.getId() + ", store '"
170  
-                                        + pipelineData.getStoreName() + "'");
171  
-                        } else {
172  
-                            handleResponseError(response, pipeline, failureDetector);
173  
-                        }
174  
-                    }
175  
-                }
176  
-
177  
-            };
  120
+            NonblockingStoreCallback callback = new Callback(pipeline,
  121
+                                                             node,
  122
+                                                             versionedCopy,
  123
+                                                             responses,
  124
+                                                             attemptsLatch,
  125
+                                                             blocksLatch);
178 126
 
179 127
             if(logger.isTraceEnabled())
180 128
                 logger.trace("Submitting " + pipeline.getOperation().getSimpleName()
@@ -183,63 +131,11 @@ public void requestComplete(Object result, long requestTime) {
183 131
             NonblockingStore store = nonblockingStores.get(node.getId());
184 132
             store.submitPutRequest(key, versionedCopy, transforms, callback, timeoutMs);
185 133
         }
186  
-
187  
-        try {
188  
-            long ellapsedNs = System.nanoTime() - pipelineData.getStartTimeNs();
189  
-            long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
190  
-            if(remainingNs > 0)
191  
-                blocksLatch.await(remainingNs, TimeUnit.NANOSECONDS);
192  
-        } catch(InterruptedException e) {
193  
-            if(logger.isEnabledFor(Level.WARN))
194  
-                logger.warn(e, e);
195  
-        }
196  
-
197  
-        for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
198  
-            Response<ByteArray, Object> response = responseEntry.getValue();
199  
-            // Treat ObsoleteVersionExceptions as success since such an
200  
-            // exception means that a higher version was able to write on the
201  
-            // node.
202  
-            if(response.getValue() instanceof Exception
203  
-               && !(response.getValue() instanceof ObsoleteVersionException)) {
204  
-                if(handleResponseError(response, pipeline, failureDetector))
205  
-                    return;
206  
-            } else {
207  
-                pipelineData.incrementSuccesses();
208  
-                failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
209  
-                pipelineData.getZoneResponses().add(response.getNode().getZoneId());
210  
-                responses.remove(responseEntry.getKey());
211  
-            }
212  
-        }
  134
+        waitForResponses(blocksLatch, responses, pipeline);
213 135
 
214 136
         boolean quorumSatisfied = true;
215 137
         if(pipelineData.getSuccesses() < required) {
216  
-            long ellapsedNs = System.nanoTime() - pipelineData.getStartTimeNs();
217  
-            long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
218  
-            if(remainingNs > 0) {
219  
-                try {
220  
-                    attemptsLatch.await(remainingNs, TimeUnit.NANOSECONDS);
221  
-                } catch(InterruptedException e) {
222  
-                    if(logger.isEnabledFor(Level.WARN))
223  
-                        logger.warn(e, e);
224  
-                }
225  
-
226  
-                for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
227  
-                    Response<ByteArray, Object> response = responseEntry.getValue();
228  
-                    // Treat ObsoleteVersionExceptions as success since such an
229  
-                    // exception means that a higher version was able to write
230  
-                    // on the node.
231  
-                    if(response.getValue() instanceof Exception
232  
-                       && !(response.getValue() instanceof ObsoleteVersionException)) {
233  
-                        if(handleResponseError(response, pipeline, failureDetector))
234  
-                            return;
235  
-                    } else {
236  
-                        pipelineData.incrementSuccesses();
237  
-                        failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
238  
-                        pipelineData.getZoneResponses().add(response.getNode().getZoneId());
239  
-                        responses.remove(responseEntry.getKey());
240  
-                    }
241  
-                }
242  
-            }
  138
+            waitForResponses(attemptsLatch, responses, pipeline);
243 139
 
244 140
             if(pipelineData.getSuccesses() < required) {
245 141
                 pipelineData.setFatalError(new InsufficientOperationalNodesException(required
@@ -260,36 +156,12 @@ public void requestComplete(Object result, long requestTime) {
260 156
 
261 157
         if(quorumSatisfied) {
262 158
             if(pipelineData.getZonesRequired() != null) {
263  
-
264 159
                 int zonesSatisfied = pipelineData.getZoneResponses().size();
  160
+
265 161
                 if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) {
266 162
                     pipeline.addEvent(completeEvent);
267 163
                 } else {
268  
-                    long timeMs = (System.nanoTime() - pipelineData.getStartTimeNs())
269  
-                                  / Time.NS_PER_MS;
270  
-
271  
-                    if((timeoutMs - timeMs) > 0) {
272  
-                        try {
273  
-                            attemptsLatch.await(timeoutMs - timeMs, TimeUnit.MILLISECONDS);
274  
-                        } catch(InterruptedException e) {
275  
-                            if(logger.isEnabledFor(Level.WARN))
276  
-                                logger.warn(e, e);
277  
-                        }
278  
-
279  
-                        for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
280  
-                            Response<ByteArray, Object> response = responseEntry.getValue();
281  
-                            if(response.getValue() instanceof Exception) {
282  
-                                if(handleResponseError(response, pipeline, failureDetector))
283  
-                                    return;
284  
-                            } else {
285  
-                                pipelineData.incrementSuccesses();
286  
-                                failureDetector.recordSuccess(response.getNode(),
287  
-                                                              response.getRequestTime());
288  
-                                pipelineData.getZoneResponses().add(response.getNode().getZoneId());
289  
-                                responses.remove(responseEntry.getKey());
290  
-                            }
291  
-                        }
292  
-                    }
  164
+                    waitForResponses(attemptsLatch, responses, pipeline);
293 165
 
294 166
                     if(pipelineData.getZoneResponses().size() >= (pipelineData.getZonesRequired() + 1)) {
295 167
                         pipeline.addEvent(completeEvent);
@@ -305,10 +177,127 @@ public void requestComplete(Object result, long requestTime) {
305 177
                         pipeline.abort();
306 178
                     }
307 179
                 }
308  
-
309 180
             } else {
310 181
                 pipeline.addEvent(completeEvent);
311 182
             }
312 183
         }
313 184
     }
  185
+
  186
+    private void waitForResponses(CountDownLatch latch,
  187
+                                  final Map<Integer, Response<ByteArray, Object>> responses,
  188
+                                  final Pipeline pipeline) {
  189
+        long elapsedNs = System.nanoTime() - pipelineData.getStartTimeNs();
  190
+        long remainingNs = (timeoutMs * Time.NS_PER_MS) - elapsedNs;
  191
+        if(remainingNs > 0) {
  192
+            try {
  193
+                latch.await(remainingNs, TimeUnit.NANOSECONDS);
  194
+            } catch(InterruptedException e) {
  195
+                if(logger.isEnabledFor(Level.WARN))
  196
+                    logger.warn(e, e);
  197
+            }
  198
+
  199
+            processResponses(responses, pipeline);
  200
+        }
  201
+    }
  202
+
  203
+    private void processResponses(final Map<Integer, Response<ByteArray, Object>> responses,
  204
+                                  final Pipeline pipeline) {
  205
+        for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
  206
+            Response<ByteArray, Object> response = responseEntry.getValue();
  207
+            // Treat ObsoleteVersionExceptions as success since such an
  208
+            // exception means that a higher version was able to write on the
  209
+            // node.
  210
+            if(response.getValue() instanceof Exception
  211
+               && !(response.getValue() instanceof ObsoleteVersionException)) {
  212
+                if(handleResponseError(response, pipeline, failureDetector))
  213
+                    return;
  214
+            } else {
  215
+                pipelineData.incrementSuccesses();
  216
+                failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
  217
+                pipelineData.getZoneResponses().add(response.getNode().getZoneId());
  218
+                responses.remove(responseEntry.getKey());
  219
+            }
  220
+        }
  221
+    }
  222
+
  223
+    public class Callback implements NonblockingStoreCallback {
  224
+
  225
+        final Pipeline pipeline;
  226
+        final Node node;
  227
+        final Versioned<byte[]> versionedCopy;
  228
+        final Map<Integer, Response<ByteArray, Object>> responses;
  229
+        final CountDownLatch attemptsLatch;
  230
+        final CountDownLatch blocksLatch;
  231
+
  232
+        Callback(Pipeline pipeline,
  233
+                 Node node,
  234
+                 Versioned<byte[]> versionedCopy,
  235
+                 Map<Integer, Response<ByteArray, Object>> responses,
  236
+                 CountDownLatch attemptsLatch,
  237
+                 CountDownLatch blocksLatch) {
  238
+            this.pipeline = pipeline;
  239
+            this.node = node;
  240
+            this.versionedCopy = versionedCopy;
  241
+            this.responses = responses;
  242
+            this.attemptsLatch = attemptsLatch;
  243
+            this.blocksLatch = blocksLatch;
  244
+        }
  245
+
  246
+        @Override
  247
+        public void requestComplete(Object result, long requestTime) {
  248
+            if(logger.isTraceEnabled())
  249
+                logger.trace(pipeline.getOperation().getSimpleName() + " response received ("
  250
+                             + requestTime + " ms.) from node " + node.getId());
  251
+
  252
+            Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
  253
+                                                                                   key,
  254
+                                                                                   result,
  255
+                                                                                   requestTime);
  256
+            responses.put(node.getId(), response);
  257
+
  258
+            if(logger.isDebugEnabled())
  259
+                logger.debug("Finished secondary PUT for key " + ByteUtils.toHexString(key.get())
  260
+                             + " (keyRef: " + System.identityHashCode(key) + "); took "
  261
+                             + requestTime + " ms on node " + node.getId() + "(" + node.getHost()
  262
+                             + ")");
  263
+
  264
+            // TODO: Must move heavy-weight ops out of callback
  265
+            if(isHintedHandoffEnabled() && pipeline.isFinished()) {
  266
+                if(response.getValue() instanceof UnreachableStoreException) {
  267
+                    Slop slop = new Slop(pipelineData.getStoreName(),
  268
+                                         Slop.Operation.PUT,
  269
+                                         key,
  270
+                                         versionedCopy.getValue(),
  271
+                                         transforms,
  272
+                                         node.getId(),
  273
+                                         new Date());
  274
+                    pipelineData.addFailedNode(node);
  275
+                    hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop);
  276
+                }
  277
+            }
  278
+
  279
+            attemptsLatch.countDown();
  280
+            blocksLatch.countDown();
  281
+
  282
+            if(logger.isTraceEnabled())
  283
+                logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
  284
+                             + " for " + blocksLatch.getCount() + " more ");
  285
+
  286
+            // TODO: Must move heavy-weight ops out of callback
  287
+            // Note errors that come in after the pipeline has finished.
  288
+            // These will *not* get a chance to be called in the loop of
  289
+            // responses below.
  290
+            if(pipeline.isFinished() && response.getValue() instanceof Exception
  291
+               && !(response.getValue() instanceof ObsoleteVersionException)) {
  292
+                if(response.getValue() instanceof InvalidMetadataException) {
  293
+                    pipelineData.reportException((InvalidMetadataException) response.getValue());
  294
+                    logger.warn("Received invalid metadata problem after a successful "
  295
+                                + pipeline.getOperation().getSimpleName() + " call on node "
  296
+                                + node.getId() + ", store '" + pipelineData.getStoreName() + "'");
  297
+                } else {
  298
+                    handleResponseError(response, pipeline, failureDetector);
  299
+                }
  300
+            }
  301
+        }
  302
+    }
314 303
 }
117  src/java/voldemort/store/routed/action/PerformParallelRequests.java
@@ -82,11 +82,13 @@ public PerformParallelRequests(PD pipelineData,
82 82
         this.insufficientZonesEvent = insufficientZonesEvent;
83 83
     }
84 84
 
  85
+    @Override
85 86
     public void execute(final Pipeline pipeline) {
86 87
         List<Node> nodes = pipelineData.getNodes();
87 88
         int attempts = Math.min(preferred, nodes.size());
  89
+
88 90
         final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
89  
-        final CountDownLatch latch = new CountDownLatch(attempts);
  91
+        final CountDownLatch attemptsLatch = new CountDownLatch(attempts);
90 92
 
91 93
         if(logger.isTraceEnabled())
92 94
             logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
@@ -96,54 +98,16 @@ public void execute(final Pipeline pipeline) {
96 98
             final Node node = nodes.get(i);
97 99
             pipelineData.incrementNodeIndex();
98 100
 
99  
-            final long startMs = logger.isDebugEnabled() ? System.currentTimeMillis() : -1;
100  
-
101  
-            NonblockingStoreCallback callback = new NonblockingStoreCallback() {
102  
-
103  
-                public void requestComplete(Object result, long requestTime) {
104  
-                    if(logger.isTraceEnabled())
105  
-                        logger.trace(pipeline.getOperation().getSimpleName()
106  
-                                     + " response received (" + requestTime + " ms.) from node "
107  
-                                     + node.getId());
108  
-
109  
-                    Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
110  
-                                                                                           key,
111  
-                                                                                           result,
112  
-                                                                                           requestTime);
113  
-                    if(logger.isDebugEnabled())
114  
-                        logger.debug("Finished " + pipeline.getOperation().getSimpleName()
115  
-                                     + " for key " + ByteUtils.toHexString(key.get())
116  
-                                     + " (keyRef: " + System.identityHashCode(key)
117  
-                                     + "); started at " + startMs + " took " + requestTime
118  
-                                     + " ms on node " + node.getId() + "(" + node.getHost() + ")");
119  
-
120  
-                    responses.put(node.getId(), response);
121  
-                    latch.countDown();
122  
-
123  
-                    // Note errors that come in after the pipeline has finished.
124  
-                    // These will *not* get a chance to be called in the loop of
125  
-                    // responses below.
126  
-                    if(pipeline.isFinished() && response.getValue() instanceof Exception) {
127  
-                        if(response.getValue() instanceof InvalidMetadataException) {
128  
-                            pipelineData.reportException((InvalidMetadataException) response.getValue());
129  
-                            logger.warn("Received invalid metadata problem after a successful "
130  
-                                        + pipeline.getOperation().getSimpleName()
131  
-                                        + " call on node " + node.getId() + ", store '"
132  
-                                        + pipelineData.getStoreName() + "'");
133  
-                        } else {
134  
-                            handleResponseError(response, pipeline, failureDetector);
135  
-                        }
136  
-                    }
137  
-                }
138  
-
139  
-            };
  101
+            NonblockingStoreCallback callback = new Callback(pipeline,
  102
+                                                             node,
  103
+                                                             responses,
  104
+                                                             attemptsLatch);
140 105
 
141 106
             if(logger.isTraceEnabled())
142 107
                 logger.trace("Submitting " + pipeline.getOperation().getSimpleName()
143 108
                              + " request on node " + node.getId());
144 109
 
145 110
             NonblockingStore store = nonblockingStores.get(node.getId());
146  
-
147 111
             if(pipeline.getOperation() == Operation.GET)
148 112
                 store.submitGetRequest(key, transforms, callback, timeoutMs);
149 113
             else if(pipeline.getOperation() == Operation.GET_VERSIONS)
@@ -154,8 +118,10 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
154 118
                                                 + pipeline.getOperation());
155 119
         }
156 120
 
  121
+        // TODO: Wait on attemptsLatch and processiong responses could be
  122
+        // refactored to use PerformParallelPut.waitForResponses.
157 123
         try {
158  
-            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
  124
+            attemptsLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
159 125
         } catch(InterruptedException e) {
160 126
             if(logger.isEnabledFor(Level.WARN))
161 127
                 logger.warn(e, e);
@@ -167,10 +133,12 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
167 133
                     return;
168 134
             } else {
169 135
                 pipelineData.incrementSuccesses();
170  
-                Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
171  
-                pipelineData.getResponses().add(rCast);
172 136
                 failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
173 137
                 pipelineData.getZoneResponses().add(response.getNode().getZoneId());
  138
+
  139
+                // TODO: What about these two operations!? ARe they needed?
  140
+                Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
  141
+                pipelineData.getResponses().add(rCast);
174 142
             }
175 143
         }
176 144
 
@@ -198,11 +166,8 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
198 166
 
199 167
                 pipeline.abort();
200 168
             }
201  
-
202 169
         } else {
203  
-
204 170
             if(pipelineData.getZonesRequired() != null) {
205  
-
206 171
                 int zonesSatisfied = pipelineData.getZoneResponses().size();
207 172
                 if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) {
208 173
                     pipeline.addEvent(completeEvent);
@@ -224,13 +189,63 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
224 189
                                                                                           + zonesSatisfied
225 190
                                                                                           + " succeeded"));
226 191
                     }
227  
-
228 192
                 }
229  
-
230 193
             } else {
231 194
                 pipeline.addEvent(completeEvent);
232 195
             }
233 196
         }
234 197
     }
235 198
 
  199
+    public class Callback implements NonblockingStoreCallback {
  200
+
  201
+        final Pipeline pipeline;
  202
+        final Node node;
  203
+        final Map<Integer, Response<ByteArray, Object>> responses;
  204
+        final CountDownLatch attemptsLatch;
  205
+
  206
+        Callback(Pipeline pipeline,
  207
+                 Node node,
  208
+                 Map<Integer, Response<ByteArray, Object>> responses,
  209
+                 CountDownLatch attemptsLatch) {
  210
+            this.pipeline = pipeline;
  211
+            this.node = node;
  212
+            this.responses = responses;
  213
+            this.attemptsLatch = attemptsLatch;
  214
+        }
  215
+
  216
+        @Override
  217
+        public void requestComplete(Object result, long requestTime) {
  218
+            if(logger.isTraceEnabled())
  219
+                logger.trace(pipeline.getOperation().getSimpleName() + " response received ("
  220
+                             + requestTime + " ms.) from node " + node.getId());
  221
+
  222
+            Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
  223
+                                                                                   key,
  224
+                                                                                   result,
  225
+                                                                                   requestTime);
  226
+            if(logger.isDebugEnabled())
  227
+                logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key "
  228
+                             + ByteUtils.toHexString(key.get()) + " (keyRef: "
  229
+                             + System.identityHashCode(key) + "); took " + requestTime
  230
+                             + " ms on node " + node.getId() + "(" + node.getHost() + ")");
  231
+
  232
+            responses.put(node.getId(), response);
  233
+            attemptsLatch.countDown();
  234
+
  235
+            // TODO: Must move heavy-weight ops out of callback
  236
+            // Note errors that come in after the pipeline has finished.
  237
+            // These will *not* get a chance to be called in the loop of
  238
+            // responses below.
  239
+            if(pipeline.isFinished() && response.getValue() instanceof Exception) {
  240
+                if(response.getValue() instanceof InvalidMetadataException) {
  241
+                    pipelineData.reportException((InvalidMetadataException) response.getValue());
  242
+                    logger.warn("Received invalid metadata problem after a successful "
  243
+                                + pipeline.getOperation().getSimpleName() + " call on node "