New fix for issue #7388 was implemented
laa committed Jul 13, 2017
1 parent 8df1b10 commit ab604d2
Showing 1 changed file with 60 additions and 43 deletions.
@@ -642,11 +642,8 @@ public void loadCacheState(final OWriteCache writeCache) {
       return;
     }
 
-    cacheLock.acquireReadLock();
+    cacheLock.acquireWriteLock();
     try {
-      if (a1in.size() > 0 || a1out.size() > 0 || am.size() > 0)
-        return;
-
       final File rootDirectory = writeCache.getRootDirectory();
       final File stateFile = new File(rootDirectory, CACHE_STATE_FILE);
       if (stateFile.exists()) {
@@ -679,12 +676,13 @@ public void loadCacheState(final OWriteCache writeCache) {
         } finally {
           cacheState.close();
         }
+
       }
     } catch (Exception e) {
       OLogManager.instance()
           .warn(this, "Cannot restore state of cache for storage placed under %s", writeCache.getRootDirectory(), e);
     } finally {
-      cacheLock.releaseReadLock();
+      cacheLock.releaseWriteLock();
     }
   }
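
The hunk above swaps the shared read lock for an exclusive write lock around loadCacheState: restoring state rewrites the cache's internal queues, so concurrent readers must be kept out while the queues are rebuilt. A minimal sketch of the pattern, using java.util.concurrent's ReentrantReadWriteLock as a hypothetical stand-in for OrientDB's actual lock type:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for the 2Q cache: state restoration rewrites the
// internal queues, so it must run under the exclusive (write) lock, while
// ordinary lookups can share the read lock.
class CacheStateHolder {
  private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
  private final Deque<Long> a1in = new ArrayDeque<Long>();

  void loadCacheState() {
    cacheLock.writeLock().lock(); // exclusive: the queues are about to change
    try {
      a1in.clear();
      // ... read the persisted state file and repopulate the queues ...
    } finally {
      cacheLock.writeLock().unlock();
    }
  }

  boolean contains(long pageIndex) {
    cacheLock.readLock().lock(); // shared: lookups do not mutate the queues
    try {
      return a1in.contains(pageIndex);
    } finally {
      cacheLock.readLock().unlock();
    }
  }
}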

@@ -748,6 +746,9 @@ private void restoreQueueWithoutPageLoad(OWriteCache writeCache, LRUList queue,
 
         queue.putToMRU(cacheEntry);
         pages.add(cacheEntry.getPageIndex());
+
+        //truncate the tail of the queue if it exceeds the limit set by the configuration
+        removeColdPagesWithCacheLock();
       } finally {
         internalFileId = dataInputStream.readInt();
       }
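
The added removeColdPagesWithCacheLock() call trims the queue after every restored entry, so the cache can never exceed its configured size part-way through a restore. A sketch of the same insert-then-trim idea over a plain access-ordered LinkedHashMap (the class name and size limit are illustrative, not OrientDB's):

import java.util.Iterator;
import java.util.LinkedHashMap;

// Insert-then-trim during restore: after each entry is appended at the MRU
// end, the coldest entries are evicted so the queue never grows past maxSize.
class BoundedRestoreQueue {
  private final int maxSize;
  // access-ordered map: iteration starts at the least recently used entry
  private final LinkedHashMap<Long, byte[]> queue = new LinkedHashMap<Long, byte[]>(16, 0.75f, true);

  BoundedRestoreQueue(int maxSize) {
    this.maxSize = maxSize;
  }

  void restoreEntry(long pageIndex, byte[] page) {
    queue.put(pageIndex, page); // analogue of queue.putToMRU(cacheEntry)
    removeColdPages();          // analogue of removeColdPagesWithCacheLock()
  }

  private void removeColdPages() {
    Iterator<Long> it = queue.keySet().iterator();
    while (queue.size() > maxSize && it.hasNext()) {
      it.next();
      it.remove(); // evict the coldest page
    }
  }
}

Once the limit is reached, each further restoreEntry evicts exactly one cold page, so the queue size stays pinned at the configured maximum throughout the restore.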
@@ -772,59 +773,67 @@ private void restoreQueueWithPageLoad(OWriteCache writeCache, LRUList queue, Dat
     // used only for statistics, and is passed merely as a stub
     final OModifiableBoolean cacheHit = new OModifiableBoolean();
 
-    // first step, we will create two tree maps to sort data by position in file and load them with maximum speed and
+    // first step, we will create a set to sort data by position in the file and load pages with maximum speed and
     // then to put data into the queue to restore position of entries in LRU list.
+    final TreeSet<PageKey> filePositions = new TreeSet<PageKey>();
 
-    final TreeMap<PageKey, OPair<Long, OCacheEntry>> filePositionMap = new TreeMap<PageKey, OPair<Long, OCacheEntry>>();
-    final TreeMap<Long, OCacheEntry> queuePositionMap = new TreeMap<Long, OCacheEntry>();
 
-    long position = 0;
     int internalFileId = dataInputStream.readInt();
     while (internalFileId >= 0) {
       final long pageIndex = dataInputStream.readLong();
       try {
         final long fileId = writeCache.externalFileId(internalFileId);
-        final OCacheEntry entry = new OCacheEntry(fileId, pageIndex, null, false);
-        filePositionMap.put(new PageKey(fileId, pageIndex), new OPair<Long, OCacheEntry>(position, entry));
-        queuePositionMap.put(position, entry);
 
-        position++;
+        //we replace only pages which are not loaded yet
+        if (queue.get(fileId, pageIndex) == null) {
+          filePositions.add(new PageKey(fileId, pageIndex));
+
+          //we put a placeholder into the queue, later we will replace it with real data
+          //this is done to prevent cases when the disk cache size exceeds the limits
+          //set by the configuration
+          final OCacheEntry cacheEntry = new OCacheEntry(fileId, pageIndex, null, false);
+          queue.putToMRU(cacheEntry);
+
+          Set<Long> pages = filePages.get(fileId);
+          if (pages == null) {
+            pages = new HashSet<Long>();
+
+            Set<Long> op = filePages.putIfAbsent(fileId, pages);
+            if (op != null) {
+              pages = op;
+            }
+          }
+
+          pages.add(pageIndex);
 
+          //remove the tail of the queue if its size exceeds the allowed maximum
+          removeColdPagesWithCacheLock();
+        }
       } finally {
         internalFileId = dataInputStream.readInt();
       }
     }
 
-    // second step: load pages sorted by position in file
-    for (final Map.Entry<PageKey, OPair<Long, OCacheEntry>> entry : filePositionMap.entrySet()) {
-      final PageKey pageKey = entry.getKey();
-      final OPair<Long, OCacheEntry> pair = entry.getValue();
+    //second step: load pages sorted by position in a file and replace placeholders with real data
+    for (PageKey pageKey : filePositions) {
+      final OCacheEntry cacheEntry = queue.get(pageKey.fileId, pageKey.pageIndex);
 
-      final OCachePointer[] pointers = writeCache.load(pageKey.fileId, pageKey.pageIndex, 1, false, cacheHit, true);
-
-      if (pointers.length == 0) {
-        queuePositionMap.remove(pair.key);
-        continue;
-      }
+      //some queue items may have been deleted because the queue reached its size limit
+      if (cacheEntry != null && cacheEntry.getCachePointer() == null) {
+        final OCachePointer[] pointers = writeCache.load(pageKey.fileId, pageKey.pageIndex, 1, false, cacheHit, true);
 
-      final OCacheEntry cacheEntry = pair.value;
-      cacheEntry.setCachePointer(pointers[0]);
-    }
+        if (pointers.length == 0) {
+          queue.remove(pageKey.fileId, pageKey.pageIndex);
 
-    // third step: add pages according to their order in LRU queue
-    for (final OCacheEntry cacheEntry : queuePositionMap.values()) {
-      final long fileId = cacheEntry.getFileId();
-      Set<Long> pages = filePages.get(fileId);
-      if (pages == null) {
-        pages = new HashSet<Long>();
+          final Set<Long> pages = filePages.get(pageKey.fileId);
 
-        Set<Long> op = filePages.putIfAbsent(fileId, pages);
-        if (op != null) {
-          pages = op;
+          if (pages != null) {
+            pages.remove(pageKey.pageIndex);
          }
+          continue;
        }
-      }
 
-      queue.putToMRU(cacheEntry);
-      pages.add(cacheEntry.getPageIndex());
+        cacheEntry.setCachePointer(pointers[0]);
+      }
     }
   }
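
The rewritten restoreQueueWithPageLoad works in two passes: the first pass rebuilds the LRU order out of data-less placeholder entries and trims as it goes, and the second pass loads page contents sorted by (fileId, pageIndex), filling in only the placeholders that survived trimming. A self-contained sketch of that shape, with simplified stand-ins for PageKey, the queue, and writeCache.load:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.TreeSet;

// Two-phase restore: phase 1 recreates queue order with placeholders,
// phase 2 loads page data in on-disk order and patches the placeholders.
class TwoPhaseRestore {
  static final class PageKey implements Comparable<PageKey> {
    final long fileId;
    final long pageIndex;

    PageKey(long fileId, long pageIndex) {
      this.fileId = fileId;
      this.pageIndex = pageIndex;
    }

    public int compareTo(PageKey o) {
      int c = Long.compare(fileId, o.fileId);
      return c != 0 ? c : Long.compare(pageIndex, o.pageIndex);
    }

    public boolean equals(Object o) {
      return o instanceof PageKey && compareTo((PageKey) o) == 0;
    }

    public int hashCode() {
      return Long.hashCode(fileId) * 31 + Long.hashCode(pageIndex);
    }
  }

  private static final int LIMIT = 1024; // configured cache size, illustrative

  // insertion order models the restored queue order, coldest entries first
  private final LinkedHashMap<PageKey, byte[]> queue = new LinkedHashMap<PageKey, byte[]>();

  // lruOrder is assumed to iterate from the coldest entry to the hottest,
  // as read back from the persisted cache-state file
  void restore(Iterable<PageKey> lruOrder) {
    final TreeSet<PageKey> filePositions = new TreeSet<PageKey>();

    // phase 1: placeholders (null data) in LRU order, trimmed as we go
    for (PageKey key : lruOrder) {
      if (!queue.containsKey(key)) {
        queue.put(key, null);
        filePositions.add(key);
        trimToLimit();
      }
    }

    // phase 2: load pages sorted by file position; placeholders evicted by
    // trimming in phase 1, or pages missing on disk, are simply skipped
    for (PageKey key : filePositions) {
      if (queue.containsKey(key) && queue.get(key) == null) {
        byte[] data = loadPage(key); // stand-in for writeCache.load(...)
        if (data == null) {
          queue.remove(key);         // page no longer exists: drop the entry
        } else {
          queue.put(key, data);      // re-put keeps the entry's queue position
        }
      }
    }
  }

  private void trimToLimit() {
    Iterator<PageKey> it = queue.keySet().iterator();
    while (queue.size() > LIMIT && it.hasNext()) {
      it.next();
      it.remove(); // evict the coldest placeholder
    }
  }

  private byte[] loadPage(PageKey key) {
    return new byte[0]; // placeholder for real disk I/O
  }
}

Loading in (fileId, pageIndex) order matters because the state file stores entries in LRU order, which is effectively random with respect to their physical layout on disk; sorting turns the restore into a near-sequential scan.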

@@ -1208,8 +1217,12 @@ private void removeColdPagesWithCacheLock() {
           assert !removedFromAInEntry.isDirty();
 
           final OCachePointer cachePointer = removedFromAInEntry.getCachePointer();
-          cachePointer.decrementReadersReferrer();
-          removedFromAInEntry.clearCachePointer();
+          //cache pointer can be null if we load the initial state of the cache from disk
+          //see #restoreQueueWithPageLoad for details
+          if (cachePointer != null) {
+            cachePointer.decrementReadersReferrer();
+            removedFromAInEntry.clearCachePointer();
+          }
 
           a1out.putToMRU(removedFromAInEntry);
         }
@@ -1234,8 +1247,12 @@ private void removeColdPagesWithCacheLock() {
         assert !removedEntry.isDirty();
 
         final OCachePointer cachePointer = removedEntry.getCachePointer();
-        cachePointer.decrementReadersReferrer();
-        removedEntry.clearCachePointer();
+        //cache pointer can be null if we load the initial state of the cache from disk
+        //see #restoreQueueWithPageLoad for details
+        if (cachePointer != null) {
+          cachePointer.decrementReadersReferrer();
+          removedEntry.clearCachePointer();
+        }
 
         Set<Long> pageEntries = filePages.get(removedEntry.getFileId());
         pageEntries.remove(removedEntry.getPageIndex());
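The last two hunks guard the eviction path: a placeholder entry queued during restore has no cache pointer until the second phase of restoreQueueWithPageLoad attaches one, and trimming can evict it before that happens. A compact model of why the null check is needed (all types are simplified stand-ins for OCacheEntry and OCachePointer, not the real classes):

// Simplified model of the eviction path touched by the fix: entries created
// as restore-time placeholders carry no pointer, so the reference-counting
// step must be skipped for them instead of throwing a NullPointerException.
final class Pointer {
  private int readers = 1;

  void decrementReadersReferrer() {
    readers--; // the page buffer is released once the count reaches zero
  }
}

final class Entry {
  private Pointer pointer; // stays null for placeholders from loadCacheState

  Entry(Pointer pointer) {
    this.pointer = pointer;
  }

  Pointer getCachePointer() {
    return pointer;
  }

  void clearCachePointer() {
    pointer = null;
  }
}

final class Eviction {
  static void evict(Entry removedEntry) {
    final Pointer cachePointer = removedEntry.getCachePointer();
    // null for placeholders restored from disk; see restoreQueueWithPageLoad
    if (cachePointer != null) {
      cachePointer.decrementReadersReferrer();
      removedEntry.clearCachePointer();
    }
    // ...move the entry to a1out / drop it from filePages as before...
  }
}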
