Skip to content

Commit

Permalink
[MOD] Further simplified transaction log stuff. Next step: Remove
Browse files Browse the repository at this point in the history
BerkeleyDB persistent store as the underlying storage if it doesn't fit
in memory. Can be a simple random access file instead.
  • Loading branch information
JohannesLichtenberger committed Aug 20, 2017
1 parent ecf53cb commit 4dd7361
Show file tree
Hide file tree
Showing 50 changed files with 563 additions and 1,462 deletions.
Expand Up @@ -7,7 +7,7 @@

import org.sirix.api.PageReadTrx;
import org.sirix.api.ResourceManager;
import org.sirix.cache.RecordPageContainer;
import org.sirix.cache.PageContainer;
import org.sirix.exception.SirixIOException;
import org.sirix.io.Reader;
import org.sirix.node.Kind;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void close() throws SirixIOException {
}

@Override
public <K extends Comparable<? super K>, V extends Record, S extends KeyValuePage<K, V>> RecordPageContainer<S> getRecordPageContainer(
public <K extends Comparable<? super K>, V extends Record, S extends KeyValuePage<K, V>> PageContainer getRecordPageContainer(
@Nonnull @Nonnegative Long key, int index, @Nonnull PageKind pageKind)
throws SirixIOException {
return delegate().<K, V, S>getRecordPageContainer(key, index, pageKind);
Expand Down
113 changes: 57 additions & 56 deletions bundles/sirix-core/src/main/java/org/sirix/access/PageReadTrxImpl.java
Expand Up @@ -45,8 +45,7 @@
import org.sirix.api.ResourceManager;
import org.sirix.cache.BufferManager;
import org.sirix.cache.IndexLogKey;
import org.sirix.cache.IndirectPageLogKey;
import org.sirix.cache.RecordPageContainer;
import org.sirix.cache.PageContainer;
import org.sirix.exception.SirixException;
import org.sirix.exception.SirixIOException;
import org.sirix.io.Reader;
Expand All @@ -57,6 +56,7 @@
import org.sirix.page.IndirectPage;
import org.sirix.page.NamePage;
import org.sirix.page.PageKind;
import org.sirix.page.PagePersistenter;
import org.sirix.page.PageReference;
import org.sirix.page.PathPage;
import org.sirix.page.PathSummaryPage;
Expand All @@ -73,6 +73,9 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.UncheckedExecutionException;

/**
Expand All @@ -96,7 +99,7 @@ final class PageReadTrxImpl implements PageReadTrx {
private final RevisionRootPage mRootPage;

/** Internal reference to node cache. */
private final LoadingCache<IndexLogKey, RecordPageContainer<UnorderedKeyValuePage>> mNodeCache;
private final LoadingCache<IndexLogKey, PageContainer> mNodeCache;

/** Internal reference to page cache. */
private final LoadingCache<PageReference, Page> mPageCache;
Expand All @@ -110,18 +113,6 @@ final class PageReadTrxImpl implements PageReadTrx {
/** Determines if page reading transaction is closed or not. */
private boolean mClosed;

// /**
// * Optional page transaction log, dependent on the fact, if the log hasn't been completely
// * transferred into the data file.
// */
// private final Optional<TransactionLogPageCache> mPageLog;
//
// /**
// * Optional node transaction log, dependent on the fact, if the log hasn't been completely
// * transferred into the data file.
// */
// private final Optional<TransactionIndexLogCache<UnorderedKeyValuePage>> mNodeLog;

/** {@link ResourceConfiguration} instance. */
final ResourceConfiguration mResourceConfig;

Expand Down Expand Up @@ -183,10 +174,10 @@ final class PageReadTrxImpl implements PageReadTrx {

mNodeCache = CacheBuilder.newBuilder().maximumSize(10_000)
.expireAfterWrite(5_000, TimeUnit.SECONDS).expireAfterAccess(5_000, TimeUnit.SECONDS)
.build(new CacheLoader<IndexLogKey, RecordPageContainer<UnorderedKeyValuePage>>() {
.build(new CacheLoader<IndexLogKey, PageContainer>() {
@Override
public RecordPageContainer<UnorderedKeyValuePage> load(final IndexLogKey key) {
return getRecordPageContainer(key.getRecordPageKey(), key.getIndex(),
public PageContainer load(final IndexLogKey key) {
return pageReadTrx.getRecordPageContainer(key.getRecordPageKey(), key.getIndex(),
key.getIndexType());
}
});
Expand Down Expand Up @@ -243,7 +234,7 @@ public Optional<Record> getRecord(final long nodeKey, final PageKind pageKind,

final long recordPageKey = pageKey(nodeKey);

final RecordPageContainer<UnorderedKeyValuePage> cont;
final PageContainer cont;

try {
switch (pageKind) {
Expand All @@ -261,11 +252,11 @@ public Optional<Record> getRecord(final long nodeKey, final PageKind pageKind,
throw new SirixIOException(e.getCause());
}

if (cont.equals(RecordPageContainer.EMPTY_INSTANCE)) {
if (cont.equals(PageContainer.EMPTY_INSTANCE)) {
return Optional.empty();
}

final Record retVal = cont.getComplete().getValue(nodeKey);
final Record retVal = ((UnorderedKeyValuePage) cont.getComplete()).getValue(nodeKey);
return checkItemIfDeleted(retVal);
}

Expand Down Expand Up @@ -328,10 +319,12 @@ final RevisionRootPage loadRevRoot(final @Nonnegative int revisionKey) throws Si
try {
RevisionRootPage page = null;
if (mPageWriteTrx.isPresent()) {
page = (RevisionRootPage) mPageWriteTrx.get().mPageLog.get(reference.getLogKey());
final PageContainer cont = mPageWriteTrx.get().mLog.get(reference.getLogKey());
page = cont == null ? null : (RevisionRootPage) cont.getComplete();
}
if (page == null) {
assert reference.getKey() != Constants.NULL_ID || reference.getLogKey() != null;
assert reference.getKey() != Constants.NULL_ID
|| reference.getLogKey() != Constants.NULL_ID;
page = (RevisionRootPage) mPageCache.get(reference);
}
return page;
Expand Down Expand Up @@ -377,11 +370,6 @@ private Page getPage(final PageReference reference, final PageKind pageKind)
try {
Page page = reference.getPage();

if (mPageWriteTrx.isPresent()) {
final IndirectPageLogKey logKey = new IndirectPageLogKey(pageKind, -1, -1, 0);
reference.setLogKey(logKey);
}

if (page == null) {
page = mPageCache.get(reference);
reference.setPage(page);
Expand All @@ -400,7 +388,7 @@ public final UberPage getUberPage() {
}

@Override
public <K extends Comparable<? super K>, V extends Record, S extends KeyValuePage<K, V>> RecordPageContainer<S> getRecordPageContainer(
public <K extends Comparable<? super K>, V extends Record, T extends KeyValuePage<K, V>> PageContainer getRecordPageContainer(
final @Nonnegative Long recordPageKey, final int index, final PageKind pageKind)
throws SirixIOException {
assertNotClosed();
Expand All @@ -410,31 +398,29 @@ public <K extends Comparable<? super K>, V extends Record, S extends KeyValuePag
getLeafPageReference(checkNotNull(recordPageKey), index, checkNotNull(pageKind));

if (!pageReferenceToRecordPage.isPresent()) {
return RecordPageContainer.emptyInstance();
return PageContainer.emptyInstance();
}

// Try to get from resource buffer manager.
@SuppressWarnings("unchecked")
final RecordPageContainer<S> recordPageContainerFromBuffer =
(RecordPageContainer<S>) mResourceBufferManager.getRecordPageCache()
.get(pageReferenceToRecordPage.get());
final PageContainer recordPageContainerFromBuffer =
mResourceBufferManager.getRecordPageCache().get(pageReferenceToRecordPage.get());

if (recordPageContainerFromBuffer != null) {
return recordPageContainerFromBuffer;
}

// Load list of page "fragments" from persistent storage.
final List<S> pages = getSnapshotPages(pageReferenceToRecordPage.get());
final List<T> pages = getSnapshotPages(pageReferenceToRecordPage.get());

if (pages.isEmpty()) {
return RecordPageContainer.emptyInstance();
return PageContainer.emptyInstance();
}

final int mileStoneRevision = mResourceConfig.mRevisionsToRestore;
final Versioning revisioning = mResourceConfig.mRevisionKind;
final S completePage = revisioning.combineRecordPages(pages, mileStoneRevision, this);
final Page completePage = revisioning.combineRecordPages(pages, mileStoneRevision, this);

final RecordPageContainer<S> recordPageContainer = new RecordPageContainer<S>(completePage);
final PageContainer recordPageContainer = new PageContainer(completePage, clone(completePage));

// Cache the page container in the resource buffer manager (read-only case) — TODO confirm intent; original comment was garbled.
if (!mPageWriteTrx.isPresent())
Expand All @@ -444,6 +430,18 @@ public <K extends Comparable<? super K>, V extends Record, S extends KeyValuePag
return recordPageContainer;
}

/**
 * Creates a deep copy of a page by round-tripping it through the page
 * serialization layer: the page is serialized into an in-memory byte buffer
 * via {@link PagePersistenter#serializePage} and immediately deserialized
 * again via {@link PagePersistenter#deserializePage}, yielding an independent
 * instance.
 *
 * <p>NOTE(review): presumably used to obtain a detached "modified" copy for a
 * {@code PageContainer} in the transaction log (see the call site above) —
 * confirm. Only state preserved by the serialization format survives the copy.
 *
 * @param toClone the page to copy (must be serializable by
 *        {@code PagePersistenter})
 * @param <E> the concrete page type; the deserialized page is cast back to it
 *        (unchecked — safe as long as serialize/deserialize round-trips the
 *        concrete type)
 * @return a freshly deserialized copy of {@code toClone}
 * @throws SirixIOException if serialization or deserialization fails with an
 *         {@link IOException}
 */
@SuppressWarnings("unchecked")
<E extends Page> E clone(final E toClone) throws SirixIOException {
try {
// Serialize into an in-memory buffer ...
final ByteArrayDataOutput output = ByteStreams.newDataOutput();
PagePersistenter.serializePage(output, toClone);
// ... and read it straight back; "this" supplies the PageReadTrx context
// required by the deserializer.
final ByteArrayDataInput input = ByteStreams.newDataInput(output.toByteArray());
return (E) PagePersistenter.deserializePage(input, this);
} catch (final IOException e) {
// Wrap the checked I/O failure in the project's unchecked I/O exception,
// preserving the cause.
throw new SirixIOException(e);
}
}

final Optional<PageReference> getLeafPageReference(final @Nonnegative long recordPageKey,
final int index, final PageKind pageKind) {
final PageReference tmpRef = getPageReference(mRootPage, pageKind, index);
Expand All @@ -462,14 +460,14 @@ final Optional<PageReference> getLeafPageReference(final @Nonnegative long recor
*
* @throws SirixIOException if an I/O-error occurs within the creation process
*/
final <K extends Comparable<? super K>, V extends Record, S extends KeyValuePage<K, V>> List<S> getSnapshotPages(
final <K extends Comparable<? super K>, V extends Record, T extends KeyValuePage<K, V>> List<T> getSnapshotPages(
final PageReference pageReference) {
assert pageReference != null;
final ResourceConfiguration config = mResourceManager.getResourceConfig();
final int revsToRestore = config.mRevisionsToRestore;
final int[] revisionsToRead =
config.mRevisionKind.getRevisionRoots(mRootPage.getRevision(), revsToRestore);
final List<S> pages = new ArrayList<>(revisionsToRead.length);
final List<T> pages = new ArrayList<>(revisionsToRead.length);
boolean first = true;
for (int i = 0; i < revisionsToRead.length; i++) {
Optional<PageReference> refToRecordPage = null;
Expand All @@ -484,7 +482,7 @@ final <K extends Comparable<? super K>, V extends Record, S extends KeyValuePage
final PageReference reference = refToRecordPage.get();
if (reference.getKey() != Constants.NULL_ID) {
@SuppressWarnings("unchecked")
final S page = (S) mPageReader.read(reference.getKey(), this);
final T page = (T) mPageReader.read(reference.getKey(), this);
pages.add(page);
if (page.size() == Constants.NDP_NODE_COUNT) {
// Page is full, thus we can skip reconstructing pages with elder
Expand All @@ -510,6 +508,7 @@ PageReference getPageReference(final RevisionRootPage revisionRoot, final PageKi
final int index) throws SirixIOException {
assert revisionRoot != null;
PageReference ref = null;

switch (pageKind) {
case RECORDPAGE:
ref = revisionRoot.getIndirectPageReference();
Expand All @@ -530,6 +529,7 @@ PageReference getPageReference(final RevisionRootPage revisionRoot, final PageKi
throw new IllegalStateException(
"Only defined for node, path summary, text value and attribute value pages!");
}

return ref;
}

Expand All @@ -547,15 +547,20 @@ final IndirectPage dereferenceIndirectPage(final PageReference reference) {
IndirectPage page = null;

if (mPageWriteTrx.isPresent()) {
page = (IndirectPage) mPageWriteTrx.get().mPageLog.get(reference.getLogKey());
// Try to get it from the transaction log if it's present.
final PageContainer cont = mPageWriteTrx.get().mLog.get(reference.getLogKey());
page = cont == null ? null : (IndirectPage) cont.getComplete();
}

if (page == null) {
// Then try to get the in-memory reference.
page = (IndirectPage) reference.getPage();
}

if (page == null
&& (reference.getKey() != Constants.NULL_ID || reference.getLogKey() != null)) {
if (page == null && (reference.getKey() != Constants.NULL_ID
|| reference.getLogKey() != Constants.NULL_ID)) {
// Then try to get it from the page cache which might read it from the persistent storage
// on a cache miss.
page = (IndirectPage) mPageCache.get(reference);
}

Expand Down Expand Up @@ -586,39 +591,35 @@ public final PageReference getPageReferenceForPage(final PageReference startRefe
checkArgument(key >= 0, "key must be >= 0!");
checkNotNull(pageKind);
int offset = 0;
int parentOffset = 0;
long levelKey = key;
final int[] inpLevelPageCountExp = mUberPage.getPageCountExp(pageKind);

// Iterate through all levels.
for (int level = 0, height = inpLevelPageCountExp.length; level < height; level++) {
offset = (int) (levelKey >> inpLevelPageCountExp[level]);
levelKey -= offset << inpLevelPageCountExp[level];
if (reference.getLogKey() == null) {
// && (mPageWriteTrx.isPresent() || mPageLog.isPresent())) {
reference.setLogKey(new IndirectPageLogKey(pageKind, index, level,
parentOffset * Constants.INP_REFERENCE_COUNT + offset));
}
final Page derefPage = dereferenceIndirectPage(reference);
if (derefPage == null) {
reference = null;
break;
} else {
try {
if (reference.getLogKey() == Constants.NULL_ID && mPageWriteTrx.isPresent()) {
reference.setLogKey(
mPageWriteTrx.get().appendLogRecord(new PageContainer(derefPage, derefPage)));
}
reference = derefPage.getReference(offset);
} catch (final IndexOutOfBoundsException e) {
throw new SirixIOException("Node key isn't supported, it's too big!");
}
}
parentOffset = offset;
}

// Return reference to leaf of indirect tree.
if (reference != null && reference.getLogKey() == null) {
// && (mPageWriteTrx.isPresent() || mPageLog.isPresent())) {
reference.setLogKey(new IndirectPageLogKey(pageKind, index, inpLevelPageCountExp.length,
parentOffset * Constants.INP_REFERENCE_COUNT + offset));
}
// if (reference != null && reference.getLogKey() == Constants.NULL_ID) {
// mPageWriteTrx.ifPresent(pageWriteTrx -> reference.setLogKey(pageWriteTrx.appendLogRecord(new
// PageContainer(derefPage, derefPage))));
// }
return reference;
}

Expand Down

0 comments on commit 4dd7361

Please sign in to comment.