Permalink
Browse files

Implemented bitcask.fold (sans bloomfilter)

  • Loading branch information...
1 parent e8ba33e commit feb5165bb193764d10f20e5fcdbdbcac66650bc3 @krestenkrab committed Jan 21, 2011
View
2 NOTICE
@@ -18,5 +18,5 @@ Bitcask Java
This product includes software covered by the following license
https://quickcheck.dev.java.net/
- Available under APACHE Livence, Version 2.0
+ Available under APACHE License, Version 2.0
@@ -56,11 +56,11 @@ public static BitCask open(File dirname, BitCaskOptions opts)
}
result.dirname = dirname;
-
+
BitCaskKeyDir keydir;
keydir = BitCaskKeyDir.keydir_new(dirname, opts.open_timeout_secs);
result.keydir = keydir;
- if (!keydir.is_ready()) {
+ if (!keydir.is_ready()) {
File[] files = result.readable_files();
BitCask.scan_key_files(files, keydir);
keydir.mark_ready();
@@ -72,19 +72,18 @@ public static BitCask open(File dirname, BitCaskOptions opts)
return result;
}
-
-
+
public void close() throws IOException {
-
+
// release?
keydir = null;
-
+
for (BitCaskFile read_file : read_files.values()) {
read_file.close();
}
-
+
read_files.clear();
-
+
if (write_file == null || write_file == BitCaskFile.FRESH_FILE) {
// ok
} else {
@@ -253,5 +252,71 @@ public String getString(String key) throws IOException {
return val.toStringUtf8();
}
+ public <T> T fold(final EntryIter<T> entryIter, T acc) throws IOException {
+
+ final int expiry_time = opts.expiry_time();
+ EntryIter<T> iter = new EntryIter<T>() {
+
+ @Override
+ public T each(ByteString key, ByteString value, int tstamp,
+ long entryPos, int entrySize, T acc) {
+
+ if (tstamp < expiry_time) {
+ return acc;
+ }
+
+ BitCaskEntry ent = keydir.get(key);
+ if (entryPos != ent.offset) {
+ return acc;
+ }
+
+ if (value.equals(TOMBSTONE)) {
+ return acc;
+ }
+
+ return entryIter.each(key, value, tstamp, entryPos, entrySize,
+ acc);
+
+ }
+ };
+
+ BitCaskFile[] files = open_fold_files();
+ if (files != null) {
+ for (int i = 0; i < files.length; i++) {
+ acc = files[i].fold(iter, acc);
+ }
+ }
+
+ return acc;
+ }
+
+ private BitCaskFile[] open_fold_files() {
+ // TODO: retries ?
+ File[] files = list_data_files(null, null);
+ return open_files(files);
+ }
+
+ BitCaskFile[] open_files(File[] files) {
+ BitCaskFile[] out = new BitCaskFile[files.length];
+
+ for (int i = 0; i < out.length; i++) {
+
+ try {
+ out[i] = BitCaskFile.open(files[i]);
+ } catch (IOException e) {
+
+ for (int j = 0; j < i; j++) {
+ try {
+ out[j].close();
+ } catch (IOException e1) {
+ }
+ }
+
+ return null;
+ }
+ }
+
+ return out;
+ }
}
@@ -59,7 +59,7 @@ private BitCaskFile(int file_id, File filename, FileChannel wch, FileChannel wch
this.wch = wch;
this.rch = rch;
this.wch_hint = wch_hint;
- this.write_offset = new AtomicLong(wch.position());
+ this.write_offset = new AtomicLong(rch.size());
}
public BitCaskFile() {
@@ -190,15 +190,9 @@ public static BitCaskFile open(File filename) throws IOException {
int tstamp = BitCaskFile.tstamp(filename);
- FileChannel wch = new FileOutputStream(filename, true).getChannel();
- FileChannel wch_hint = new FileOutputStream(hint_filename(filename),
- true).getChannel();
-
-
-
FileChannel rch = new RandomAccessFile(filename, "r").getChannel();
- return new BitCaskFile(tstamp, filename, wch, wch_hint, rch);
+ return new BitCaskFile(tstamp, filename, null, null, rch);
}
/** Create a new bitcask file in named directory */
@@ -270,6 +264,8 @@ static BitCaskFile create(File dirname, int tstamp) throws IOException {
int entry_length = HEADER_SIZE + key_len + val_len;
acc = iter.each(ByteString.copyFrom(kv, 0, key_len),
ByteString.copyFrom(kv, key_len, val_len), tstamp, pos, entry_length, acc);
+
+ pos += entry_length;
}
return acc;
@@ -2,10 +2,13 @@
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
-import org.junit.Assert;
import org.junit.Test;
+import com.google.protobuf.ByteString;
+
import static org.junit.Assert.*;
public class BitCaskTest {
@@ -33,6 +36,64 @@ public void roundtripTest() throws Exception {
b.close();
}
+ @Test
+ public void foldTest() throws Exception {
+
+ BitCask b = initDataset("/tmp/bc.test.fold", defaultDataset());
+
+ final Map<ByteString,ByteString> l = new HashMap<ByteString, ByteString>();
+
+ b.fold(new EntryIter<Void>() {
+
+ @Override
+ public Void each(ByteString key, ByteString value, int tstamp,
+ long entryPos, int entrySize, Void acc) {
+
+ l.put(key, value);
+ return null;
+
+ }
+ }, null);
+
+ assertEquals(l, defaultDataset());
+
+ b.close();
+ }
+
+ private BitCask initDataset(String string,
+ Map<ByteString, ByteString> ds) throws Exception {
+
+ File dir = new File(string);
+ rmdir(dir);
+
+ BitCaskOptions opts = new BitCaskOptions();
+ opts.read_write = true;
+ BitCask b = BitCask.open(dir, opts);
+
+ for (Map.Entry<ByteString, ByteString> ent : ds.entrySet()) {
+ b.put(ent.getKey(), ent.getValue());
+ }
+
+ return b;
+ }
+
+ private Map<ByteString, ByteString> defaultDataset() {
+
+
+ Map<ByteString, ByteString> res = new HashMap<ByteString, ByteString>();
+
+ res.put(bs("k"), bs("v"));
+ res.put(bs("k2"), bs("v2"));
+ res.put(bs("k3"), bs("v3"));
+
+ return res ;
+
+ }
+
+ private static ByteString bs(String string) {
+ return ByteString.copyFromUtf8(string);
+ }
+
private void rmdir(File dir) throws IOException, InterruptedException {
Process p = Runtime.getRuntime().exec(
new String[] { "rm", "-Rf", dir.getPath() });

0 comments on commit feb5165

Please sign in to comment.