Resolve conflicts after merging with Roshan's new code on rebalance

1 parent 09299af commit 207b94a3f783fd60d2d5d5d5ecb85f89a6250344 Lei Gao committed May 19, 2011
Showing with 186 additions and 73 deletions.
  1. +186 −73 src/java/voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler.java
@@ -4,13 +4,16 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
import org.apache.log4j.Logger;
@@ -27,8 +30,9 @@
import voldemort.store.stats.StreamStats;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
+import voldemort.utils.RebalanceUtils;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
public class FetchPartitionFileStreamRequestHandler implements StreamRequestHandler {
@@ -46,45 +50,67 @@
private final StreamStats.Handle handle;
- private final ReadOnlyStorageEngine storageEngine;
+ private final Iterator<Pair<Integer, Integer>> partitionIterator;
- private final HashMap<Integer, List<Integer>> replicaToPartitionList;
+ private FetchStatus fetchStatus;
+
+ private int currentChunkId;
+
+ private Pair<Integer, Integer> currentPair;
+
+ private File indexFile;
+
+ private File dataFile;
+
+ private ChunkedFileWriter chunkedFileWriter;
+
+ private final List<Pair<Integer, Integer>> replicaToPartitionList;
protected FetchPartitionFileStreamRequestHandler(VAdminProto.FetchPartitionFilesRequest request,
MetadataStore metadataStore,
VoldemortConfig voldemortConfig,
StoreRepository storeRepository,
StreamStats stats) {
this.request = request;
-
StoreDefinition storeDef = metadataStore.getStoreDef(request.getStore());
boolean isReadOnly = storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0;
if(!isReadOnly) {
throw new VoldemortException("Should be fetching partition files only for read-only stores");
}
+ ReadOnlyStorageEngine storageEngine = AdminServiceRequestHandler.getReadOnlyStorageEngine(metadataStore,
+ storeRepository,
+ request.getStore());
+
HashMap<Integer, List<Integer>> localReplicaToPartitionList = ProtoUtils.decodePartitionTuple(request.getReplicaToPartitionList());
// Filter the replica-to-partition mapping so that it includes only
// replica types up to the store's replication factor
- this.replicaToPartitionList = Maps.newHashMap();
+ this.replicaToPartitionList = Lists.newArrayList();
for(int replicaType = 0; replicaType < storeDef.getReplicationFactor(); replicaType++) {
if(localReplicaToPartitionList.containsKey(replicaType)) {
- this.replicaToPartitionList.put(replicaType,
- localReplicaToPartitionList.get(replicaType));
+ List<Integer> partitionList = localReplicaToPartitionList.get(replicaType);
+ for(Iterator<Integer> it = partitionList.iterator(); it.hasNext();) {
+ this.replicaToPartitionList.add(new Pair<Integer, Integer>(replicaType,
+ it.next()));
+ }
}
}
- this.storageEngine = AdminServiceRequestHandler.getReadOnlyStorageEngine(metadataStore,
- storeRepository,
- request.getStore());
this.blockSize = voldemortConfig.getAllProps()
.getLong("partition.buffer.size.bytes",
voldemortConfig.getAdminSocketBufferSize());
this.storeDir = new File(storageEngine.getCurrentDirPath());
this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
this.stats = stats;
- this.handle = stats.makeHandle(StreamStats.Operation.FETCH_FILE, replicaToPartitionList);
+ this.handle = stats.makeHandle(StreamStats.Operation.FETCH_FILE,
+ RebalanceUtils.flattenPartitionTuples(new HashSet<Pair<Integer, Integer>>(replicaToPartitionList)));
+ this.partitionIterator = Collections.unmodifiableList(replicaToPartitionList).iterator();
+ this.fetchStatus = FetchStatus.next_partition;
+ this.currentChunkId = 0;
+ this.indexFile = null;
+ this.dataFile = null;
+ this.chunkedFileWriter = null;
}
public StreamRequestDirection getDirection() {
@@ -104,81 +130,168 @@ public final void handleError(DataOutputStream outputStream, VoldemortException
public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
DataOutputStream outputStream)
throws IOException {
- HashMap<Object, Integer> bucketToNumChunks = storageEngine.getChunkedFileSet()
- .getChunkIdToNumChunks();
+ StreamRequestHandlerState handlerState = StreamRequestHandlerState.WRITING;
+
+ switch(fetchStatus) {
+ case next_partition:
+ handlerState = handleNextPartition();
+ break;
+ case send_data_file:
+ handleSendDataFile(outputStream);
+ break;
+ case send_index_file:
+ handleSendIndexFile();
+ break;
+ default:
+ throw new VoldemortException("Invalid fetch status: " + fetchStatus);
+ }
- for(Entry<Integer, List<Integer>> entry: replicaToPartitionList.entrySet()) {
+ return handlerState;
+ }
- int replicaType = entry.getKey();
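+ // Streams the index file one block per call; once fully sent, closes the
+ // writer and advances to the next chunk's data file.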
+ private void handleSendIndexFile() throws IOException {
+ if(0 == chunkedFileWriter.streamFile()) {
+ // we are done with the index file, move to next chunk
+ logger.info("Completed streaming " + indexFile.getAbsolutePath());
+ this.chunkedFileWriter.close();
+ currentChunkId++;
+ dataFile = indexFile = null;
+ handle.incrementEntriesScanned();
+ fetchStatus = FetchStatus.send_data_file;
+ } else {
+ // index file not done yet, keep sending
+ fetchStatus = FetchStatus.send_index_file;
+ }
+ }
- // Go over every partition for this replica type
- for(int partitionId: entry.getValue()) {
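+ // Streams the data file one block per call; on the first call for a chunk,
+ // resolves and validates the chunk's .data/.index files and writes the header.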
+ private void handleSendDataFile(DataOutputStream outputStream) throws IOException {
+ if(null == dataFile && null == indexFile) {
+ // first time in this state: resolve this chunk's files from the
+ // partition, replica type, and chunk id
+ String fileName = currentPair.getSecond().toString() + "_"
+ + currentPair.getFirst().toString() + "_"
+ + Integer.toString(currentChunkId);
+ dataFile = new File(this.storeDir, fileName + ".data");
+ indexFile = new File(this.storeDir, fileName + ".index");
+ validateFiles();
+ if(isPartitionFinished()) {
+ // finished with this partition, move to next one
+ fetchStatus = FetchStatus.next_partition;
+ return;
+ }
- // Check if this bucket exists
- if(!bucketToNumChunks.containsKey(Pair.create(partitionId, replicaType))) {
- throw new VoldemortException("Bucket [ partition = " + partitionId
- + ", replica = " + replicaType
- + " ] does not exist for store "
- + request.getStore());
- }
+ // create a new writer for data file
+ this.chunkedFileWriter = new ChunkedFileWriter(dataFile, outputStream);
+ logger.info("Streaming " + dataFile.getAbsolutePath());
+ this.chunkedFileWriter.writeHeader();
+ }
- // Get number of chunks for this bucket
- int numChunks = bucketToNumChunks.get(Pair.create(partitionId, replicaType));
-
- for(int chunkId = 0; chunkId < numChunks; chunkId++) {
- String fileName = Integer.toString(partitionId) + "_"
- + Integer.toString(replicaType) + "_"
- + Integer.toString(chunkId);
- File index = new File(this.storeDir, fileName + ".index");
- File data = new File(this.storeDir, fileName + ".data");
-
- // Both files in chunk exist, start streaming...
- logger.info("Streaming " + data.getAbsolutePath());
- streamFile(data, outputStream);
- logger.info("Completed streaming " + data.getAbsolutePath());
- logger.info("Streaming " + index.getAbsolutePath());
- streamFile(index, outputStream);
- logger.info("Completed streaming " + index.getAbsolutePath());
- handle.incrementEntriesScanned();
- }
+ if(0 == chunkedFileWriter.streamFile()) {
+ // we are done with the data file, move to index file
+ logger.info("Completed streaming " + dataFile.getAbsolutePath());
+ this.chunkedFileWriter.close();
+ this.chunkedFileWriter = new ChunkedFileWriter(indexFile, outputStream);
+ logger.info("Streaming " + indexFile.getAbsolutePath());
+ this.chunkedFileWriter.writeHeader();
+ fetchStatus = FetchStatus.send_index_file;
+ } else {
+ // data file not done yet, keep sending
+ fetchStatus = FetchStatus.send_data_file;
+ }
+ }
- }
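+ // Advances to the next (replica type, partition) pair, or marks the stream
+ // complete once the whole list has been sent.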
+ private StreamRequestHandlerState handleNextPartition() {
+
+ StreamRequestHandlerState handlerState = StreamRequestHandlerState.WRITING;
+
+ if(partitionIterator.hasNext()) {
+ // start a new partition
+ currentPair = partitionIterator.next();
+ currentChunkId = 0;
+ dataFile = indexFile = null;
+ fetchStatus = FetchStatus.send_data_file;
+ } else {
+ // we are done since we have gone through the entire
+ // partition list
+ logger.info("Finished streaming files for partitions: " + replicaToPartitionList);
+ stats.closeHandle(handle);
+ handlerState = StreamRequestHandlerState.COMPLETE;
}
- stats.closeHandle(handle);
- return StreamRequestHandlerState.COMPLETE;
+
+ return handlerState;
}
- void streamFile(File fileToStream, DataOutputStream stream) throws IOException {
- FileChannel dataChannel = new FileInputStream(fileToStream).getChannel();
- try {
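+ // Sanity-checks the current chunk's files: the first chunk must exist, and
+ // the .index and .data files must be present or absent together.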
+ private void validateFiles() {
+ if(!indexFile.exists() && !dataFile.exists() && 0 == currentChunkId) {
+ throw new VoldemortException("Could not find any data for "
+ + currentPair.getSecond().toString() + "_"
+ + currentPair.getFirst().toString() + "_" + currentChunkId);
+ }
+
+ if(indexFile.exists() ^ dataFile.exists()) {
+ throw new VoldemortException("One of the following does not exist: "
+ + indexFile.toString() + " and " + dataFile.toString()
+ + ".");
+ }
+ }
+
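+ // The partition is finished once neither chunk file exists for a chunk id
+ // beyond the first.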
+ private boolean isPartitionFinished() {
+ boolean finished = false;
+ if(!indexFile.exists() && !dataFile.exists() && currentChunkId > 0) {
+ finished = true;
+ }
+ return finished;
+ }
+
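+ // States of the per-request streaming state machine.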
+ enum FetchStatus {
+ next_partition,
+ send_data_file,
+ send_index_file
+ }
+
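+ // Writes one file to the client: a protobuf FileEntry header followed by the
+ // file body, sent in blockSize blocks via zero-copy FileChannel.transferTo().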
+ class ChunkedFileWriter {
+
+ private final File fileToWrite;
+ private final DataOutputStream outStream;
+ private final FileChannel dataChannel;
+ private final WritableByteChannel outChannel;
+ private long currentPos;
+
+ ChunkedFileWriter(File fileToWrite, DataOutputStream stream) throws FileNotFoundException {
+ this.fileToWrite = fileToWrite;
+ this.outStream = stream;
+ this.dataChannel = new FileInputStream(fileToWrite).getChannel();
+ this.outChannel = Channels.newChannel(outStream);
+ this.currentPos = 0;
+ }
+
+ public void close() throws IOException {
+ dataChannel.close();
+ outChannel.close();
+ }
+
+ void writeHeader() throws IOException {
VAdminProto.FileEntry response = VAdminProto.FileEntry.newBuilder()
- .setFileName(fileToStream.getName())
+ .setFileName(fileToWrite.getName())
.setFileSizeBytes(dataChannel.size())
.build();
- // Write header
- ProtoUtils.writeMessage(stream, response);
+
+ ProtoUtils.writeMessage(outStream, response);
throttler.maybeThrottle(response.getSerializedSize());
+ }
- // Write rest of file
- WritableByteChannel channelOut = Channels.newChannel(stream);
-
- // Send chunks to help with throttling
- boolean completedFile = false;
- long chunkSize = 0;
- for(long chunkStart = 0; chunkStart < dataChannel.size() && !completedFile; chunkStart += blockSize) {
- if(dataChannel.size() - chunkStart < blockSize) {
- chunkSize = dataChannel.size() - chunkStart;
- completedFile = true;
- } else {
- chunkSize = blockSize;
- }
- dataChannel.transferTo(chunkStart, chunkSize, channelOut);
- throttler.maybeThrottle((int) chunkSize);
- }
- } finally {
- if(dataChannel != null) {
- dataChannel.close();
+ // Streams at most one block and returns the number of bytes left; 0 means done
+ long streamFile() throws IOException {
+ long bytesRemaining = dataChannel.size() - currentPos;
+ if(0 < bytesRemaining) {
+ long bytesToWrite = Math.min(bytesRemaining, blockSize);
+ long bytesWritten = dataChannel.transferTo(currentPos, bytesToWrite, outChannel);
+ currentPos += bytesWritten;
+ logger.info(bytesWritten + " bytes sent");
+ throttler.maybeThrottle((int) bytesWritten);
}
+ bytesRemaining = dataChannel.size() - currentPos;
+ return bytesRemaining;
}
}
-}
+}
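
For context on the pattern the new code adopts: the old streamFile() pushed the whole file in a single handleRequest() invocation, whereas ChunkedFileWriter sends one block per call and reports how many bytes remain, letting the state machine throttle and yield between blocks. A minimal standalone sketch of that pattern follows; class and variable names are illustrative, not part of the Voldemort codebase.

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Illustrative sketch: stream a file in fixed-size blocks so the caller can
// throttle and yield between blocks, mirroring ChunkedFileWriter above.
public class BlockFileStreamer {

    private final FileChannel in;
    private final WritableByteChannel out;
    private final long blockSize;
    private long position = 0;

    public BlockFileStreamer(File file, DataOutputStream stream, long blockSize)
            throws IOException {
        this.in = new FileInputStream(file).getChannel();
        this.out = Channels.newChannel(stream);
        this.blockSize = blockSize;
    }

    // Sends at most one block; returns the bytes still remaining (0 == done),
    // the same contract as ChunkedFileWriter.streamFile() in the diff.
    public long sendBlock() throws IOException {
        long remaining = in.size() - position;
        if (remaining > 0) {
            long written = in.transferTo(position, Math.min(remaining, blockSize), out);
            position += written;
        }
        return in.size() - position;
    }

    public void close() throws IOException {
        in.close();
    }
}

A caller pumps the writer until it reports zero bytes remaining, throttling between calls, which is how handleSendDataFile and handleSendIndexFile drive it across successive handleRequest() invocations:

// Hypothetical driver loop for the sketch above.
BlockFileStreamer streamer = new BlockFileStreamer(dataFile, outputStream, 64 * 1024);
while (streamer.sendBlock() > 0) {
    // yield to other connections / apply throttling here, as the handler does
}
streamer.close();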
