Skip to content

Commit

Permalink
log segment implementation cleanup and flush lock refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Feb 3, 2016
1 parent af51ade commit d5a9a95
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 139 deletions.
Expand Up @@ -843,4 +843,5 @@ public void checkFreeSpace(){
public int getCommitDelay() { public int getCommitDelay() {
return commitDelay; return commitDelay;
} }

} }
Expand Up @@ -11,8 +11,6 @@
import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.exception.OStorageException; import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.storage.OStorageAbstract; import com.orientechnologies.orient.core.storage.OStorageAbstract;
import com.orientechnologies.orient.core.storage.impl.local.OLowDiskSpaceInformation;
import com.orientechnologies.orient.core.storage.impl.local.OLowDiskSpaceListener;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;


import java.io.File; import java.io.File;
Expand All @@ -21,11 +19,11 @@
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.zip.CRC32; import java.util.zip.CRC32;
Expand All @@ -40,9 +38,9 @@ final class OLogSegment implements Comparable<OLogSegment> {
private final File file; private final File file;
private final long order; private final long order;
private final int maxPagesCacheSize; private final int maxPagesCacheSize;
private final AtomicReference<List<OLogRecord>> logCache = new AtomicReference<List<OLogRecord>>(new ArrayList<OLogRecord>()); protected final Lock cacheLock = new ReentrantLock();
private final ConcurrentLinkedQueue<OWALPage> pagesCache = new ConcurrentLinkedQueue<OWALPage>(); private List<OLogRecord> logCache = new ArrayList<OLogRecord>();
private final ScheduledExecutorService commitExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { private final ScheduledExecutorService commitExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
final Thread thread = new Thread(OStorageAbstract.storageThreadGroup, r); final Thread thread = new Thread(OStorageAbstract.storageThreadGroup, r);
Expand All @@ -53,8 +51,6 @@ public Thread newThread(Runnable r) {
}); });
private volatile long filledUpTo; private volatile long filledUpTo;
private boolean closed; private boolean closed;
private OWALPage currentPage;
private long nextPositionToFlush;
private OLogSequenceNumber last = null; private OLogSequenceNumber last = null;
private OLogSequenceNumber pendingLSNToFlush; private OLogSequenceNumber pendingLSNToFlush;


Expand All @@ -77,14 +73,21 @@ public void run() {
} }


private void commitLog() throws IOException { private void commitLog() throws IOException {
if (logCache.get().isEmpty())
return;


if (!flushNewData) if (!flushNewData)
return; return;
flushNewData = false; flushNewData = false;
List<OLogRecord> toFlush;
try {
cacheLock.lock();
if (logCache.isEmpty())
return;


List<OLogRecord> toFlush = logCache.getAndSet(new ArrayList<OLogRecord>()); toFlush = logCache;
logCache = new ArrayList<OLogRecord>();
}finally {
cacheLock.unlock();
}
if (toFlush.isEmpty()) if (toFlush.isEmpty())
return; return;
byte[] pageContent = new byte[OWALPage.PAGE_SIZE]; byte[] pageContent = new byte[OWALPage.PAGE_SIZE];
Expand All @@ -101,36 +104,25 @@ private void commitLog() throws IOException {


OLogSequenceNumber lsn = null; OLogSequenceNumber lsn = null;
int pageIndex = 0; int pageIndex = 0;
int flushedPage = curIndex;
int pos =0; int pos =0;
boolean lastToFlush=false; boolean lastToFlush=false;
for (OLogRecord log : toFlush) { for (OLogRecord log : toFlush) {
lsn = new OLogSequenceNumber(order, log.writeFrom); lsn = new OLogSequenceNumber(order, log.writeFrom);
pos = (int) (log.writeFrom % OWALPage.PAGE_SIZE); pos = (int) (log.writeFrom % OWALPage.PAGE_SIZE);
pageIndex = (int) (log.writeFrom / OWALPage.PAGE_SIZE); pageIndex = (int) (log.writeFrom / OWALPage.PAGE_SIZE);
int written = 0; int written = 0;
boolean mergeWithNextPage = true;
boolean recordTail = false;
while (written < log.record.length) { while (written < log.record.length) {
lastToFlush =true; lastToFlush =true;
int pageFreeSpace = OWALPage.calculateRecordSize(OWALPage.PAGE_SIZE - pos); int pageFreeSpace = OWALPage.calculateRecordSize(OWALPage.PAGE_SIZE - pos);

int contentLength = Math.min(pageFreeSpace, (log.record.length - written)); int contentLength = Math.min(pageFreeSpace, (log.record.length - written));
int fromRecord = written; int fromRecord = written;
written += contentLength; written += contentLength;
if (written == log.record.length) {
mergeWithNextPage = false; pos = writeContentInPage(pageContent, pos, log.record, written == log.record.length, fromRecord, contentLength);
recordTail = true;
}
OByteSerializer.INSTANCE.serializeNative(mergeWithNextPage ? (byte) 1 : 0, pageContent, pos);
OByteSerializer.INSTANCE.serializeNative(recordTail ? (byte) 1 : 0, pageContent, pos + 1);
OIntegerSerializer.INSTANCE.serializeNative(contentLength, pageContent, pos + 2);
System.arraycopy(log.record, fromRecord, pageContent, pos + OIntegerSerializer.INT_SIZE + 2, contentLength);
pos += OWALPage.calculateSerializedSize(contentLength);
OIntegerSerializer.INSTANCE.serializeNative(OWALPage.PAGE_SIZE - pos, pageContent, OWALPage.FREE_SPACE_OFFSET);
if (OWALPage.PAGE_SIZE - pos < OWALPage.MIN_RECORD_SIZE) { if (OWALPage.PAGE_SIZE - pos < OWALPage.MIN_RECORD_SIZE) {
synchronized (rndFile) { synchronized (rndFile) {
OLongSerializer.INSTANCE.serializeNative(OWALPage.MAGIC_NUMBER, pageContent, OWALPage.MAGIC_NUMBER_OFFSET);
rndFile.seek(pageIndex * OWALPage.PAGE_SIZE); rndFile.seek(pageIndex * OWALPage.PAGE_SIZE);
flushPage(pageContent); flushPage(pageContent);
} }
Expand All @@ -147,7 +139,6 @@ private void commitLog() throws IOException {
} }
if(lastToFlush) { if(lastToFlush) {
synchronized (rndFile) { synchronized (rndFile) {
OLongSerializer.INSTANCE.serializeNative(OWALPage.MAGIC_NUMBER, pageContent, OWALPage.MAGIC_NUMBER_OFFSET);
rndFile.seek(pageIndex * OWALPage.PAGE_SIZE); rndFile.seek(pageIndex * OWALPage.PAGE_SIZE);
flushPage(pageContent); flushPage(pageContent);
} }
Expand All @@ -156,103 +147,29 @@ private void commitLog() throws IOException {


} }


private void commit() throws IOException { /**
if (pagesCache.isEmpty()) * Write the content in the page and return the new page cursor position.
return; *

* @param pageContent buffer of the page to be filled
if (!flushNewData) * @param posInPage position in the page where to write
return; * @param log content to write to the page

* @param isLast flag to mark if is last portion of the record
flushNewData = false; * @param fromRecord the start of the portion of the record to write in this page

* @param contentLength the length of the portion of the record to write in this page
final int maxSize = pagesCache.size(); * @return the new page cursor position after this write.

*/
ByteBuffer[] pagesToFlush = new ByteBuffer[maxSize]; private int writeContentInPage(byte[] pageContent, int posInPage, byte[] log, boolean isLast, int fromRecord,int contentLength) {

OByteSerializer.INSTANCE.serializeNative(!isLast ? (byte) 1 : 0, pageContent, posInPage);
long filePointer = nextPositionToFlush; OByteSerializer.INSTANCE.serializeNative(isLast ? (byte) 1 : 0, pageContent, posInPage + 1);

OIntegerSerializer.INSTANCE.serializeNative(contentLength, pageContent, posInPage + 2);
int flushedPages = 0; System.arraycopy(log, fromRecord, pageContent, posInPage + OIntegerSerializer.INT_SIZE + 2, contentLength);
OLogSequenceNumber lastLSNToFlush = null; posInPage += OWALPage.calculateSerializedSize(contentLength);

OIntegerSerializer.INSTANCE.serializeNative(OWALPage.PAGE_SIZE - posInPage, pageContent, OWALPage.FREE_SPACE_OFFSET);
Iterator<OWALPage> pageIterator = pagesCache.iterator(); return posInPage;
while (flushedPages < maxSize) {
final OWALPage page = pageIterator.next();
synchronized (page) {
final int filledUpTo = page.getFilledUpTo();
int pos = OWALPage.RECORDS_OFFSET;

while (pos < filledUpTo) {
if (!page.mergeWithNextPage(pos)) {
if (pos == OWALPage.RECORDS_OFFSET && pendingLSNToFlush != null) {
lastLSNToFlush = pendingLSNToFlush;

pendingLSNToFlush = null;
} else
lastLSNToFlush = new OLogSequenceNumber(order, filePointer + flushedPages * (long) OWALPage.PAGE_SIZE + pos);
} else if (pendingLSNToFlush == null)
pendingLSNToFlush = new OLogSequenceNumber(order, filePointer + flushedPages * (long) OWALPage.PAGE_SIZE + pos);

pos += page.getSerializedRecordSize(pos);
}

final ByteBuffer dataBuffer;

if (flushedPages == maxSize - 1) {
dataBuffer = byteBufferPool.acquireDirect(false);

final ByteBuffer pageBuffer = page.getByteBuffer();

pageBuffer.position(0);
dataBuffer.position(0);

dataBuffer.put(pageBuffer);
} else {
ByteBuffer buffer = page.getByteBuffer();
dataBuffer = buffer.duplicate();
dataBuffer.order(buffer.order());
}

pagesToFlush[flushedPages] = dataBuffer;
}

flushedPages++;
}

synchronized (rndFile) {
rndFile.seek(filePointer);
for (int i = 0; i < pagesToFlush.length; i++) {
final ByteBuffer dataBuffer = pagesToFlush[i];
byte[] pageContent = new byte[OWALPage.PAGE_SIZE];
dataBuffer.position(0);
dataBuffer.get(pageContent);

if (i == pagesToFlush.length - 1)
byteBufferPool.release(dataBuffer);

flushPage(pageContent);
filePointer += OWALPage.PAGE_SIZE;
}

if (OGlobalConfiguration.WAL_SYNC_ON_PAGE_FLUSH.getValueAsBoolean())
rndFile.getFD().sync();
}

nextPositionToFlush = filePointer - OWALPage.PAGE_SIZE;

if (lastLSNToFlush != null)
writeAheadLog.setFlushedLsn(lastLSNToFlush);

for (int i = 0; i < flushedPages - 1; i++) {
OWALPage page = pagesCache.poll();
byteBufferPool.release(page.getByteBuffer());
}

assert !pagesCache.isEmpty();

writeAheadLog.checkFreeSpace();
} }


private void flushPage(byte[] content) throws IOException { private void flushPage(byte[] content) throws IOException {
OLongSerializer.INSTANCE.serializeNative(OWALPage.MAGIC_NUMBER, content, OWALPage.MAGIC_NUMBER_OFFSET);
CRC32 crc32 = new CRC32(); CRC32 crc32 = new CRC32();
crc32.update(content, OIntegerSerializer.INT_SIZE, OWALPage.PAGE_SIZE - OIntegerSerializer.INT_SIZE); crc32.update(content, OIntegerSerializer.INT_SIZE, OWALPage.PAGE_SIZE - OIntegerSerializer.INT_SIZE);
OIntegerSerializer.INSTANCE.serializeNative((int) crc32.getValue(), content, 0); OIntegerSerializer.INSTANCE.serializeNative((int) crc32.getValue(), content, 0);
Expand Down Expand Up @@ -339,7 +256,7 @@ public long filledUpTo() throws IOException {
} }


public OLogSequenceNumber begin() throws IOException { public OLogSequenceNumber begin() throws IOException {
if (!logCache.get().isEmpty()) if (!logCache.isEmpty())
return new OLogSequenceNumber(order, OWALPage.RECORDS_OFFSET); return new OLogSequenceNumber(order, OWALPage.RECORDS_OFFSET);


if (rndFile.length() > 0) if (rndFile.length() > 0)
Expand Down Expand Up @@ -428,12 +345,18 @@ public OLogSequenceNumber logRecord(byte[] record) throws IOException {
OLogRecord rec = generateLogRecord(filledUpTo, record); OLogRecord rec = generateLogRecord(filledUpTo, record);
filledUpTo = rec.writeTo; filledUpTo = rec.writeTo;
last = new OLogSequenceNumber(order, rec.writeFrom); last = new OLogSequenceNumber(order, rec.writeFrom);
logCache.get().add(rec); try {
cacheLock.lock();
logCache.add(rec);
}finally {
cacheLock.unlock();
}
long flushedPos = 0; long flushedPos = 0;
if (writeAheadLog.getFlushedLsn() != null) if (writeAheadLog.getFlushedLsn() != null)
flushedPos = writeAheadLog.getFlushedLsn().getPosition(); flushedPos = writeAheadLog.getFlushedLsn().getPosition();
if ((filledUpTo - flushedPos) / OWALPage.PAGE_SIZE > maxPagesCacheSize) { long pagesInCache = (filledUpTo - flushedPos) / OWALPage.PAGE_SIZE;
OLogManager.instance().info(this, "Max cache limit is reached (%d vs. %d), sync flush is performed", maxPagesCacheSize, pagesCache.size()); if (pagesInCache > maxPagesCacheSize) {
OLogManager.instance().info(this, "Max cache limit is reached (%d vs. %d), sync flush is performed", maxPagesCacheSize, pagesInCache);
flush(); flush();
} }
return last; return last;
Expand All @@ -449,7 +372,7 @@ public byte[] readRecord(OLogSequenceNumber lsn) throws IOException {
if (lsn.getPosition() >= filledUpTo) if (lsn.getPosition() >= filledUpTo)
return null; return null;


if (!logCache.get().isEmpty()) if (!logCache.isEmpty())
flush(); flush();


long pageIndex = lsn.getPosition() / OWALPage.PAGE_SIZE; long pageIndex = lsn.getPosition() / OWALPage.PAGE_SIZE;
Expand Down Expand Up @@ -551,7 +474,6 @@ public void close(boolean flush) throws IOException {


closed = true; closed = true;


currentPage = null;
} }
} }


Expand Down Expand Up @@ -589,19 +511,12 @@ private void initPageCache() throws IOException {
rndFile.readFully(content); rndFile.readFully(content);


if (checkPageIntegrity(content)) { if (checkPageIntegrity(content)) {
final ByteBuffer pageBuffer = byteBufferPool.acquireDirect(false); int freeSpace = OIntegerSerializer.INSTANCE.deserializeNative(content, OWALPage.FREE_SPACE_OFFSET);
pageBuffer.put(content); filledUpTo = (pagesCount - 1) * OWALPage.PAGE_SIZE + (OWALPage.PAGE_SIZE - freeSpace);
currentPage = new OWALPage(pageBuffer, false);
filledUpTo = (pagesCount - 1) * OWALPage.PAGE_SIZE + currentPage.getFilledUpTo();
nextPositionToFlush = (pagesCount - 1) * OWALPage.PAGE_SIZE;
} else { } else {
final ByteBuffer pageBuffer = byteBufferPool.acquireDirect(false); filledUpTo = pagesCount * OWALPage.PAGE_SIZE + OWALPage.RECORDS_OFFSET;
currentPage = new OWALPage(pageBuffer, true);
filledUpTo = pagesCount * OWALPage.PAGE_SIZE + currentPage.getFilledUpTo();
nextPositionToFlush = pagesCount * OWALPage.PAGE_SIZE;
} }


pagesCache.add(currentPage);
} }
} }


Expand Down Expand Up @@ -632,7 +547,7 @@ private boolean checkPageIntegrity(byte[] content) {
} }


private void selfCheck() throws IOException { private void selfCheck() throws IOException {
if (!logCache.get().isEmpty()) if (!logCache.isEmpty())
throw new IllegalStateException("WAL cache is not empty, we cannot verify WAL after it was started to be used"); throw new IllegalStateException("WAL cache is not empty, we cannot verify WAL after it was started to be used");


synchronized (rndFile) { synchronized (rndFile) {
Expand Down

0 comments on commit d5a9a95

Please sign in to comment.