Skip to content

Commit

Permalink
New fix for issue #7388 was implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Jul 13, 2017
1 parent e9cc252 commit 87c7594
Showing 1 changed file with 56 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -712,12 +712,6 @@ public void loadCacheState(final OWriteCache writeCache) {

cacheLock.acquireWriteLock();
try {
//we need to restore cache state only when we start server or
//embedded storage for all other cases we do not change queues
//to prevent memory leaks
if (a1in.size() > 0 || a1out.size() > 0 || am.size() > 0)
return;

final Path statePath = writeCache.getRootDirectory().resolve(CACHE_STATE_FILE);

if (Files.exists(statePath)) {
Expand Down Expand Up @@ -808,6 +802,8 @@ private void restoreQueueWithoutPageLoad(OWriteCache writeCache, LRUList queue,

queue.putToMRU(cacheEntry);
pages.add(cacheEntry.getPageIndex());

removeColdPagesWithCacheLock();
} finally {
internalFileId = dataInputStream.readInt();
}
Expand All @@ -832,59 +828,67 @@ private void restoreQueueWithPageLoad(OWriteCache writeCache, LRUList queue, Dat
// used only for statistics, and there is passed merely as stub
final OModifiableBoolean cacheHit = new OModifiableBoolean();

// first step, we will create two tree maps to sort data by position in file and load them with maximum speed and
// first step, we will create set to sort data by position in file and load them with maximum speed and
// then to put data into the queue to restore position of entries in LRU list.
final TreeSet<PageKey> filePositions = new TreeSet<PageKey>();

final TreeMap<PageKey, OPair<Long, OCacheEntry>> filePositionMap = new TreeMap<>();
final TreeMap<Long, OCacheEntry> queuePositionMap = new TreeMap<>();

long position = 0;
int internalFileId = dataInputStream.readInt();
while (internalFileId >= 0) {
final long pageIndex = dataInputStream.readLong();
try {
final long fileId = writeCache.externalFileId(internalFileId);
final OCacheEntry entry = new OCacheEntryImpl(fileId, pageIndex, null, false);
filePositionMap.put(new PageKey(fileId, pageIndex), new OPair<>(position, entry));
queuePositionMap.put(position, entry);

position++;
//we replace only pages which are not loaded yet
if (queue.get(fileId, pageIndex) == null) {
filePositions.add(new PageKey(fileId, pageIndex));

//we put placeholder to the queue, later we will replace it with real data
//it is done to prevent cases when disk cache size will exceed limits
//set by configuration
final OCacheEntry cacheEntry = new OCacheEntryImpl(fileId, pageIndex, null, false);
queue.putToMRU(cacheEntry);

Set<Long> pages = filePages.get(fileId);
if (pages == null) {
pages = new HashSet<Long>();

Set<Long> op = filePages.putIfAbsent(fileId, pages);
if (op != null) {
pages = op;
}
}

pages.add(pageIndex);

//remove part of the queue if queue size is bigger than allowed
removeColdPagesWithCacheLock();
}
} finally {
internalFileId = dataInputStream.readInt();
}
}

// second step: load pages sorted by position in file
for (final Map.Entry<PageKey, OPair<Long, OCacheEntry>> entry : filePositionMap.entrySet()) {
final PageKey pageKey = entry.getKey();
final OPair<Long, OCacheEntry> pair = entry.getValue();
//second step: load pages sorted by position in a file and replace placeholders by real data
for (PageKey pageKey : filePositions) {
final OCacheEntry cacheEntry = queue.get(pageKey.fileId, pageKey.pageIndex);

final OCachePointer[] pointers = writeCache.load(pageKey.fileId, pageKey.pageIndex, 1, false, cacheHit, true);
//some queue items may be deleted because it reached size limit
if (cacheEntry != null && cacheEntry.getCachePointer() == null) {
final OCachePointer[] pointers = writeCache.load(pageKey.fileId, pageKey.pageIndex, 1, false, cacheHit, true);

if (pointers.length == 0) {
queuePositionMap.remove(pair.key);
continue;
}
if (pointers.length == 0) {
queue.remove(pageKey.fileId, pageKey.pageIndex);

final OCacheEntry cacheEntry = pair.value;
cacheEntry.setCachePointer(pointers[0]);
}

// third step: add pages according to their order in LRU queue
for (final OCacheEntry cacheEntry : queuePositionMap.values()) {
final long fileId = cacheEntry.getFileId();
Set<Long> pages = filePages.get(fileId);
if (pages == null) {
pages = new HashSet<>();
final Set<Long> pages = filePages.get(pageKey.fileId);

Set<Long> op = filePages.putIfAbsent(fileId, pages);
if (op != null) {
pages = op;
if (pages != null) {
pages.remove(pageKey.pageIndex);
}
continue;
}
}

queue.putToMRU(cacheEntry);
pages.add(cacheEntry.getPageIndex());
cacheEntry.setCachePointer(pointers[0]);
}
}
}

Expand Down Expand Up @@ -1258,9 +1262,12 @@ private void removeColdPagesWithCacheLock() {
assert !removedFromAInEntry.isDirty();

final OCachePointer cachePointer = removedFromAInEntry.getCachePointer();
cachePointer.decrementReadersReferrer();
removedFromAInEntry.clearCachePointer();

//cache pointer can be null if we load initial state of cache from disk
//see #restoreQueueWithPageLoad for details
if (cachePointer != null) {
cachePointer.decrementReadersReferrer();
removedFromAInEntry.clearCachePointer();
}
a1out.putToMRU(removedFromAInEntry);
}

Expand All @@ -1284,8 +1291,12 @@ private void removeColdPagesWithCacheLock() {
assert !removedEntry.isDirty();

final OCachePointer cachePointer = removedEntry.getCachePointer();
cachePointer.decrementReadersReferrer();
removedEntry.clearCachePointer();
//cache pointer can be null if we load initial state of cache from disk
//see #restoreQueueWithPageLoad for details
if (cachePointer != null) {
cachePointer.decrementReadersReferrer();
removedEntry.clearCachePointer();
}

Set<Long> pageEntries = filePages.get(removedEntry.getFileId());
pageEntries.remove(removedEntry.getPageIndex());
Expand Down

0 comments on commit 87c7594

Please sign in to comment.