Permalink
Browse files

Actually reclaim the pages from unneeded transactions

  • Loading branch information...
justinsb committed Dec 11, 2013
1 parent 65034e5 commit 14a79d7bb93dc3b79f3e4cb8b5fcf712b5da4460
@@ -25,7 +25,7 @@
final FreeSpaceMap freeSpaceMap;
private int freeSpaceSnapshotTransactionCount;
final List<SpaceMapEntry> deferredReclaim;
private static final int ALIGNMENT = 256;
@@ -36,6 +36,7 @@
private MmapPageStore(MappedByteBuffer buffer, boolean uniqueKeys) {
this.buffer = buffer;
this.uniqueKeys = uniqueKeys;
this.deferredReclaim = Lists.newArrayList();
MasterPage latest = null;
for (int i = 0; i < MASTERPAGE_SLOTS; i++) {
@@ -49,7 +50,7 @@ private MmapPageStore(MappedByteBuffer buffer, boolean uniqueKeys) {
}
this.nextTransactionId = latest.getTransactionId() + 1;
setCurrent(latest.getRoot(), latest.getTransactionPageId());
setCurrent(latest.getRoot(), latest.getTransactionId(), latest.getTransactionPageId());
this.freeSpaceMap = recoverFreeSpaceMap(latest);
@@ -61,6 +62,7 @@ private FreeSpaceMap recoverFreeSpaceMap(MasterPage latest) {
List<PageRecord> history = Lists.newArrayList();
PageRecord fsmSnapshot = null;
// Walk the list of transactions backwards until we find a FSM snapshot
if (transactionPageId != 0) {
PageRecord current = fetchPage(null, transactionPageId);
@@ -227,17 +229,18 @@ public SpaceMapEntry writePage(Page page) {
}
@Override
public void commitTransaction(TransactionPage transactionPage) {
public SpaceMapEntry commitTransaction(TransactionPage transactionPage) {
// Shouldn't need to be synchronized, but harmless...
synchronized (this) {
transactionPage.setPreviousTransactionPageId(currentTransactionPage);
if (freeSpaceSnapshotTransactionCount > 64) {
if (deferredReclaim.size() > 64) {
freeSpaceMap.reclaimAll(deferredReclaim);
SpaceMapEntry fsmPageId = freeSpaceMap.writeSnapshot(this);
transactionPage.setFreeSpaceSnapshotId(fsmPageId.getPageId());
freeSpaceSnapshotTransactionCount = 0;
} else {
freeSpaceSnapshotTransactionCount++;
deferredReclaim.clear();
}
long transactionId = transactionPage.getTransactionId();
@@ -257,7 +260,18 @@ public void commitTransaction(TransactionPage transactionPage) {
log.info("Committing transaction {}. New root={}", transactionId, newRootPage);
setCurrent(newRootPage, transactionPageId.getPageId());
setCurrent(newRootPage, transactionId, transactionPageId.getPageId());
return transactionPageId;
}
}
@Override
protected void reclaim(WriteTransaction txn, SpaceMapEntry txnSpace) {
synchronized (this) {
deferredReclaim.addAll(txn.getFreed());
// TODO: Any reason not to release the transaction at the same time?
deferredReclaim.add(txnSpace);
}
}
@@ -1,18 +1,41 @@
package com.cloudata.keyvalue.btree;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.freemap.SpaceMapEntry;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
public abstract class PageStore {
private static final Logger log = LoggerFactory.getLogger(PageStore.class);
protected int currentTransactionPage;
private int currentRootPage;
private long currentTransactionId;
private final List<ReadOnlyTransaction> readTransactions = Lists.newArrayList();
private WriteTransaction writeTransaction;
static class CleanupQueueEntry {
final WriteTransaction transaction;
final SpaceMapEntry space;
final long transactionId;
public CleanupQueueEntry(WriteTransaction transaction, SpaceMapEntry space) {
this.transaction = transaction;
this.transactionId = transaction.getTransactionId();
assert this.transactionId != 0;
this.space = space;
}
}
final Queue<CleanupQueueEntry> writeTransactionCleanupQueue;
public static class PageRecord {
public final Page page;
@@ -22,7 +45,10 @@ public PageRecord(Page page, SpaceMapEntry space) {
this.page = page;
this.space = space;
}
}
protected PageStore() {
writeTransactionCleanupQueue = Queues.newArrayDeque();
}
public abstract PageRecord fetchPage(Page parent, int pageNumber);
@@ -35,7 +61,7 @@ public PageRecord(Page page, SpaceMapEntry space) {
*/
public abstract SpaceMapEntry writePage(Page page);
public abstract void commitTransaction(TransactionPage transaction);
public abstract SpaceMapEntry commitTransaction(TransactionPage transaction);
protected long nextTransactionId;
@@ -46,21 +72,73 @@ public long assignTransactionId() {
public ReadOnlyTransaction beginReadOnlyTransaction() {
synchronized (this) {
log.info("Starting new read-write transaction with root page: {}", currentRootPage);
return new ReadOnlyTransaction(this, currentRootPage);
ReadOnlyTransaction txn = new ReadOnlyTransaction(this, currentRootPage, currentTransactionId);
readTransactions.add(txn);
return txn;
}
}
protected void setCurrent(int rootPage, int transactionPageId) {
protected void setCurrent(int rootPage, long transactionId, int transactionPageId) {
synchronized (this) {
this.currentRootPage = rootPage;
this.currentTransactionId = transactionId;
this.currentTransactionPage = rootPage;
}
}
public WriteTransaction beginReadWriteTransaction(Lock writeLock) {
synchronized (this) {
if (writeTransaction != null) {
throw new IllegalStateException();
}
log.info("Starting new read-write transaction with root page: {}", currentRootPage);
return new WriteTransaction(this, writeLock, currentRootPage);
writeTransaction = new WriteTransaction(this, writeLock, currentRootPage);
return writeTransaction;
}
}
public void finished(Transaction transaction) {
synchronized (this) {
boolean wasRead = false;
if (transaction == this.writeTransaction) {
if (writeTransaction.getTransactionId() != 0) {
writeTransactionCleanupQueue.add(new CleanupQueueEntry(writeTransaction, writeTransaction
.getTransactionSpaceMapEntry()));
} else {
// Transaction rollback...
if (!writeTransaction.getAllocated().isEmpty()) {
throw new UnsupportedOperationException();
}
}
this.writeTransaction = null;
} else {
if (!readTransactions.remove(transaction)) {
throw new IllegalStateException();
}
wasRead = true;
}
if (!writeTransactionCleanupQueue.isEmpty()) {
long keepTransaction = Long.MAX_VALUE;
for (ReadOnlyTransaction readTransaction : readTransactions) {
keepTransaction = Math.min(readTransaction.getSnapshotTransactionId(), keepTransaction);
}
while (!writeTransactionCleanupQueue.isEmpty()) {
CleanupQueueEntry cleanupQueueEntry = writeTransactionCleanupQueue.peek();
if (cleanupQueueEntry.transactionId >= keepTransaction) {
break;
}
cleanupQueueEntry = writeTransactionCleanupQueue.remove();
reclaim(cleanupQueueEntry.transaction, cleanupQueueEntry.space);
}
}
}
}
protected abstract void reclaim(WriteTransaction cleanup, SpaceMapEntry txnSpace);
}
@@ -7,10 +7,12 @@
private static final Logger log = LoggerFactory.getLogger(ReadOnlyTransaction.class);
final int rootPageId;
final long snapshotTransactionId;
public ReadOnlyTransaction(PageStore pageStore, int rootPageId) {
public ReadOnlyTransaction(PageStore pageStore, int rootPageId, long snapshotTransactionId) {
super(pageStore, null);
this.rootPageId = rootPageId;
this.snapshotTransactionId = snapshotTransactionId;
}
@Override
@@ -30,4 +32,8 @@ protected Page getRootPage(Btree btree, boolean create) {
return getPage(null, rootPageId);
}
public long getSnapshotTransactionId() {
return snapshotTransactionId;
}
}
@@ -36,6 +36,8 @@ public void walk(Btree btree, ByteBuffer from, EntryListener listener) {
@Override
public void close() {
unlock();
pageStore.finished(this);
}
protected void unlock() {
@@ -23,8 +23,17 @@
final Map<Integer, TrackedPage> trackedPages = Maps.newHashMap();
final List<SpaceMapEntry> freed = Lists.newArrayList();
final List<SpaceMapEntry> allocated = Lists.newArrayList();
private int rootPageId;
private long transactionId;
public long getTransactionId() {
return transactionId;
}
static class TrackedPage {
final Page page;
TrackedPage parent;
@@ -69,9 +78,6 @@ public Page getPage(Page parent, int pageNumber) {
}
public void commit() {
List<SpaceMapEntry> freed = Lists.newArrayList();
List<SpaceMapEntry> allocated = Lists.newArrayList();
Queue<TrackedPage> ready = Queues.newArrayDeque();
for (Entry<Integer, TrackedPage> entry : trackedPages.entrySet()) {
@@ -164,7 +170,9 @@ public void commit() {
log.info("Freed pages: {}", Joiner.on(",").join(freed));
pageStore.commitTransaction(transactionPage);
this.spaceMapEntry = pageStore.commitTransaction(transactionPage);
this.transactionId = transactionId;
}
public void doAction(Btree btree, KvAction action, ByteBuffer key, ByteBuffer value) {
@@ -173,6 +181,8 @@ public void doAction(Btree btree, KvAction action, ByteBuffer key, ByteBuffer va
int createdPageCount;
private SpaceMapEntry spaceMapEntry;
public int assignPageNumber() {
return -(++createdPageCount);
}
@@ -211,4 +221,16 @@ protected Page getRootPage(Btree btree, boolean create) {
return getPage(null, rootPageId);
}
public List<SpaceMapEntry> getFreed() {
return freed;
}
public SpaceMapEntry getTransactionSpaceMapEntry() {
return spaceMapEntry;
}
public List<SpaceMapEntry> getAllocated() {
return allocated;
}
}
@@ -21,7 +21,7 @@
private FreeSpaceMap(int start, int end) {
this.freeRanges = new RangeTree();
this.freeRanges.add(start, end - start);
this.freeRanges.release(start, end - start);
}
private FreeSpaceMap(SnapshotPage snapshot) {
@@ -35,18 +35,18 @@ public void replay(PageRecord txnRecord) {
int start = txn.getFreedStart(i);
int size = txn.getFreedLength(i);
freeRanges.add(start, size);
freeRanges.release(start, size);
}
for (int i = 0; i < txn.getAllocatedCount(); i++) {
int start = txn.getAllocatedStart(i);
int size = txn.getAllocatedLength(i);
freeRanges.remove(start, size);
freeRanges.replayAllocate(start, size);
}
// The free space map can't include the transaction itself
freeRanges.remove(txnRecord.space.start, txnRecord.space.length);
freeRanges.replayAllocate(txnRecord.space.start, txnRecord.space.length);
}
public static FreeSpaceMap createEmpty(int start, int end) {
@@ -62,6 +62,20 @@ public static FreeSpaceMap createFromSnapshot(PageRecord snapshotRecord) {
return fsm;
}
public SpaceMapEntry writeSnapshot(PageStore pageStore) {
ByteBuffer empty = ByteBuffer.allocate(0);
SnapshotPage page = new SnapshotPage(null, Integer.MIN_VALUE, empty);
return pageStore.writePage(page);
}
public int allocate(int size) {
return this.freeRanges.allocate(size);
}
public void reclaimAll(List<SpaceMapEntry> entries) {
freeRanges.releaseAll(entries);
}
// public void reclaimSpace(TransactionPage t) {
// int freelistSize = t.getFreelistSize();
// for (int i = 0; i < freelistSize; i++) {
@@ -159,14 +173,4 @@ public boolean shouldSplit() {
}
}
public SpaceMapEntry writeSnapshot(PageStore pageStore) {
ByteBuffer empty = ByteBuffer.allocate(0);
SnapshotPage page = new SnapshotPage(null, Integer.MIN_VALUE, empty);
return pageStore.writePage(page);
}
public int allocate(int size) {
return this.freeRanges.allocate(size);
}
}
Oops, something went wrong.

0 comments on commit 14a79d7

Please sign in to comment.