Skip to content

Commit

Permalink
Preliminary buffer-cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Jan 8, 2014
1 parent c24a834 commit 93327e6
Show file tree
Hide file tree
Showing 18 changed files with 1,009 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

import com.cloudata.btree.Btree;
import com.cloudata.btree.BtreeQuery;
import com.cloudata.btree.Database;
import com.cloudata.btree.Keyspace;
import com.cloudata.btree.MmapPageStore;
import com.cloudata.btree.PageStore;
import com.cloudata.btree.ReadOnlyTransaction;
import com.cloudata.btree.WriteTransaction;
import com.cloudata.keyvalue.operation.KeyValueOperation;
Expand All @@ -26,14 +25,14 @@ public class KeyValueStore {

public KeyValueStore(File dir, boolean uniqueKeys) throws IOException {
File data = new File(dir, "data");
PageStore pageStore = MmapPageStore.build(data, uniqueKeys);
Database db = Database.build(data);

log.warn("Building new btree @{}", dir);

this.btree = new Btree(pageStore, uniqueKeys);
this.btree = new Btree(db, uniqueKeys);
}

public void doAction(KeyValueOperation operation) {
public void doAction(KeyValueOperation operation) throws IOException {
if (operation.isReadOnly()) {
try (ReadOnlyTransaction txn = btree.beginReadOnly()) {
txn.doAction(btree, operation);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package com.cloudata.btree;

import io.netty.buffer.ByteBuf;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

public class BackingFile implements Closeable {
private static final Logger log = LoggerFactory.getLogger(BackingFile.class);

final File file;

final AsynchronousFileChannel fileChannel;

public BackingFile(File file) throws IOException {
this.file = file;

Path path = Paths.get(file.getAbsolutePath());

log.warn("We want to disable the read cache (probably?)");
this.fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ);
}

public ListenableFuture<ByteBuffer> read(ByteBuffer buffer, long position) {
final SettableFuture<ByteBuffer> future = SettableFuture.create();

read(buffer, buffer.position(), position, future);

return future;
}

/**
* Read into a ByteBuf. Adds reference for duration of read, and releases it unless future is set.
*/
public ListenableFuture<ByteBuf> read(final ByteBuf buf, final long position) {
final SettableFuture<ByteBuf> ret = SettableFuture.create();

ByteBuffer nioBuffer = buf.nioBuffer();

assert nioBuffer.remaining() == buf.writableBytes();

ListenableFuture<ByteBuffer> future = read(buf.nioBuffer(), position);

buf.retain();

Futures.addCallback(future, new FutureCallback<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer result) {
assert buf.readableBytes() == result.remaining();

// Caller releases
ret.set(buf);
}

@Override
public void onFailure(Throwable t) {
buf.release();

ret.setException(t);
}
});

return ret;
}

private void read(ByteBuffer dest, final int destStart, final long filePosition,
final SettableFuture<ByteBuffer> future) {
try {
fileChannel.read(dest, filePosition, dest, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer n, ByteBuffer dest) {
if (n == -1) {
future.setException(new EOFException());
return;
}

if (dest.remaining() > 0) {
long newPosition = filePosition + n;

read(dest, destStart, newPosition, future);
return;
}

dest.position(destStart);
future.set(dest);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
future.setException(exc);
}
});
} catch (Exception e) {
future.setException(e);
}
}

public void sync() throws IOException {
boolean metadata = false;
fileChannel.force(metadata);
}

public ListenableFuture<Void> write(ByteBuffer src, final long position) {
final SettableFuture<Void> future = SettableFuture.create();
write(src, position, future);
return future;
}

public ListenableFuture<Void> write(final ByteBuf buf, final long position) {
ListenableFuture<Void> future = write(buf.nioBuffer(), position);

buf.retain();

Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
buf.release();
}

@Override
public void onFailure(Throwable t) {
buf.release();
}
});

return future;
}

private void write(ByteBuffer src, final long position, final SettableFuture<Void> future) {
try {
fileChannel.write(src, position, src, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer wrote, ByteBuffer src) {
if (wrote < 0) {
future.setException(new IOException());
return;
}

if (src.remaining() > 0) {
long newPosition = position + wrote;

write(src, newPosition, future);
return;
}

future.set(null);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
future.setException(exc);
}
});
} catch (Exception e) {
future.setException(e);
}
}

public long size() {
return file.length();
}

@Override
public void close() throws IOException {
fileChannel.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public boolean isUniqueKeys() {
return uniqueKeys;
}

public Database getDb() {
return db;
}

}
Loading

0 comments on commit 93327e6

Please sign in to comment.