Skip to content

Commit

Permalink
Improved query scan
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Oct 11, 2015
1 parent 2b26b5f commit 0954fd5
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 87 deletions.
Expand Up @@ -19,6 +19,16 @@
*/
package com.orientechnologies.orient.core.sql;

import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.orientechnologies.common.collection.OMultiCollectionIterator;
import com.orientechnologies.common.collection.OMultiValue;
import com.orientechnologies.common.collection.OSortedMultiIterator;
Expand Down Expand Up @@ -96,16 +106,6 @@
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage;

import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Executes the SQL SELECT statement. the parse() method compiles the query and builds the meta information needed by the execute().
* If the query contains the ORDER BY clause, the results are temporary collected internally, then ordered and finally returned all
Expand Down Expand Up @@ -1712,10 +1712,24 @@ public void run() {
jobs.add(Orient.instance().submit(job));
}

boolean tipProvided = false;
while (runningJobs.get() > 0 || !resultQueue.isEmpty()) {
try {
final AsyncResult result = resultQueue.take();

final int qSize = resultQueue.size();

if (!tipProvided && qSize >= OGlobalConfiguration.QUERY_PARALLEL_RESULT_QUEUE_SIZE.getValueAsInteger() - 1) {
OLogManager
.instance()
.debug(
this,
"Parallel query '%s' has result queue full (size=%d), this could reduce concurrency level. Consider increasing queue size with setting: %s=<size>",
parserText, OGlobalConfiguration.QUERY_PARALLEL_RESULT_QUEUE_SIZE.getValueAsInteger(),
OGlobalConfiguration.QUERY_PARALLEL_RESULT_QUEUE_SIZE.getKey());
tipProvided = true;
}

if (OExecutionThreadLocal.isInterruptCurrentOperation())
throw new InterruptedException("Operation has been interrupted");

Expand Down
Expand Up @@ -109,11 +109,11 @@ public Object getValue(final OIdentifiable iRecord, final Object iCurrentResult,
}

// UNMARSHALL THE SINGLE FIELD
if (doc.deserializeFields(preLoadedFieldsArray)) {
final Object v = doc.rawField(name);
return transformValue(iRecord, iContext, v);
}
return null;
if (preLoadedFieldsArray != null && !doc.deserializeFields(preLoadedFieldsArray))
return null;

final Object v = doc.rawField(name);
return transformValue(iRecord, iContext, v);
}

public OBinaryField getBinaryField(final OIdentifiable iRecord) {
Expand Down
Expand Up @@ -525,56 +525,60 @@ private static int getEntryContentLength(int grownContentSize) {
}

@SuppressFBWarnings(value = "PZLA_PREFER_ZERO_LENGTH_ARRAYS")
public ORawBuffer readRecord(long clusterPosition) throws IOException {
public ORawBuffer readRecord(final long clusterPosition) throws IOException {
atomicOperationsManager.acquireReadLock(this);
try {
acquireSharedLock();
try {
OAtomicOperation atomicOperation = atomicOperationsManager.getCurrentOperation();
OClusterPositionMapBucket.PositionEntry positionEntry = clusterPositionMap.get(clusterPosition);
if (positionEntry == null)
return null;
return readRecordNoLock(clusterPosition);
} finally {
releaseSharedLock();
}
} finally {
atomicOperationsManager.releaseReadLock(this);
}
}

int recordPosition = positionEntry.getRecordPosition();
long pageIndex = positionEntry.getPageIndex();
protected ORawBuffer readRecordNoLock(final long clusterPosition) throws IOException {
final OClusterPositionMapBucket.PositionEntry positionEntry = clusterPositionMap.get(clusterPosition);
if (positionEntry == null)
return null;

if (getFilledUpTo(atomicOperation, fileId) <= pageIndex)
return null;
final int recordPosition = positionEntry.getRecordPosition();
final long pageIndex = positionEntry.getPageIndex();

ORecordVersion recordVersion = null;
OCacheEntry cacheEntry = loadPage(atomicOperation, fileId, pageIndex, false);
try {
final OClusterPage localPage = new OClusterPage(cacheEntry, false, getChangesTree(atomicOperation, cacheEntry));
if (localPage.isDeleted(recordPosition))
return null;
final OAtomicOperation atomicOperation = atomicOperationsManager.getCurrentOperation();
if (getFilledUpTo(atomicOperation, fileId) <= pageIndex)
return null;

recordVersion = localPage.getRecordVersion(recordPosition);
} finally {
releasePage(atomicOperation, cacheEntry);
}
ORecordVersion recordVersion = null;
final OCacheEntry cacheEntry = loadPage(atomicOperation, fileId, pageIndex, false);
try {
final OClusterPage localPage = new OClusterPage(cacheEntry, false, getChangesTree(atomicOperation, cacheEntry));
if (localPage.isDeleted(recordPosition))
return null;

byte[] fullContent = readFullEntry(clusterPosition, pageIndex, recordPosition, atomicOperation);
if (fullContent == null)
return null;
recordVersion = localPage.getRecordVersion(recordPosition);
} finally {
releasePage(atomicOperation, cacheEntry);
}

int fullContentPosition = 0;
final byte[] fullContent = readFullEntry(clusterPosition, pageIndex, recordPosition, atomicOperation);
if (fullContent == null)
return null;

byte recordType = fullContent[fullContentPosition];
fullContentPosition++;
int fullContentPosition = 0;

int readContentSize = OIntegerSerializer.INSTANCE.deserializeNative(fullContent, fullContentPosition);
fullContentPosition += OIntegerSerializer.INT_SIZE;
final byte recordType = fullContent[fullContentPosition];
fullContentPosition++;

byte[] recordContent = compression.uncompress(fullContent, fullContentPosition, readContentSize);
recordContent = encryption.decrypt(recordContent);
final int readContentSize = OIntegerSerializer.INSTANCE.deserializeNative(fullContent, fullContentPosition);
fullContentPosition += OIntegerSerializer.INT_SIZE;

return new ORawBuffer(recordContent, recordVersion, recordType);
} finally {
releaseSharedLock();
}
} finally {
atomicOperationsManager.releaseReadLock(this);
}
byte[] recordContent = compression.uncompress(fullContent, fullContentPosition, readContentSize);
recordContent = encryption.decrypt(recordContent);

return new ORawBuffer(recordContent, recordVersion, recordType);
}

@Override
Expand Down Expand Up @@ -1260,53 +1264,29 @@ public void scan(final boolean iAscendingOrder, final long iFrom, final long iTo
try {
acquireSharedLock();
try {
OAtomicOperation atomicOperation = atomicOperationsManager.getCurrentOperation();

final long firstPos = iFrom > -1 ? iFrom : getFirstPosition();
final long lastPos = iTo > -1 ? iTo : getLastPosition();
final long recordsToScan = lastPos - firstPos;
final long progressDump = recordsToScan / 10;

for (long clusterPosition = iAscendingOrder ? firstPos : lastPos; iAscendingOrder ? clusterPosition <= lastPos
: clusterPosition >= firstPos; clusterPosition += (iAscendingOrder ? 1 : -1)) {

OClusterPositionMapBucket.PositionEntry positionEntry = clusterPositionMap.get(clusterPosition);
if (positionEntry == null)
continue;

int recordPosition = positionEntry.getRecordPosition();
long pageIndex = positionEntry.getPageIndex();

if (getFilledUpTo(atomicOperation, fileId) <= pageIndex)
final ORawBuffer buffer = readRecordNoLock(clusterPosition);
if (buffer == null)
continue;

ORecordVersion recordVersion = null;
OCacheEntry cacheEntry = loadPage(atomicOperation, fileId, pageIndex, false);
try {
final OClusterPage localPage = new OClusterPage(cacheEntry, false, getChangesTree(atomicOperation, cacheEntry));
if (localPage.isDeleted(recordPosition))
continue;
if (progressDump > 1 && OLogManager.instance().isDebugEnabled()) {
final long recordsScanned = clusterPosition - firstPos;

recordVersion = localPage.getRecordVersion(recordPosition);
} finally {
releasePage(atomicOperation, cacheEntry);
if ((recordsScanned + 1) % progressDump == 0) {
OLogManager.instance().debug(this, "Scan cluster id=%d read %d/%d %.2f%%", id, recordsScanned, recordsToScan,
((float) recordsScanned * 100 / recordsToScan));
}
}

byte[] fullContent = readFullEntry(clusterPosition, pageIndex, recordPosition, atomicOperation);
if (fullContent == null)
continue;

int fullContentPosition = 0;

byte recordType = fullContent[fullContentPosition];
fullContentPosition++;

int readContentSize = OIntegerSerializer.INSTANCE.deserializeNative(fullContent, fullContentPosition);
fullContentPosition += OIntegerSerializer.INT_SIZE;

byte[] recordContent = compression.uncompress(fullContent, fullContentPosition, readContentSize);
recordContent = encryption.decrypt(recordContent);

final ORecord rec = Orient.instance().getRecordFactoryManager().newInstance(recordType);
ORecordInternal.fill(rec, new ORecordId(id, clusterPosition), recordVersion, recordContent, false);
final ORecord rec = Orient.instance().getRecordFactoryManager().newInstance(buffer.recordType);
ORecordInternal.fill(rec, new ORecordId(id, clusterPosition), buffer.version, buffer.buffer, false);

if (iCallback.call(rec).equals(Boolean.FALSE))
break;
Expand All @@ -1317,6 +1297,10 @@ public void scan(final boolean iAscendingOrder, final long iFrom, final long iTo
}
} finally {
atomicOperationsManager.releaseReadLock(this);

if (OLogManager.instance().isDebugEnabled()) {
OLogManager.instance().debug(this, "Scan cluster id=%d completed", id);
}
}
}

Expand Down

0 comments on commit 0954fd5

Please sign in to comment.