Skip to content

Commit

Permalink
BlockReceiver with native buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
toddlipcon committed Jul 25, 2012
1 parent 55ca136 commit cba385e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -157,14 +158,19 @@ public static DatanodeInfo[] fromProtos(

/**
 * Reads a varint length prefix from {@code input} via
 * {@link #fullyReadVint(InputStream)} and returns an
 * {@code ExactSizeInputStream} built over the same stream with that length.
 *
 * @param input stream positioned at the start of the length prefix
 * @return an {@code ExactSizeInputStream} constructed from {@code input} and
 *         the decoded length
 * @throws IOException propagated from reading the length prefix
 */
public static InputStream vintPrefixed(final InputStream input)
    throws IOException {
  final int payloadLength = fullyReadVint(input);
  // Only checked when assertions are enabled (-ea).
  assert payloadLength >= 0;
  return new ExactSizeInputStream(input, payloadLength);
}

/**
 * Reads one varint-encoded 32-bit integer from {@code input}.
 *
 * The first byte is read directly so that an immediate end-of-stream can be
 * reported with a dedicated message; the remaining bytes are decoded by
 * {@code CodedInputStream.readRawVarint32}.
 *
 * @param input stream positioned at the first byte of the varint
 * @return the value decoded by {@code CodedInputStream.readRawVarint32}
 * @throws EOFException if {@code input.read()} returns -1 for the first byte
 * @throws IOException propagated from the underlying reads
 */
public static int fullyReadVint(InputStream input) throws IOException {
final int firstByte = input.read();
if (firstByte == -1) {
throw new EOFException("Premature EOF: no length prefix available");
}

// NOTE(review): this listing is a diff view that shows removed and added
// lines together. The next four lines look like the pre-change tail of
// vintPrefixed (they return an InputStream from a method declared int) --
// confirm they are absent from the committed file.
int size = CodedInputStream.readRawVarint32(firstByte, input);
assert size >= 0;

return new ExactSizeInputStream(input, size);
// NOTE(review): presumably the only statement after the EOF check in the
// committed version -- confirm.
return CodedInputStream.readRawVarint32(firstByte, input);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,47 @@
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketInputWrapper;

/** Receiver */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Receiver implements DataTransferProtocol {
protected final DataInputStream in;
protected final SocketInputWrapper in;
private DataInputStream bufferedDataInput;
private static final int VERY_SMALL_BUFFER = 6;

/**
 * Create a receiver for DataTransferProtocol with a socket.
 *
 * Stores the given input in {@code in} and additionally wraps it in a
 * {@code DataInputStream} over a {@code BufferedInputStream} whose buffer
 * size is {@code VERY_SMALL_BUFFER}; the wrapper is kept in
 * {@code bufferedDataInput}.
 *
 * @param in input side of the connection
 */
// NOTE(review): two constructor headers appear back to back because this
// listing shows both sides of a change; presumably only the
// SocketInputWrapper one is current -- confirm.
protected Receiver(final DataInputStream in) {
protected Receiver(final SocketInputWrapper in) {
this.in = in;

// Second view of the same input, used for small header reads.
this.bufferedDataInput = new DataInputStream(
new BufferedInputStream(in, VERY_SMALL_BUFFER));
}

/**
 * Read an Op. It also checks protocol version.
 *
 * Reads a {@code short} version field, compares it with
 * {@code DataTransferProtocol.DATA_TRANSFER_VERSION}, and then decodes the
 * operation with {@code Op.read}.
 *
 * @return the Op returned by {@code Op.read}
 * @throws IOException if the version read differs from the expected one, or
 *         propagated from the underlying reads
 */
protected final Op readOp() throws IOException {
// NOTE(review): diff view -- each of the two statement pairs below shows a
// read from `in` followed by the same read from `bufferedDataInput`;
// presumably only the bufferedDataInput form is current -- confirm.
final short version = in.readShort();
final short version = bufferedDataInput.readShort();
if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch (Expected: " +
DataTransferProtocol.DATA_TRANSFER_VERSION +
", Received: " + version + " )");
}
return Op.read(in);
return Op.read(bufferedDataInput);
}

/** Process op by the corresponding method. */
Expand All @@ -63,19 +72,19 @@ protected final void processOp(Op op) throws IOException {
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
opWriteBlock();
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
opReplaceBlock();
break;
case COPY_BLOCK:
opCopyBlock(in);
opCopyBlock();
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
opBlockChecksum();
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
opTransferBlock();
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
Expand All @@ -84,17 +93,32 @@ protected final void processOp(Op op) throws IOException {

/**
 * Receive OP_READ_BLOCK: parse an {@code OpReadBlockProto} request and pass
 * its block, token, client name, offset and length to {@code readBlock}.
 *
 * @throws IOException propagated from reading or parsing the request, or
 *         from {@code readBlock}
 */
private void opReadBlock() throws IOException {
// NOTE(review): diff view -- the next statement looks like the pre-change
// form (parsing from vintPrefixed(in)) that the two lines after it replace
// with readVintPrefixedData() -- confirm.
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
OpReadBlockProto proto = OpReadBlockProto.parseFrom(
readVintPrefixedData());
readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen());
}

/**
 * Reads one varint-length-prefixed message body into a new array.
 *
 * The length is decoded from {@code bufferedDataInput}. The body is then
 * taken first from {@code bufferedDataInput}, up to the number of bytes it
 * reports as available, and the remainder directly from {@code in}.
 *
 * @return a newly allocated array holding exactly the message body
 * @throws IOException if the decoded length is negative, or propagated from
 *         the underlying reads
 */
private byte[] readVintPrefixedData() throws IOException {
  final int length = HdfsProtoUtil.fullyReadVint(bufferedDataInput);
  if (length < 0) {
    // A 32-bit varint can decode to a negative int; report it as a protocol
    // error rather than failing with NegativeArraySizeException.
    throw new IOException("Invalid negative length prefix: " + length);
  }
  final byte[] data = new byte[length];

  // BufferedInputStream.available() is its own buffered byte count plus the
  // underlying stream's estimate, so it can exceed the message length.
  // Clamp it so the read below cannot run past the end of the array.
  // For the same reason available() is not asserted to be 0 afterwards: a
  // non-zero value does not prove that bytes are still buffered.
  final int fromBuffer = Math.min(bufferedDataInput.available(), length);
  IOUtils.readFully(bufferedDataInput, data, 0, fromBuffer);

  // Whatever the buffered view did not supply comes from the raw input.
  final int fromRaw = length - fromBuffer;
  if (fromRaw > 0) {
    IOUtils.readFully(in, data, fromBuffer, fromRaw);
  }
  return data;
}

/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
private void opWriteBlock() throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(readVintPrefixedData());
writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
Expand All @@ -108,34 +132,34 @@ private void opWriteBlock(DataInputStream in) throws IOException {
}

/**
 * Receive {@link Op#TRANSFER_BLOCK}: parse an {@code OpTransferBlockProto}
 * request and pass its block, token, client name and target list to
 * {@code transferBlock}.
 *
 * @throws IOException propagated from reading or parsing the request, or
 *         from {@code transferBlock}
 */
// NOTE(review): diff view -- the header taking a DataInputStream and the
// parseFrom(vintPrefixed(in)) line look like the pre-change forms of the
// lines that follow each of them -- confirm.
private void opTransferBlock(DataInputStream in) throws IOException {
private void opTransferBlock() throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
OpTransferBlockProto.parseFrom(readVintPrefixedData());
transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
fromProtos(proto.getTargetsList()));
}

/**
 * Receive OP_REPLACE_BLOCK: parse an {@code OpReplaceBlockProto} request and
 * pass its block, token, deletion hint and source to {@code replaceBlock}.
 *
 * @throws IOException propagated from reading or parsing the request, or
 *         from {@code replaceBlock}
 */
// NOTE(review): diff view -- the header taking a DataInputStream and the
// parseFrom(vintPrefixed(in)) line look like the pre-change forms of the
// lines that follow each of them -- confirm.
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
private void opReplaceBlock() throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(readVintPrefixedData());
replaceBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()),
proto.getDelHint(),
fromProto(proto.getSource()));
}

/**
 * Receive OP_COPY_BLOCK: parse an {@code OpCopyBlockProto} request and pass
 * its block and token to {@code copyBlock}.
 *
 * @throws IOException propagated from reading or parsing the request, or
 *         from {@code copyBlock}
 */
// NOTE(review): diff view -- the header taking a DataInputStream and the
// parseFrom(vintPrefixed(in)) line look like the pre-change forms of the
// lines that follow each of them -- confirm.
private void opCopyBlock(DataInputStream in) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
private void opCopyBlock() throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(readVintPrefixedData());
copyBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}

/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
private void opBlockChecksum() throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(readVintPrefixedData());

blockChecksum(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.Checksum;
Expand All @@ -49,6 +50,7 @@
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;

Expand All @@ -63,7 +65,7 @@ class BlockReceiver implements Closeable {

private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;

private DataInputStream in = null; // from where data are read
private SocketInputWrapper in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client
private DataChecksum diskChecksum; // checksum we write to disk

Expand All @@ -81,7 +83,7 @@ class BlockReceiver implements Closeable {
private int checksumSize;

private PacketReceiver packetReceiver =
new PacketReceiver(false);
new PacketReceiver(true);

protected final String inAddr;
protected final String myAddr;
Expand Down Expand Up @@ -114,8 +116,9 @@ class BlockReceiver implements Closeable {
private final boolean isTransfer;

private boolean syncOnClose;
private FileChannel outChannel;

BlockReceiver(final ExtendedBlock block, final DataInputStream in,
BlockReceiver(final ExtendedBlock block, final SocketInputWrapper in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
Expand Down Expand Up @@ -210,6 +213,7 @@ class BlockReceiver implements Closeable {
this.out = streams.getDataOut();
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
this.outChannel = ((FileOutputStream)out).getChannel();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
Expand Down Expand Up @@ -412,7 +416,7 @@ private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
*/
private int receivePacket() throws IOException {
// read the next packet
packetReceiver.receiveNextPacket(in);
packetReceiver.receiveNextPacket(in.getReadableByteChannel());

PacketHeader header = packetReceiver.getHeader();
if (LOG.isDebugEnabled()){
Expand Down Expand Up @@ -504,7 +508,7 @@ private int receivePacket() throws IOException {

// by this point, the data in the buffer uses the disk checksum

byte[] lastChunkChecksum;
byte[] lastChunkChecksum = new byte[checksumSize];

try {
long onDiskLen = replicaInfo.getBytesOnDisk();
Expand All @@ -527,13 +531,22 @@ private int receivePacket() throws IOException {
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
}

int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();


int startByteToDisk = (int)(onDiskLen-firstByteInBlock);
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);

assert dataBuf.position() == 0;
dataBuf.position(startByteToDisk);
// Write data to disk.
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
if (outChannel != null) {
IOUtils.writeFully(outChannel, dataBuf);
} else {
// slow path for simulated dataset
byte[] buf = new byte[dataBuf.remaining()];
dataBuf.get(buf);

out.write(buf, 0, numBytesToDisk);
}

// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
Expand All @@ -545,7 +558,10 @@ private int receivePacket() throws IOException {
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
}
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] partialData = new byte[numBytesToDisk];
dataBuf.position(startByteToDisk);
dataBuf.get(partialData);
partialCrc.update(partialData, 0, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange(
buf, buf.length - checksumSize, buf.length
Expand All @@ -556,13 +572,13 @@ private int receivePacket() throws IOException {
}
partialCrc = null;
} else {
lastChunkChecksum = Arrays.copyOfRange(
checksumBuf.array(),
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
checksumOut.write(checksumBuf.array(),
checksumBuf.arrayOffset() + checksumBuf.position(),
checksumLen);
checksumBuf.position(checksumBuf.capacity() - checksumSize);
checksumBuf.get(lastChunkChecksum);
checksumBuf.position(0);
// TODO: optimize following
byte []tmp = new byte[checksumBuf.remaining()];
checksumBuf.get(tmp);
checksumOut.write(tmp, 0, checksumLen);
}
/// flush entire packet, sync unless close() will sync
flushOrSync(syncBlock && !lastPacketInBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ private DataXceiver(Socket s,
SocketInputWrapper socketInput,
DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException {
super(new DataInputStream(new BufferedInputStream(
socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
super(socketInput);

this.s = s;
this.socketInputWrapper = socketInput;
Expand Down Expand Up @@ -689,7 +688,7 @@ public void replaceBlock(final ExtendedBlock block,
Status opStatus = SUCCESS;
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
SocketInputWrapper proxyReply = null;

try {
// get the output stream to the proxy
Expand All @@ -708,10 +707,11 @@ public void replaceBlock(final ExtendedBlock block,
new Sender(proxyOut).copyBlock(block, blockToken);

// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
HdfsProtoUtil.vintPrefixed(proxyReply));
proxyReply = NetUtils.getInputStream(proxySock);
int replySize = HdfsProtoUtil.fullyReadVint(proxyReply);
byte[] reply = new byte[replySize];
IOUtils.readFully(proxyReply, reply, 0, replySize);
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(reply);

if (copyResponse.getStatus() != SUCCESS) {
if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
Expand Down Expand Up @@ -752,7 +752,7 @@ public void replaceBlock(final ExtendedBlock block,
// receive the last byte that indicates the proxy released its thread resource
if (opStatus == SUCCESS) {
try {
proxyReply.readChar();
proxyReply.read();
} catch (IOException ignored) {
}
}
Expand Down

0 comments on commit cba385e

Please sign in to comment.