Skip to content

Commit

Permalink
Make the read call control the read size in KfsInputChannel, if read …
Browse files Browse the repository at this point in the history
…ahead logic is turned off
  • Loading branch information
mckurt committed May 25, 2016
1 parent 14c8695 commit c208e47
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ final public class KfsInputChannel implements ReadableByteChannel, Positionable
private ByteBuffer readBuffer;
private int kfsFd = -1;
private KfsAccess kfsAccess;
private boolean isReadAheadOff = false;

private final static native
int read(long cPtr, int fd, ByteBuffer buf, int begin, int end);
Expand Down Expand Up @@ -76,7 +77,7 @@ public synchronized int read(ByteBuffer dst) throws IOException
// Fill input buffer if it's empty
if (!readBuffer.hasRemaining()) {
readBuffer.clear();
readDirect(readBuffer);
readDirect(readBuffer, dst.remaining());
readBuffer.flip();

// If we failed to get anything, call that EOF
Expand Down Expand Up @@ -114,18 +115,20 @@ public synchronized int read(ByteBuffer dst) throws IOException
ByteBuffer readNext() throws IOException
{
readBuffer.clear();
readDirect(readBuffer);
readDirect(readBuffer, 0);
readBuffer.flip();
return readBuffer;
}

private void readDirect(ByteBuffer buf) throws IOException
private void readDirect(ByteBuffer buf, int remRequestedBytes) throws IOException
{
if (!buf.isDirect()) {
throw new IllegalArgumentException("need direct buffer");
}
final int pos = buf.position();
final int sz = read(kfsAccess.getCPtr(), kfsFd, buf, pos, buf.limit());
final int end = (isReadAheadOff && remRequestedBytes > 0) ?
Math.min(buf.limit(), pos + remRequestedBytes) : buf.limit();
final int sz = read(kfsAccess.getCPtr(), kfsFd, buf, pos, end);
kfsAccess.kfs_retToIOException(sz);
buf.position(pos + sz);
}
Expand Down Expand Up @@ -197,6 +200,7 @@ protected void finalize() throws Throwable
public void setReadAheadSize(long readAheadSize) {
if(readAheadSize >= 0) {
kfsAccess.kfs_setReadAheadSize(kfsFd, readAheadSize);
isReadAheadOff = readAheadSize <= 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ public static void main(String args[]) {
// test new create methods
testCreateAPI(kfsAccess, basedir);

// test read when read-ahead is disabled
testDisableReadAhead(kfsAccess, basedir);

final Iterator<Map.Entry<String, String> > it =
kfsAccess.kfs_getStats().entrySet().iterator();
System.out.println("Clients stats:");
Expand Down Expand Up @@ -503,4 +506,85 @@ private static void verifyFileAttr(KfsAccess kfsAccess,
System.exit(1);
}
}

private static void testDisableReadAhead(KfsAccess kfsAccess, String baseDir)
throws IOException {
String filePath = baseDir + "/sample_file.1";
String createParams = "S";
KfsOutputChannel outputChannel = kfsAccess.kfs_create_ex(filePath,
true, createParams);
int numBytes = 1048576;
char[] dataBuf = new char[numBytes];
generateData(dataBuf, numBytes);
String s = new String(dataBuf);
byte[] buf = s.getBytes();
ByteBuffer b = ByteBuffer.wrap(buf, 0, buf.length);
int res = outputChannel.write(b);
if (res != buf.length) {
System.out.println("Was able to write only: " + res);
System.exit(1);
}
if (outputChannel == null) {
System.out.println("Unable to call create");
System.exit(1);
}
outputChannel.sync();
outputChannel.close();

KfsInputChannel inputChannel = kfsAccess.kfs_open(filePath);
if (inputChannel == null) {
System.out.println("open on " + filePath + "failed!");
System.exit(1);
}

inputChannel.setReadAheadSize(0);

byte[] dstBuf = new byte[128];
res = inputChannel.read(ByteBuffer.wrap(dstBuf, 0, 128));
s = new String(dstBuf);
for (int i = 0; i < 128; i++) {
if (dataBuf[i] != s.charAt(i)) {
System.out.println("Data mismatch at char: " + i);
System.exit(1);
}
}

inputChannel.seek(512);
long pos = inputChannel.tell();
if (pos != 512) {
System.out.println("Couldn't seek to byte 512. Pos: " + pos);
System.exit(1);
}

res = inputChannel.read(ByteBuffer.wrap(dstBuf, 0, 128));
s = new String(dstBuf);
for (int i = 0; i < 128; i++) {
if (dataBuf[512+i] != s.charAt(i)) {
System.out.println("Data mismatch at char " + i +
" after seeking to byte 512");
System.exit(1);
}
}

// seek to the beginning, enable read-ahead and make a small read
inputChannel.seek(0);
pos = inputChannel.tell();
if (pos != 0) {
System.out.println("Couldn't seek to the beginning. Pos: " + pos);
System.exit(1);
}

inputChannel.setReadAheadSize(1048576);

res = inputChannel.read(ByteBuffer.wrap(dstBuf, 0, 128));
s = new String(dstBuf);
for (int i = 0; i < 128; i++) {
if (dataBuf[i] != s.charAt(i)) {
System.out.println("Data mismatch at char " + i +
" after seeking to the beginning");
System.exit(1);
}
}
inputChannel.close();
}
}

0 comments on commit c208e47

Please sign in to comment.