Permalink
Browse files

Create free-space manager (and failing test)

  • Loading branch information...
justinsb committed Dec 11, 2013
1 parent 1c9e6d2 commit 4a622292340eb462db080cf8177427de81fd3088
@@ -3,7 +3,6 @@
import java.nio.ByteBuffer;
public class MasterPage {
public final static int SIZE = 16;
final ByteBuffer buffer;
@@ -4,11 +4,16 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.freemap.FreeSpaceMap;
import com.cloudata.keyvalue.freemap.SpaceMapEntry;
import com.cloudata.util.Mmap;
import com.google.common.collect.Lists;
public class MmapPageStore extends PageStore {
@@ -18,6 +23,10 @@
final boolean uniqueKeys;
final FreeSpaceMap freeSpaceMap;
private int freeSpaceSnapshotTransactionCount;
private static final int ALIGNMENT = 256;
private static final int HEADER_SIZE = 16384;
@@ -29,7 +38,6 @@ private MmapPageStore(MappedByteBuffer buffer, boolean uniqueKeys) {
this.uniqueKeys = uniqueKeys;
MasterPage latest = null;
for (int i = 0; i < MASTERPAGE_SLOTS; i++) {
int position = i * MasterPage.SIZE;
MasterPage metadataPage = new MasterPage(buffer, position);
@@ -43,9 +51,53 @@ private MmapPageStore(MappedByteBuffer buffer, boolean uniqueKeys) {
this.nextTransactionId = latest.getTransactionId() + 1;
setCurrent(latest.getRoot(), latest.getTransactionPageId());
this.freeSpaceMap = recoverFreeSpaceMap(latest);
this.buffer.position(HEADER_SIZE);
}
private FreeSpaceMap recoverFreeSpaceMap(MasterPage latest) {
int transactionPageId = latest.getTransactionPageId();
List<TransactionPage> history = Lists.newArrayList();
FreeSpaceMap.SnapshotPage fsmSnapshot = null;
if (transactionPageId != 0) {
TransactionPage transactionPage = (TransactionPage) fetchPage(null, transactionPageId).page;
TransactionPage current = transactionPage;
while (true) {
history.add(current);
if (current.getFreeSpaceSnapshotId() != 0) {
fsmSnapshot = (FreeSpaceMap.SnapshotPage) fetchPage(null, current.getFreeSpaceSnapshotId()).page;
break;
}
int previousTransactionPageId = current.getPreviousTransactionPageId();
if (previousTransactionPageId == 0) {
break;
}
TransactionPage previousTransaction = (TransactionPage) fetchPage(null, previousTransactionPageId).page;
assert (previousTransaction != null);
}
Collections.reverse(history);
}
FreeSpaceMap fsm;
if (fsmSnapshot == null) {
fsm = FreeSpaceMap.createEmpty(HEADER_SIZE / ALIGNMENT, this.buffer.limit() / ALIGNMENT);
} else {
fsm = FreeSpaceMap.createFromSnapshot(fsmSnapshot);
}
for (TransactionPage txn : history) {
fsm.replay(txn);
}
return fsm;
}
public static MmapPageStore build(File data, boolean uniqueKeys) throws IOException {
if (!data.exists()) {
long size = 1024L * 1024L * 64L;
@@ -66,11 +118,23 @@ public static MmapPageStore build(File data, boolean uniqueKeys) throws IOExcept
}
@Override
public Page fetchPage(Page parent, int pageNumber) {
public PageRecord fetchPage(Page parent, int pageNumber) {
int offset = pageNumber * ALIGNMENT;
PageHeader header = new PageHeader(buffer, offset);
SpaceMapEntry space;
{
int dataSize = header.getDataSize();
int totalSize = dataSize + PageHeader.HEADER_SIZE;
int slots = totalSize / ALIGNMENT;
if ((totalSize % ALIGNMENT) != 0) {
slots++;
}
space = new SpaceMapEntry(pageNumber, slots);
}
Page page;
switch (header.getPageType()) {
@@ -97,28 +161,51 @@ public Page fetchPage(Page parent, int pageNumber) {
System.out.flush();
}
return page;
return new PageRecord(page, space);
}
@Override
public int writePage(Page page) {
public SpaceMapEntry writePage(Page page) {
int dataSize = page.getSerializedSize();
int totalSize = dataSize + PageHeader.HEADER_SIZE;
if (totalSize > buffer.remaining()) {
// TODO: Reclaim old space
// TODO: Incorporate padding into calculation?
throw new UnsupportedOperationException();
// int padding = totalSize % ALIGNMENT;
// if (padding != 0) {
// padding = ALIGNMENT - padding;
// }
int position;
SpaceMapEntry allocation;
{
int allocateSlots = totalSize / ALIGNMENT;
if ((totalSize % ALIGNMENT) != 0) {
allocateSlots++;
}
int allocated = freeSpaceMap.allocate(allocateSlots);
if (allocated < 0) {
// TODO: Grow database
throw new IllegalStateException();
}
position = allocated * ALIGNMENT;
allocation = new SpaceMapEntry(allocated, allocateSlots);
}
int position = buffer.position();
// if (totalSize > buffer.remaining()) {
// // TODO: Reclaim old space
// // TODO: Incorporate padding into calculation?
// throw new UnsupportedOperationException();
// }
//
// int position = buffer.position();
assert (position % ALIGNMENT) == 0;
// int newPageNumber = (position + (ALIGNMENT - 1)) / ALIGNMENT;
int newPageNumber = position / ALIGNMENT;
// int newPageNumber = position / ALIGNMENT;
ByteBuffer writeBuffer = buffer.duplicate();
writeBuffer.position(position);
PageHeader.write(writeBuffer, page.getPageType(), dataSize);
writeBuffer.limit(writeBuffer.position() + dataSize);
@@ -133,14 +220,10 @@ public int writePage(Page page) {
throw new IllegalStateException();
}
int padding = totalSize % ALIGNMENT;
if (padding != 0) {
padding = ALIGNMENT - padding;
}
buffer.position(buffer.position() + totalSize + padding);
assert (buffer.position() % ALIGNMENT) == 0;
// buffer.position(buffer.position() + totalSize + padding);
// assert (buffer.position() % ALIGNMENT) == 0;
return newPageNumber;
return allocation;
}
@Override
@@ -149,24 +232,32 @@ public void commitTransaction(TransactionPage transactionPage) {
synchronized (this) {
transactionPage.setPreviousTransactionPageId(currentTransactionPage);
if (freeSpaceSnapshotTransactionCount > 64) {
SpaceMapEntry fsmPageId = freeSpaceMap.writeSnapshot(this);
transactionPage.setFreeSpaceSnapshotId(fsmPageId.getPageId());
freeSpaceSnapshotTransactionCount = 0;
} else {
freeSpaceSnapshotTransactionCount++;
}
long transactionId = transactionPage.getTransactionId();
int transactionPageId = writePage(transactionPage);
SpaceMapEntry transactionPageId = writePage(transactionPage);
int newRootPage = transactionPage.getRootPageId();
int slot = transactionPageId % MASTERPAGE_SLOTS;
int slot = (int) (transactionId % MASTERPAGE_SLOTS);
int position = slot * MasterPage.SIZE;
ByteBuffer mmap = this.buffer.duplicate();
mmap.position(position);
MasterPage.create(mmap, newRootPage, transactionPageId, transactionId);
MasterPage.create(mmap, newRootPage, transactionPageId.getPageId(), transactionId);
log.info("Committing transaction {}. New root={}", transactionId, newRootPage);
setCurrent(newRootPage, transactionPageId);
setCurrent(newRootPage, transactionPageId.getPageId());
}
}
@@ -49,4 +49,9 @@ public static void write(ByteBuffer dest, byte pageType, int length) {
dest.putInt(0);
}
public int getDataSize() {
int length = buffer.getInt(offset + OFFSET_LENGTH);
return length;
}
}
@@ -5,22 +5,35 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.freemap.SpaceMapEntry;
public abstract class PageStore {
private static final Logger log = LoggerFactory.getLogger(PageStore.class);
protected int currentTransactionPage;
private int currentRootPage;
public abstract Page fetchPage(Page parent, int pageNumber);
public static class PageRecord {
public final Page page;
public final SpaceMapEntry space;
public PageRecord(Page page, SpaceMapEntry space) {
this.page = page;
this.space = space;
}
}
public abstract PageRecord fetchPage(Page parent, int pageNumber);
/**
* Writes the page to the PageStore (disk, usually)
*
* @param page
* @return the new page number
*/
public abstract int writePage(Page page);
public abstract SpaceMapEntry writePage(Page page);
public abstract void commitTransaction(TransactionPage transaction);
@@ -16,7 +16,7 @@ public ReadOnlyTransaction(PageStore pageStore, int rootPageId) {
@Override
public Page getPage(Page parent, int pageNumber) {
// TODO: Should we have a small cache?
return pageStore.fetchPage(parent, pageNumber);
return pageStore.fetchPage(parent, pageNumber).page;
}
@Override
@@ -43,7 +43,6 @@ protected void unlock() {
if (lock != null && !releasedLock) {
lock.unlock();
releasedLock = true;
}
}
}
Oops, something went wrong.

0 comments on commit 4a62229

Please sign in to comment.