Skip to content

Commit

Permalink
Issue #4144 is implemented. Only one test should be fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Jun 3, 2015
1 parent 52dc06c commit af05433
Showing 1 changed file with 94 additions and 52 deletions.
Expand Up @@ -56,6 +56,9 @@
*/
public class OWOWCache implements OWriteCache, OCachePointer.WritersListener {
// we add 8 bytes before and after cache pages to prevent word tearing in mt case.

private final int MAX_PAGES_PER_FLUSH;

public static final int PAGE_PADDING = 8;

public static final String NAME_ID_MAP_EXTENSION = ".cm";
Expand All @@ -77,6 +80,7 @@ public class OWOWCache implements OWriteCache, OCachePointer.WritersListener {
private final String storagePath;

private final ConcurrentSkipListMap<PagedKey, PageGroup> cachePages = new ConcurrentSkipListMap<PagedKey, PageGroup>();

private final ConcurrentSkipListSet<PagedKey> writePages = new ConcurrentSkipListSet<PagedKey>();

private final OBinarySerializer<String> stringSerializer;
Expand All @@ -101,7 +105,10 @@ public class OWOWCache implements OWriteCache, OCachePointer.WritersListener {
private final int cacheMaxSize;

private int fileCounter = 0;

private PagedKey lastPageKey = new PagedKey(0, -1);
private PagedKey lastWritePageKey = new PagedKey(0, -1);

private File nameIdMapHolderFile;

private final ODistributedCounter allocatedSpace = new ODistributedCounter();
Expand Down Expand Up @@ -140,6 +147,8 @@ public OWOWCache(boolean syncOnPageFlush, int pageSize, long groupTTL, OWriteAhe
commitExecutor = Executors.newSingleThreadScheduledExecutor(new FlushThreadFactory(storageLocal.getName()));
lowSpaceEventsPublisher = Executors.newCachedThreadPool(new LowSpaceEventsPublisherFactory(storageLocal.getName()));

MAX_PAGES_PER_FLUSH = (int) (4000 / (1000.0 / pageFlushInterval));

if (pageFlushInterval > 0)
commitExecutor.scheduleWithFixedDelay(new PeriodicFlushTask(), pageFlushInterval, pageFlushInterval, TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -470,7 +479,9 @@ public Future store(final long fileId, final long pageIndex, final OCachePointer
PageGroup pageGroup = cachePages.get(pagedKey);
if (pageGroup == null) {
pageGroup = new PageGroup(System.currentTimeMillis(), dataPointer);

cachePages.put(pagedKey, pageGroup);

cacheSize.increment();

dataPointer.setWritersListener(this);
Expand Down Expand Up @@ -502,7 +513,7 @@ public OCachePointer load(long fileId, long pageIndex, boolean addNewPages) thro
final PagedKey pagedKey = new PagedKey(intId, pageIndex);
Lock groupLock = lockManager.acquireSharedLock(pagedKey);
try {
final PageGroup pageGroup = cachePages.get(pagedKey);
PageGroup pageGroup = cachePages.get(pagedKey);

OCachePointer pagePointer;
if (pageGroup == null) {
Expand Down Expand Up @@ -1218,56 +1229,62 @@ public void run() {
boolean forceFlush = false;

double writeCacheThreshold = ((double) wcs) / writeCacheMaxSize;
if (writeCacheThreshold > 0.3) {
if (writeCacheThreshold > 0.7) {
writeCacheThreshold = 0.7;
forceFlush = true;
}

writePagesToFlush = (int) Math.floor(writeCacheThreshold * wcs);
if (writeCacheThreshold > 0.3) {
writePagesToFlush = (int) Math.floor(((writeCacheThreshold - 0.3) / 0.4) * MAX_PAGES_PER_FLUSH);
iterateByWritePagesFirst = true;

if (writeCacheThreshold > 0.7)
forceFlush = true;
}

double cacheThreshold = ((double) cs) / cacheMaxSize;
if (cacheThreshold > 0.3) {
if (cacheThreshold > 0.7) {
cacheThreshold = 0.7;
forceFlush = true;
}
final int pagesToFlush = (int) Math.floor(((cacheThreshold - 0.3) / 0.4) * MAX_PAGES_PER_FLUSH);

final int pagesToFlush = (int) Math.floor((cacheThreshold - 0.7) * cs);
writePagesToFlush = Math.max(pagesToFlush, writePagesToFlush);
if (cacheThreshold > 0.7)
forceFlush = true;
}

if (writePagesToFlush < 4)
writePagesToFlush = 4;
writePagesToFlush = Math.max(4, Math.min(MAX_PAGES_PER_FLUSH, writePagesToFlush));

int flushedPages = 0;

flushedPages = flushRing(writePagesToFlush, flushedPages, false, iterateByWritePagesFirst);
if (flushedPages < writePagesToFlush) {
flushedPages = flushRing(writePagesToFlush, flushedPages, false, iterateByWritePagesFirst);
}

if (flushedPages < writePagesToFlush && iterateByWritePagesFirst)
if (flushedPages < writePagesToFlush && iterateByWritePagesFirst) {
flushedPages = flushRing(writePagesToFlush, flushedPages, false, false);
}

if (flushedPages < writePagesToFlush && forceFlush) {
flushedPages = flushRing(writePagesToFlush, flushedPages, true, iterateByWritePagesFirst);

if (flushedPages < writePagesToFlush && iterateByWritePagesFirst)
flushedPages = flushRing(writePagesToFlush, flushedPages, true, false);
}

if (flushedPages < writePagesToFlush && iterateByWritePagesFirst) {
flushRing(writePagesToFlush, flushedPages, true, false);

if (flushedPages < writePagesToFlush) {
flushRing(writePagesToFlush, flushedPages, true, false);
}
}
}
} catch (Exception e) {
OLogManager.instance().error(this, "Exception during data flush.", e);
}
}

private int flushRing(int writePagesToFlush, int flushedPages, boolean forceFlush, boolean iterateByWritePagesFirst)
throws IOException {

NavigableMap<PagedKey, PageGroup> subMap = null;
NavigableSet<PagedKey> writePagesSubset = null;

if (iterateByWritePagesFirst) {
writePagesSubset = writePages.tailSet(lastPageKey, false);
writePagesSubset = writePages.tailSet(lastWritePageKey, false);
} else {
subMap = cachePages.tailMap(lastPageKey, false);
}
Expand Down Expand Up @@ -1300,11 +1317,21 @@ private int iterateByWritePagesSubRing(NavigableSet<PagedKey> subSet, int writeP
int flushedRegions = 0;

long lastPageIndex = -1;
while (entriesIterator.hasNext() && (flushedWritePages < writePagesToFlush || flushedRegions < 2)) {
while (entriesIterator.hasNext()) {
PagedKey entry = entriesIterator.next();
if (lastPageIndex >= 0) {
if (entry.pageIndex != lastPageIndex + 1) {
flushedRegions++;
}
}

if (flushedWritePages > writePagesToFlush && flushedRegions >= 4)
break;

Lock groupLock = lockManager.acquireExclusiveLock(entry);
try {
final PageGroup group = cachePages.get(entry);

PageGroup group = cachePages.get(entry);
if (group == null) {
entriesIterator.remove();
continue;
Expand Down Expand Up @@ -1340,12 +1367,9 @@ private int iterateByWritePagesSubRing(NavigableSet<PagedKey> subSet, int writeP
lockManager.releaseLock(groupLock);
}

lastPageKey = entry;
lastWritePageKey = entry;
flushedWritePages++;

if (lastPageIndex >= 0 && lastPageIndex + 1 != entry.pageIndex)
flushedRegions++;

lastPageIndex = entry.pageIndex;

cacheSize.decrement();
Expand All @@ -1362,11 +1386,21 @@ private int iterateByCacheSubRing(NavigableMap<PagedKey, PageGroup> subMap, int
int flushedRegions = 0;

long lastPageIndex = -1;
while (entriesIterator.hasNext() && (flushedWritePages < writePagesToFlush || flushedRegions < 2)) {
while (entriesIterator.hasNext()) {
Map.Entry<PagedKey, PageGroup> entry = entriesIterator.next();

final PageGroup group = entry.getValue();
final PagedKey pagedKey = entry.getKey();

if (lastPageIndex >= 0) {
if (pagedKey.pageIndex != lastPageIndex + 1) {
flushedRegions++;

if (flushedWritePages > writePagesToFlush && flushedRegions >= 4)
break;
}
}

final boolean weakLockMode = group.creationTime - currentTime < groupTTL && !forceFlush;
if (group.recencyBit && weakLockMode) {
group.recencyBit = false;
Expand Down Expand Up @@ -1398,18 +1432,14 @@ private int iterateByCacheSubRing(NavigableMap<PagedKey, PageGroup> subMap, int
pagePointer.setWritersListener(null);

entriesIterator.remove();
writePages.remove(entry.getKey());
}
} finally {
lockManager.releaseLock(groupLock);
}

lastPageKey = pagedKey;
flushedWritePages++;

if (lastPageIndex >= 0 && lastPageIndex + 1 != pagedKey.pageIndex)
flushedRegions++;

flushedWritePages++;
lastPageIndex = pagedKey.pageIndex;

cacheSize.decrement();
Expand All @@ -1427,20 +1457,7 @@ private PeriodicalFuzzyCheckpointTask() {
public void run() {
OLogSequenceNumber minLsn = writeAheadLog.getFlushedLSN();

for (Map.Entry<PagedKey, PageGroup> entry : cachePages.entrySet()) {
Lock groupLock = lockManager.acquireExclusiveLock(entry.getKey());
try {
PageGroup group = entry.getValue();
final OCachePointer pagePointer = group.page;
if (pagePointer.getLastFlushedLsn() != null) {
if (minLsn.compareTo(pagePointer.getLastFlushedLsn()) > 0) {
minLsn = pagePointer.getLastFlushedLsn();
}
}
} finally {
lockManager.releaseLock(groupLock);
}
}
minLsn = findMinLsn(minLsn, cachePages);

OLogManager.instance().debug(this, "Start fuzzy checkpoint flushed LSN is %s", minLsn);
try {
Expand All @@ -1459,6 +1476,24 @@ public void run() {

OLogManager.instance().debug(this, "End fuzzy checkpoint");
}

private OLogSequenceNumber findMinLsn(OLogSequenceNumber minLsn, ConcurrentSkipListMap<PagedKey, PageGroup> ring) {
for (Map.Entry<PagedKey, PageGroup> entry : ring.entrySet()) {
Lock groupLock = lockManager.acquireExclusiveLock(entry.getKey());
try {
PageGroup group = entry.getValue();
final OCachePointer pagePointer = group.page;
if (pagePointer.getLastFlushedLsn() != null) {
if (minLsn.compareTo(pagePointer.getLastFlushedLsn()) > 0) {
minLsn = pagePointer.getLastFlushedLsn();
}
}
} finally {
lockManager.releaseLock(groupLock);
}
}
return minLsn;
}
}

private final class FileFlushTask implements Callable<Void> {
Expand All @@ -1473,7 +1508,13 @@ public Void call() throws Exception {
final PagedKey firstKey = new PagedKey(fileId, 0);
final PagedKey lastKey = new PagedKey(fileId, Long.MAX_VALUE);

NavigableMap<PagedKey, PageGroup> subMap = cachePages.subMap(firstKey, true, lastKey, true);
flushRing(cachePages.subMap(firstKey, true, lastKey, true));

files.get(fileId).synch();
return null;
}

private void flushRing(NavigableMap<PagedKey, PageGroup> subMap) throws IOException {
Iterator<Map.Entry<PagedKey, PageGroup>> entryIterator = subMap.entrySet().iterator();

while (entryIterator.hasNext()) {
Expand All @@ -1500,13 +1541,11 @@ public Void call() throws Exception {

cacheSize.decrement();
entryIterator.remove();

} finally {
lockManager.releaseLock(groupLock);
}
}

files.get(fileId).synch();
return null;
}
}

Expand All @@ -1522,7 +1561,12 @@ public Void call() throws Exception {
final PagedKey firstKey = new PagedKey(fileId, 0);
final PagedKey lastKey = new PagedKey(fileId, Long.MAX_VALUE);

NavigableMap<PagedKey, PageGroup> subMap = cachePages.subMap(firstKey, true, lastKey, true);
removeFromRing(cachePages.subMap(firstKey, true, lastKey, true));

return null;
}

private void removeFromRing(NavigableMap<PagedKey, PageGroup> subMap) {
Iterator<Map.Entry<PagedKey, PageGroup>> entryIterator = subMap.entrySet().iterator();

while (entryIterator.hasNext()) {
Expand All @@ -1547,8 +1591,6 @@ public Void call() throws Exception {
lockManager.releaseLock(groupLock);
}
}

return null;
}
}

Expand Down

0 comments on commit af05433

Please sign in to comment.