Skip to content

Commit

Permalink
Issue #5343 direct memory pointer was replaced by byte buffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Jan 5, 2016
1 parent 3b95068 commit e3e0fbc
Show file tree
Hide file tree
Showing 21 changed files with 848 additions and 577 deletions.
Expand Up @@ -6,28 +6,37 @@
import java.util.concurrent.ConcurrentLinkedQueue;

public class OByteBufferPool {
private static final OByteBufferPool INSTANCE = new OByteBufferPool();
private static final int PAGE_SIZE = OGlobalConfiguration.DISK_CACHE_PAGE_SIZE.getValueAsInteger() * 1024;
private static final OByteBufferPool INSTANCE = new OByteBufferPool(
OGlobalConfiguration.DISK_CACHE_PAGE_SIZE.getValueAsInteger() * 1024);
private final int pageSize;
private final ByteBuffer zeroPage;

private final ConcurrentLinkedQueue<ByteBuffer> pool = new ConcurrentLinkedQueue<ByteBuffer>();

public static OByteBufferPool instance() {
return INSTANCE;
}

public OByteBufferPool(int pageSize) {
this.pageSize = pageSize;
this.zeroPage = ByteBuffer.allocateDirect(pageSize);
}

public ByteBuffer acquireDirect(boolean clear) {
final ByteBuffer buffer = pool.poll();

if (buffer != null) {
buffer.position(0);

if (clear) {
buffer.put(new byte[PAGE_SIZE]);
buffer.put(zeroPage.duplicate());
buffer.position(0);
}

return buffer;
}

return ByteBuffer.allocateDirect(PAGE_SIZE);
return ByteBuffer.allocateDirect(pageSize);
}

public void release(ByteBuffer buffer) {
Expand Down
Expand Up @@ -19,10 +19,12 @@
*/
package com.orientechnologies.orient.core.storage.cache;

import com.orientechnologies.common.directmemory.OByteBufferPool;
import com.orientechnologies.common.directmemory.ODirectMemoryPointer;
import com.orientechnologies.common.directmemory.ODirectMemoryPointerFactory;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -33,36 +35,44 @@
* @since 05.08.13
*/
public class OCachePointer {
private static final int WRITERS_OFFSET = 32;
private static final int READERS_MASK = 0xFFFFFFFF;
private static final int WRITERS_OFFSET = 32;
private static final int READERS_MASK = 0xFFFFFFFF;

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

private final AtomicInteger referrersCount = new AtomicInteger();
private final AtomicLong readersWritersReferrer = new AtomicLong();
private final AtomicInteger referrersCount = new AtomicInteger();
private final AtomicLong readersWritersReferrer = new AtomicLong();

private final AtomicInteger usagesCounter = new AtomicInteger();
private final AtomicInteger usagesCounter = new AtomicInteger();

private volatile OLogSequenceNumber lastFlushedLsn;

private volatile WritersListener writersListener;
private volatile WritersListener writersListener;

private final ODirectMemoryPointer dataPointer;
private final long fileId;
private final long pageIndex;
private final ByteBuffer buffer;
private final OByteBufferPool bufferPool;

public OCachePointer(final ODirectMemoryPointer dataPointer, final OLogSequenceNumber lastFlushedLsn, final long fileId,
final long pageIndex) {
this.lastFlushedLsn = lastFlushedLsn;
this.dataPointer = dataPointer;
private final ThreadLocal<ByteBuffer> threadLocalBuffer = new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
if (buffer != null) {
final ByteBuffer b = buffer.duplicate();
b.position(0);
return b;
}

this.fileId = fileId;
this.pageIndex = pageIndex;
}
return null;
}
};

private final long fileId;
private final long pageIndex;

public OCachePointer(final byte[] data, final OLogSequenceNumber lastFlushedLsn, final long fileId, final long pageIndex) {
public OCachePointer(final ByteBuffer buffer, final OByteBufferPool bufferPool, final OLogSequenceNumber lastFlushedLsn,
final long fileId, final long pageIndex) {
this.lastFlushedLsn = lastFlushedLsn;
dataPointer = ODirectMemoryPointerFactory.instance().createPointer(data);
this.buffer = buffer;
this.bufferPool = bufferPool;

this.fileId = fileId;
this.pageIndex = pageIndex;
Expand Down Expand Up @@ -184,16 +194,16 @@ public void incrementReferrer() {

public void decrementReferrer() {
final int rf = referrersCount.decrementAndGet();
if (rf == 0) {
dataPointer.free();
if (rf == 0 && buffer != null) {
bufferPool.release(buffer);
}

if (rf < 0)
throw new IllegalStateException("Invalid direct memory state, number of referrers can not be negative " + rf);
}

public ODirectMemoryPointer getDataPointer() {
return dataPointer;
public ByteBuffer getBuffer() {
return threadLocalBuffer.get();
}

public void acquireExclusiveLock() {
Expand Down Expand Up @@ -224,8 +234,9 @@ public boolean tryAcquireSharedLock() {
protected void finalize() throws Throwable {
super.finalize();

if (referrersCount.get() > 0)
dataPointer.free();
if (referrersCount.get() > 0 && buffer != null) {
bufferPool.release(buffer);
}
}

@Override
Expand All @@ -237,21 +248,20 @@ public boolean equals(Object o) {

OCachePointer that = (OCachePointer) o;

if (dataPointer != null ? !dataPointer.equals(that.dataPointer) : that.dataPointer != null)
if (buffer != null ? !buffer.equals(that.buffer) : that.buffer != null)
return false;

return true;
}

@Override
public int hashCode() {
return dataPointer != null ? dataPointer.hashCode() : 0;
return buffer != null ? buffer.hashCode() : 0;
}

@Override
public String toString() {
return "OCachePointer{" + "referrersCount=" + referrersCount + ", usagesCount=" + usagesCounter + ", dataPointer="
+ dataPointer + '}';
return "OCachePointer{" + "referrersCount=" + referrersCount + ", usagesCount=" + usagesCounter + '}';
}

private long composeReadersWriters(int readers, int writers) {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.orientechnologies.common.concur.lock.OInterruptedException;
import com.orientechnologies.common.concur.lock.ONewLockManager;
import com.orientechnologies.common.concur.lock.OReadersWriterSpinLock;
import com.orientechnologies.common.directmemory.OByteBufferPool;
import com.orientechnologies.common.directmemory.ODirectMemoryPointer;
import com.orientechnologies.common.directmemory.ODirectMemoryPointerFactory;
import com.orientechnologies.common.exception.OException;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.io.RandomAccessFile;
import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -1184,9 +1186,9 @@ private void writeNameIdEntry(NameFileIdEntry nameFileIdEntry, boolean sync) thr

final int nameSize = stringSerializer.getObjectSize(nameFileIdEntry.name);
byte[] serializedRecord = new byte[OIntegerSerializer.INT_SIZE + nameSize + OLongSerializer.LONG_SIZE];
OIntegerSerializer.INSTANCE.serializeLiteral(nameSize,serializedRecord,0);
OIntegerSerializer.INSTANCE.serializeLiteral(nameSize, serializedRecord, 0);
stringSerializer.serialize(nameFileIdEntry.name, serializedRecord, OIntegerSerializer.INT_SIZE);
OLongSerializer.INSTANCE.serializeLiteral(nameFileIdEntry.fileId,serializedRecord,OIntegerSerializer.INT_SIZE + nameSize);
OLongSerializer.INSTANCE.serializeLiteral(nameFileIdEntry.fileId, serializedRecord, OIntegerSerializer.INT_SIZE + nameSize);

nameIdMapHolder.write(serializedRecord);

Expand Down Expand Up @@ -1240,6 +1242,7 @@ private OCachePointer[] cacheFileContent(final long fileId, final int intId, fin
final long firstPageStartPosition = startPageIndex * pageSize;
final long firstPageEndPosition = firstPageStartPosition + pageSize;

final OByteBufferPool bufferPool = OByteBufferPool.instance();
if (fileClassic.getFileSize() >= firstPageEndPosition) {
final OSessionStoragePerformanceStatistic sessionStoragePerformanceStatistic = OSessionStoragePerformanceStatistic
.getStatisticInstance();
Expand All @@ -1252,28 +1255,29 @@ private OCachePointer[] cacheFileContent(final long fileId, final int intId, fin

try {
if (pageCount == 1) {
final byte[] content = new byte[pageSize + 2 * PAGE_PADDING];
fileClassic.read(firstPageStartPosition, content, pageSize, PAGE_PADDING);
final ByteBuffer buffer = bufferPool.acquireDirect(false);
fileClassic.read(firstPageStartPosition, buffer);
buffer.position(0);

final ODirectMemoryPointer pointer = ODirectMemoryPointerFactory.instance().createPointer(content);
final OCachePointer dataPointer = new OCachePointer(pointer, lastLsn, fileId, startPageIndex);
final OCachePointer dataPointer = new OCachePointer(buffer, bufferPool, lastLsn, fileId, startPageIndex);
pagesRead = 1;
return new OCachePointer[] { dataPointer };
}

final long maxPageCount = (fileClassic.getFileSize() - firstPageStartPosition) / pageSize;
final int realPageCount = Math.min((int) maxPageCount, pageCount);

final byte[] content = new byte[realPageCount * pageSize];
fileClassic.read(firstPageStartPosition, content, content.length);
final ByteBuffer[] buffers = new ByteBuffer[realPageCount];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = bufferPool.acquireDirect(false);
}

fileClassic.read(firstPageStartPosition, buffers);

final OCachePointer[] dataPointers = new OCachePointer[realPageCount];
for (int n = 0; n < dataPointers.length; n++) {
final byte[] pageContent = new byte[pageSize + 2 * PAGE_PADDING];
System.arraycopy(content, n * pageSize, pageContent, PAGE_PADDING, pageSize);

final ODirectMemoryPointer pointer = ODirectMemoryPointerFactory.instance().createPointer(pageContent);
dataPointers[n] = new OCachePointer(pointer, lastLsn, fileId, startPageIndex + n);
buffers[n].position(0);
dataPointers[n] = new OCachePointer(buffers[n], bufferPool, lastLsn, fileId, startPageIndex + n);
}

pagesRead = dataPointers.length;
Expand All @@ -1293,27 +1297,30 @@ private OCachePointer[] cacheFileContent(final long fileId, final int intId, fin

addAllocatedSpace(space);

final byte[] content = new byte[pageSize + 2 * PAGE_PADDING];
final ODirectMemoryPointer pointer = ODirectMemoryPointerFactory.instance().createPointer(content);
OCachePointer dataPointer = new OCachePointer(pointer, lastLsn, fileId, startPageIndex);
final ByteBuffer buffer = bufferPool.acquireDirect(true);
final OCachePointer dataPointer = new OCachePointer(buffer, bufferPool, lastLsn, fileId, startPageIndex);

cacheHit.setValue(true);
return new OCachePointer[] { dataPointer };
} else
return new OCachePointer[0];
}

private void flushPage(final int fileId, final long pageIndex, final ODirectMemoryPointer dataPointer) throws IOException {
private void flushPage(final int fileId, final long pageIndex, final ByteBuffer buffer) throws IOException {
storagePerformanceStatistic.startPageWriteToFileTimer();
try {
if (writeAheadLog != null) {
final OLogSequenceNumber lsn = ODurablePage.getLogSequenceNumberFromPage(dataPointer);
final OLogSequenceNumber lsn = ODurablePage.getLogSequenceNumberFromPage(buffer);
final OLogSequenceNumber flushedLSN = writeAheadLog.getFlushedLsn();

if (flushedLSN == null || flushedLSN.compareTo(lsn) < 0)
writeAheadLog.flush();
}

final byte[] content = dataPointer.get(PAGE_PADDING, pageSize);
final byte[] content = new byte[pageSize];
buffer.position(0);
buffer.get(content);

OLongSerializer.INSTANCE.serializeNative(MAGIC_NUMBER, content, 0);

final int crc32 = calculatePageCrc(content);
Expand Down Expand Up @@ -1569,9 +1576,10 @@ private int iterateByWritePagesSubRing(final NavigableSet<PageKey> subSet, final
continue;

try {
flushPage(entry.fileId, entry.pageIndex, pagePointer.getDataPointer());
final ByteBuffer buffer = pagePointer.getBuffer();
flushPage(entry.fileId, entry.pageIndex, buffer);

final OLogSequenceNumber flushedLSN = ODurablePage.getLogSequenceNumberFromPage(pagePointer.getDataPointer());
final OLogSequenceNumber flushedLSN = ODurablePage.getLogSequenceNumberFromPage(buffer);
pagePointer.setLastFlushedLsn(flushedLSN);
} finally {
pagePointer.releaseSharedLock();
Expand Down Expand Up @@ -1640,9 +1648,10 @@ private int iterateByCacheSubRing(final NavigableMap<PageKey, PageGroup> subMap,
continue;

try {
flushPage(pageKey.fileId, pageKey.pageIndex, pagePointer.getDataPointer());
final ByteBuffer buffer = pagePointer.getBuffer();
flushPage(pageKey.fileId, pageKey.pageIndex, buffer);

final OLogSequenceNumber flushedLSN = ODurablePage.getLogSequenceNumberFromPage(pagePointer.getDataPointer());
final OLogSequenceNumber flushedLSN = ODurablePage.getLogSequenceNumberFromPage(buffer);
pagePointer.setLastFlushedLsn(flushedLSN);
} finally {
pagePointer.releaseSharedLock();
Expand Down Expand Up @@ -1752,7 +1761,8 @@ private void flushRing(final NavigableMap<PageKey, PageGroup> subMap) throws IOE
continue;

try {
flushPage(pageKey.fileId, pageKey.pageIndex, pagePointer.getDataPointer());
final ByteBuffer buffer = pagePointer.getBuffer();
flushPage(pageKey.fileId, pageKey.pageIndex, buffer);
} finally {
pagePointer.releaseSharedLock();
}
Expand Down
Expand Up @@ -21,40 +21,40 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;

/**
* Interface to represent low-level File access.
*
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*
*/
public interface OFile {
/**
* Opens the file.
*
*
* @return
* @throws IOException
*/
boolean open() throws IOException;

/**
* Creates the file.
*
*
* @throws IOException
*/
void create() throws IOException;

/**
* Closes the file.
*
*
* @throws IOException
*/
void close() throws IOException;

/**
* Deletes the file.
*
*
* @throws IOException
*/
void delete() throws IOException;
Expand Down Expand Up @@ -89,11 +89,9 @@ public interface OFile {

void unlock() throws IOException;


/**
* Shrink the file content (filledUpTo attribute only)
*
*
*
* @param iSize
* @throws IOException
*/
Expand All @@ -120,4 +118,10 @@ public interface OFile {
void read(long iOffset, byte[] iData, int iLength, int iArrayOffset) throws IOException;

void write(long iOffset, byte[] iData, int iSize, int iArrayOffset) throws IOException;

void read(long offset, ByteBuffer buffer) throws IOException;

void read(long offset, ByteBuffer[] buffers) throws IOException;

void write(long offset, ByteBuffer buffer) throws IOException;
}

0 comments on commit e3e0fbc

Please sign in to comment.