Permalink
Browse files

Support unique-key constraint

  • Loading branch information...
justinsb committed Dec 9, 2013
1 parent 5cfd93a commit 59ac376a00eaca57826440748106e24a513375c3
@@ -98,7 +98,8 @@ public KeyValueStore load(@Nonnull Long id) throws Exception {
dir.mkdirs();
return new KeyValueStore(dir);
boolean uniqueKeys = true;
return new KeyValueStore(dir, uniqueKeys);
} catch (Exception e) {
log.warn("Error building KeyValueStore", e);
throw e;
@@ -21,16 +21,16 @@
final Btree btree;
public KeyValueStore(File dir) throws IOException {
public KeyValueStore(File dir, boolean uniqueKeys) throws IOException {
File data = new File(dir, "data");
PageStore pageStore = MmapPageStore.build(data);
this.btree = new Btree(pageStore);
PageStore pageStore = MmapPageStore.build(data, uniqueKeys);
this.btree = new Btree(pageStore, uniqueKeys);
}
public void put(ByteBuffer key, ByteBuffer value) {
ReadWriteTransaction txn = btree.beginReadWrite();
txn.insert(key, value);
txn.insert(btree, key, value);
txn.commit();
}
@@ -57,7 +57,7 @@ public ByteBuffer get(final ByteBuffer key) {
ReadOnlyTransaction txn = btree.beginReadOnly();
GetEntryListener listener = new GetEntryListener(key);
txn.walk(key, listener);
txn.walk(btree, key, listener);
ByteBuffer value = listener.foundValue;
@@ -7,9 +7,11 @@
private static final Logger log = LoggerFactory.getLogger(Btree.class);
final PageStore pageStore;
final boolean uniqueKeys;
public Btree(PageStore pageStore) {
public Btree(PageStore pageStore, boolean uniqueKeys) {
this.pageStore = pageStore;
this.uniqueKeys = uniqueKeys;
}
public ReadWriteTransaction beginReadWrite() {
@@ -22,4 +24,8 @@ public ReadOnlyTransaction beginReadOnly() {
return txn;
}
public boolean isUniqueKeys() {
return uniqueKeys;
}
}
@@ -17,11 +17,15 @@
Mutable mutable;
public LeafPage(Page parent, int pageNumber, ByteBuffer buffer) {
final boolean uniqueKeys;
public LeafPage(Page parent, int pageNumber, ByteBuffer buffer, boolean uniqueKeys) {
super(parent, pageNumber, buffer);
this.uniqueKeys = uniqueKeys;
}
static class Mutable {
final boolean uniqueKeys;
final List<Entry> entries;
int totalKeySize;
int totalValueSize;
@@ -54,15 +58,30 @@ int lbound(ByteBuffer find) {
void insert(ByteBuffer key, ByteBuffer value) {
int position = lbound(key);
// TODO: Should we enforce uniqueness?
Entry newEntry = new Entry(key, value);
if (uniqueKeys) {
if (position < entries.size()) {
ByteBuffer midKey = getKey(position);
int comparison = ByteBuffers.compare(midKey, key);
if (comparison == 0) {
ByteBuffer oldValue = getValue(position);
entries.set(position, newEntry);
totalValueSize += value.remaining() - oldValue.remaining();
return;
}
}
}
entries.add(position, new Entry(key, value));
entries.add(position, newEntry);
totalKeySize += key.remaining();
totalValueSize += value.remaining();
}
Mutable(LeafPage page) {
this.uniqueKeys = page.uniqueKeys;
int n = page.getEntryCount();
this.entries = new ArrayList<Entry>(n);
@@ -312,7 +331,7 @@ public byte getPageType() {
return PAGE_TYPE;
}
public static LeafPage createNew(Page parent, int pageNumber) {
public static LeafPage createNew(Page parent, int pageNumber, boolean uniqueKeys) {
// TODO: Reuse a shared buffer?
ByteBuffer empty = ByteBuffer.allocate(6);
empty.putShort((short) 0);
@@ -321,7 +340,7 @@ public static LeafPage createNew(Page parent, int pageNumber) {
empty.flip();
return new LeafPage(parent, pageNumber, empty);
return new LeafPage(parent, pageNumber, empty, uniqueKeys);
}
@Override
@@ -18,32 +18,35 @@
int rootPage;
final boolean uniqueKeys;
private static final int ALIGNMENT = 256;
private static final int HEADER_SIZE = 16384;
private MmapPageStore(MappedByteBuffer buffer) {
private MmapPageStore(MappedByteBuffer buffer, boolean uniqueKeys) {
this.buffer = buffer;
this.uniqueKeys = uniqueKeys;
MetadataPage metadataPage = new MetadataPage(buffer, 0);
this.rootPage = metadataPage.getRoot();
this.buffer.position(HEADER_SIZE);
}
public static MmapPageStore build(File data) throws IOException {
public static MmapPageStore build(File data, boolean uniqueKeys) throws IOException {
if (!data.exists()) {
long size = 1024L * 1024L * 64L;
MappedByteBuffer mmap = Mmap.mmapFile(data, size);
MetadataPage.create(mmap, 0);
return new MmapPageStore(mmap);
return new MmapPageStore(mmap, uniqueKeys);
} else {
long size = data.length();
MappedByteBuffer mmap = Mmap.mmapFile(data, size);
return new MmapPageStore(mmap);
return new MmapPageStore(mmap, uniqueKeys);
}
}
@@ -61,7 +64,7 @@ public Page fetchPage(Page parent, int pageNumber) {
break;
case LeafPage.PAGE_TYPE:
page = new LeafPage(parent, pageNumber, header.getPageSlice());
page = new LeafPage(parent, pageNumber, header.getPageSlice(), uniqueKeys);
break;
default:
@@ -23,7 +23,7 @@ public Page getPage(Page parent, int pageNumber) {
int rootPageId;
@Override
protected Page getRootPage(boolean create) {
protected Page getRootPage(Btree btree, boolean create) {
if (rootPageId == 0) {
rootPageId = pageStore.getRootPageId();
}
@@ -99,8 +99,8 @@ public void commit() {
pageStore.commitTransaction(newRootPage);
}
public void insert(ByteBuffer key, ByteBuffer value) {
getRootPage(true).insert(this, key, value);
public void insert(Btree btree, ByteBuffer key, ByteBuffer value) {
getRootPage(btree, true).insert(this, key, value);
}
int createdPageCount;
@@ -121,7 +121,7 @@ private void createPage(Page parent, int pageNumber, Page newPage) {
int rootPageId;
@Override
protected Page getRootPage(boolean create) {
protected Page getRootPage(Btree btree, boolean create) {
if (rootPageId == 0) {
rootPageId = pageStore.getRootPageId();
}
@@ -133,7 +133,7 @@ protected Page getRootPage(boolean create) {
createdPageCount++;
int pageNumber = -createdPageCount;
LeafPage newPage = LeafPage.createNew(null, pageNumber);
LeafPage newPage = LeafPage.createNew(null, pageNumber, btree.isUniqueKeys());
createPage(null, pageNumber, newPage);
rootPageId = pageNumber;
@@ -16,14 +16,14 @@ public Transaction(PageStore pageStore) {
public abstract Page getPage(Page parent, int pageNumber);
public void walk(ByteBuffer from, EntryListener listener) {
Page rootPage = getRootPage(false);
public void walk(Btree btree, ByteBuffer from, EntryListener listener) {
Page rootPage = getRootPage(btree, false);
if (rootPage == null) {
log.info("No data; returning immediately from walk");
return;
}
rootPage.walk(this, from, listener);
}
protected abstract Page getRootPage(boolean create);
protected abstract Page getRootPage(Btree btree, boolean create);
}
@@ -78,10 +78,7 @@ public void simpleTest() throws Exception {
for (int i = 1; i < n; i++) {
byte[] key = Integer.toString(i).getBytes();
byte[] data = new byte[i];
for (int j = 0; j < i; j++) {
data[j] = (byte) (j % 0xff);
}
byte[] data = buildValue(i);
client.put(logId, ByteString.copyFrom(key), ByteString.copyFrom(data));
}
@@ -92,12 +89,51 @@ public void simpleTest() throws Exception {
byte[] key = Integer.toString(i).getBytes();
KeyValueEntry entry = client.read(logId, ByteString.copyFrom(key));
byte[] data = entry.getValue().toByteArray();
Assert.assertEquals(i, data.length);
byte[] expected = buildValue(i);
Assert.assertArrayEquals(expected, data);
}
for (int j = 0; j < i; j++) {
Assert.assertEquals((byte) (j % 0xff), data[j]);
}
}
@Test
public void testReplaceValue() throws Exception {
String url = SERVERS[0].getHttpUrl();
long logId = 3;
KeyValueClient client = new KeyValueClient(url);
int n = 20;
// Set i = i
for (int i = 1; i < n; i++) {
byte[] key = Integer.toString(i).getBytes();
byte[] data = buildValue(i);
client.put(logId, ByteString.copyFrom(key), ByteString.copyFrom(data));
}
// Set i = i * 2
for (int i = 1; i < n; i++) {
byte[] key = Integer.toString(i).getBytes();
byte[] data = buildValue(i * 2);
client.put(logId, ByteString.copyFrom(key), ByteString.copyFrom(data));
}
// Check i = i * 2
for (int i = 1; i < n; i++) {
byte[] key = Integer.toString(i).getBytes();
KeyValueEntry entry = client.read(logId, ByteString.copyFrom(key));
byte[] data = entry.getValue().toByteArray();
byte[] expected = buildValue(i * 2);
Assert.assertArrayEquals(expected, data);
}
}
private byte[] buildValue(int i) {
byte[] data = new byte[i];
for (int j = 0; j < i; j++) {
data[j] = (byte) (j % 0xff);
}
return data;
}
}

0 comments on commit 59ac376

Please sign in to comment.