/
FetchStreamRequestHandler.java
128 lines (100 loc) · 5.26 KB
/
FetchStreamRequestHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package voldemort.server.protocol.admin;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.admin.filter.DefaultVoldemortFilter;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.cluster.Cluster;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStats.Handle;
import voldemort.utils.ByteArray;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.xml.ClusterMapper;
public abstract class FetchStreamRequestHandler implements StreamRequestHandler {
protected final VAdminProto.FetchPartitionEntriesRequest request;
protected final ErrorCodeMapper errorCodeMapper;
protected final Cluster initialCluster;
protected final EventThrottler throttler;
protected final HashMap<Integer, List<Integer>> replicaToPartitionList;
protected final VoldemortFilter filter;
protected final StorageEngine<ByteArray, byte[], byte[]> storageEngine;
protected long counter;
protected long skipRecords;
protected int fetched;
protected final long startTime;
protected final Handle handle;
protected final StreamStats stats;
protected final Logger logger = Logger.getLogger(getClass());
protected int nodeId;
protected StoreDefinition storeDef;
protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
NetworkClassLoader networkClassLoader,
StreamStats stats,
StreamStats.Operation operation) {
this.nodeId = metadataStore.getNodeId();
this.request = request;
this.errorCodeMapper = errorCodeMapper;
this.replicaToPartitionList = ProtoUtils.decodePartitionTuple(request.getReplicaToPartitionList());
this.stats = stats;
this.handle = stats.makeHandle(operation, replicaToPartitionList);
this.storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository,
request.getStore());
this.storeDef = metadataStore.getStoreDef(request.getStore());
if(request.hasInitialCluster()) {
this.initialCluster = new ClusterMapper().readCluster(new StringReader(request.getInitialCluster()));
} else {
this.initialCluster = metadataStore.getCluster();
}
this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
if(request.hasFilter()) {
this.filter = AdminServiceRequestHandler.getFilterFromRequest(request.getFilter(),
voldemortConfig,
networkClassLoader);
} else {
this.filter = new DefaultVoldemortFilter();
}
this.startTime = System.currentTimeMillis();
this.counter = 0;
this.skipRecords = 1;
if(request.hasSkipRecords() && request.getSkipRecords() >= 0) {
this.skipRecords = request.getSkipRecords() + 1;
}
}
public final StreamRequestDirection getDirection() {
return StreamRequestDirection.WRITING;
}
public void close(DataOutputStream outputStream) throws IOException {
logger.info("Successfully scanned " + counter + " tuples, fetched " + fetched
+ " tuples for store '" + storageEngine.getName() + "' in "
+ ((System.currentTimeMillis() - startTime) / 1000) + " s");
ProtoUtils.writeEndOfStream(outputStream);
}
public final void handleError(DataOutputStream outputStream, VoldemortException e)
throws IOException {
VAdminProto.FetchPartitionEntriesResponse response = VAdminProto.FetchPartitionEntriesResponse.newBuilder()
.setError(ProtoUtils.encodeError(errorCodeMapper,
e))
.build();
ProtoUtils.writeMessage(outputStream, response);
logger.error("handleFetchPartitionEntries failed for request(" + request.toString() + ")",
e);
}
}