Permalink
Browse files

Real transaction handling; write a transaction record as a page, writ…

…e multiple links in the header
  • Loading branch information...
justinsb committed Dec 9, 2013
1 parent 398299a commit 0ecdc908c790c489f67e72dacc65c1239b8e7344
@@ -21,12 +21,12 @@ public Btree(PageStore pageStore, boolean uniqueKeys) {
public ReadWriteTransaction beginReadWrite() {
writeLock.lock();
ReadWriteTransaction txn = new ReadWriteTransaction(pageStore, writeLock);
ReadWriteTransaction txn = pageStore.beginReadWriteTransaction(writeLock);
return txn;
}
public ReadOnlyTransaction beginReadOnly() {
ReadOnlyTransaction txn = new ReadOnlyTransaction(pageStore);
ReadOnlyTransaction txn = pageStore.beginReadOnlyTransaction();
return txn;
}
@@ -0,0 +1,34 @@
package com.cloudata.keyvalue.btree;
import java.nio.ByteBuffer;
public class MasterPage {
public final static int SIZE = 16;
final ByteBuffer buffer;
final int offset;
public MasterPage(ByteBuffer buffer, int offset) {
this.buffer = buffer;
this.offset = offset;
}
public int getRoot() {
return buffer.getInt(offset);
}
public static void create(ByteBuffer mmap, int rootPage, int transactionPage, long transactionId) {
mmap.putInt(rootPage);
mmap.putInt(transactionPage);
mmap.putLong(transactionId);
}
public int getTransactionPageId() {
return buffer.getInt(offset + 4);
}
public long getTransactionId() {
return buffer.getLong(offset + 8);
}
}

This file was deleted.

Oops, something went wrong.
@@ -16,20 +16,32 @@
final MappedByteBuffer buffer;
int rootPage;
final boolean uniqueKeys;
private static final int ALIGNMENT = 256;
private static final int HEADER_SIZE = 16384;
private static final int MASTERPAGE_SLOTS = 8;
private MmapPageStore(MappedByteBuffer buffer, boolean uniqueKeys) {
this.buffer = buffer;
this.uniqueKeys = uniqueKeys;
MetadataPage metadataPage = new MetadataPage(buffer, 0);
this.rootPage = metadataPage.getRoot();
MasterPage latest = null;
for (int i = 0; i < MASTERPAGE_SLOTS; i++) {
int position = i * MasterPage.SIZE;
MasterPage metadataPage = new MasterPage(buffer, position);
if (latest == null) {
latest = metadataPage;
} else if (latest.getTransactionId() < metadataPage.getTransactionId()) {
latest = metadataPage;
}
}
this.nextTransactionId = latest.getTransactionId() + 1;
setCurrent(latest.getRoot(), latest.getTransactionPageId());
this.buffer.position(HEADER_SIZE);
}
@@ -39,7 +51,10 @@ public static MmapPageStore build(File data, boolean uniqueKeys) throws IOExcept
long size = 1024L * 1024L * 64L;
MappedByteBuffer mmap = Mmap.mmapFile(data, size);
MetadataPage.create(mmap, 0);
for (int i = 0; i < MASTERPAGE_SLOTS; i++) {
mmap.position(i * MasterPage.SIZE);
MasterPage.create(mmap, 0, 0, 0);
}
return new MmapPageStore(mmap, uniqueKeys);
} else {
@@ -67,6 +82,10 @@ public Page fetchPage(Page parent, int pageNumber) {
page = new LeafPage(parent, pageNumber, header.getPageSlice(), uniqueKeys);
break;
case TransactionPage.PAGE_TYPE:
page = new TransactionPage(parent, pageNumber, header.getPageSlice());
break;
default:
throw new IllegalStateException();
}
@@ -123,20 +142,30 @@ public int writePage(Page page) {
}
@Override
public int getRootPageId() {
return rootPage;
}
public void commitTransaction(TransactionPage transactionPage) {
// Shouldn't need to be synchronized, but harmless...
synchronized (this) {
transactionPage.setPreviousTransactionPage(currentTransactionPage);
@Override
public void commitTransaction(int newRootPage) {
ByteBuffer mmap = this.buffer.duplicate();
mmap.position(0);
long transactionId = transactionPage.getTransactionId();
int transactionPageId = writePage(transactionPage);
int newRootPage = transactionPage.getRootPage();
int slot = transactionPageId % MASTERPAGE_SLOTS;
MetadataPage.create(mmap, newRootPage);
int position = slot * MasterPage.SIZE;
log.info("Committing transaction. New root={}", newRootPage);
ByteBuffer mmap = this.buffer.duplicate();
mmap.position(position);
this.rootPage = newRootPage;
MasterPage.create(mmap, newRootPage, transactionPageId, transactionId);
log.info("Committing transaction {}. New root={}", transactionId, newRootPage);
setCurrent(newRootPage, transactionPageId);
}
}
}
@@ -1,7 +1,17 @@
package com.cloudata.keyvalue.btree;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class PageStore {
private static final Logger log = LoggerFactory.getLogger(PageStore.class);
protected int currentTransactionPage;
private int currentRootPage;
public abstract Page fetchPage(Page parent, int pageNumber);
/**
@@ -12,7 +22,32 @@
*/
public abstract int writePage(Page page);
public abstract int getRootPageId();
public abstract void commitTransaction(TransactionPage transaction);
protected long nextTransactionId;
public long assignTransactionId() {
return nextTransactionId++;
}
public ReadOnlyTransaction beginReadOnlyTransaction() {
synchronized (this) {
log.info("Starting new read-write transaction with root page: {}", currentRootPage);
return new ReadOnlyTransaction(this, currentRootPage);
}
}
protected void setCurrent(int rootPage, int transactionPageId) {
synchronized (this) {
this.currentRootPage = rootPage;
this.currentTransactionPage = rootPage;
}
}
public abstract void commitTransaction(int newRootPage);
public ReadWriteTransaction beginReadWriteTransaction(Lock writeLock) {
synchronized (this) {
log.info("Starting new read-write transaction with root page: {}", currentRootPage);
return new ReadWriteTransaction(this, writeLock, currentRootPage);
}
}
}
@@ -6,8 +6,11 @@
public class ReadOnlyTransaction extends Transaction {
private static final Logger log = LoggerFactory.getLogger(ReadOnlyTransaction.class);
public ReadOnlyTransaction(PageStore pageStore) {
final int rootPageId;
public ReadOnlyTransaction(PageStore pageStore, int rootPageId) {
super(pageStore, null);
this.rootPageId = rootPageId;
}
@Override
@@ -16,14 +19,8 @@ public Page getPage(Page parent, int pageNumber) {
return pageStore.fetchPage(parent, pageNumber);
}
int rootPageId;
@Override
protected Page getRootPage(Btree btree, boolean create) {
if (rootPageId == 0) {
rootPageId = pageStore.getRootPageId();
}
if (rootPageId == 0) {
if (!create) {
return null;
@@ -18,6 +18,8 @@
final Map<Integer, TrackedPage> trackedPages = Maps.newHashMap();
private int rootPageId;
static class TrackedPage {
final Page page;
final TrackedPage parent;
@@ -30,11 +32,11 @@ public TrackedPage(Page page, TrackedPage parent, int originalPageNumber) {
this.parent = parent;
this.originalPageNumber = originalPageNumber;
}
}
public ReadWriteTransaction(PageStore pageStore, Lock lock) {
public ReadWriteTransaction(PageStore pageStore, Lock lock, int rootPageId) {
super(pageStore, lock);
this.rootPageId = rootPageId;
}
@Override
@@ -62,6 +64,13 @@ public void commit() {
}
}
if (rootPageId == 0) {
if (!ready.isEmpty()) {
throw new IllegalStateException();
}
return;
}
int newRootPage = -1;
while (!ready.isEmpty()) {
@@ -100,7 +109,17 @@ public void commit() {
assert newRootPage != -1;
pageStore.commitTransaction(newRootPage);
TransactionPage transactionPage;
long transactionId = pageStore.assignTransactionId();
{
createdPageCount++;
int pageNumber = -createdPageCount;
transactionPage = TransactionPage.createNew(pageNumber, transactionId);
}
transactionPage.setRootPageId(newRootPage);
pageStore.commitTransaction(transactionPage);
}
public void doAction(Btree btree, KvAction action, ByteBuffer key, ByteBuffer value) {
@@ -122,14 +141,8 @@ private void createPage(Page parent, int pageNumber, Page newPage) {
trackedPages.put(pageNumber, trackedPage);
}
int rootPageId;
@Override
protected Page getRootPage(Btree btree, boolean create) {
if (rootPageId == 0) {
rootPageId = pageStore.getRootPageId();
}
if (rootPageId == 0) {
if (!create) {
return null;
Oops, something went wrong.

0 comments on commit 0ecdc90

Please sign in to comment.