Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

MicroBatchIBackingMap: avoid store timeouts on multiput and multiget #532

Merged
merged 1 commit into from

2 participants

@jasonjckn
Collaborator

No description provided.

@nathanmarz nathanmarz merged commit 78b9f99 into nathanmarz:0.9.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 29, 2013
This page is out of date. Refresh to see the latest.
Showing with 68 additions and 0 deletions.
  1. +68 −0 src/jvm/storm/trident/state/map/MicroBatchIBackingMap.java
View
68 src/jvm/storm/trident/state/map/MicroBatchIBackingMap.java
@@ -0,0 +1,68 @@
+package storm.trident.state.map;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class MicroBatchIBackingMap<T> implements IBackingMap<T> {
+ IBackingMap<T> _delegate;
+ Options _options;
+
+
+ public static class Options implements Serializable {
+ public int maxMultiGetBatchSize = 0; // 0 means delegate batch size = trident batch size.
+ public int maxMultiPutBatchSize = 0;
+ }
+
+ public MicroBatchIBackingMap(final Options options, final IBackingMap<T> delegate) {
+ _options = options;
+ _delegate = delegate;
+ assert options.maxMultiPutBatchSize >= 0;
+ assert options.maxMultiGetBatchSize >= 0;
+ }
+
+ @Override
+ public void multiPut(final List<List<Object>> keys, final List<T> values) {
+ int thisBatchSize;
+ if(_options.maxMultiPutBatchSize == 0) { thisBatchSize = keys.size(); }
+ else { thisBatchSize = _options.maxMultiPutBatchSize; }
+
+ LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
+ LinkedList<T> valuesTodo = new LinkedList<T>(values);
+
+ while(!keysTodo.isEmpty()) {
+ List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
+ List<T> valuesBatch = new ArrayList<T>(thisBatchSize);
+ for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
+ keysBatch.add(keysTodo.removeFirst());
+ valuesBatch.add(valuesTodo.removeFirst());
+ }
+
+ _delegate.multiPut(keysBatch, valuesBatch);
+ }
+ }
+
+ @Override
+ public List<T> multiGet(final List<List<Object>> keys) {
+ int thisBatchSize;
+ if(_options.maxMultiGetBatchSize == 0) { thisBatchSize = keys.size(); }
+ else { thisBatchSize = _options.maxMultiGetBatchSize; }
+
+ LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
+
+ List<T> ret = new ArrayList<T>(keys.size());
+
+ while(!keysTodo.isEmpty()) {
+ List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
+ for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
+ keysBatch.add(keysTodo.removeFirst());
+ }
+
+ List<T> retSubset = _delegate.multiGet(keysBatch);
+ ret.addAll(retSubset);
+ }
+
+ return ret;
+ }
+}
Something went wrong with that request. Please try again.