Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

TEIID-2429 last refinements to sort performance

  • Loading branch information...
commit dd6e72033da82a02eb9cfc323619add1735c9130 1 parent d69f44d
@shawkins shawkins authored
Showing with 303 additions and 195 deletions.
  1. +2 −2 common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
  2. +4 −3 engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
  3. +21 −4 engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
  4. +109 −43 engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
  5. +19 −2 engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
  6. +2 −2 engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
  7. +39 −68 engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
  8. +3 −2 engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
  9. +3 −5 engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
  10. +2 −2 engine/src/test/java/org/teiid/common/buffer/TestSTree.java
  11. +16 −0 engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
  12. +11 −11 engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
  13. +1 −1  test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
  14. +46 −46 test-integration/common/src/test/resources/TestSystemVirtualModel/testColumns.expected
  15. +25 −4 test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
View
4 common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
@@ -59,12 +59,12 @@
public class DataTypeManager {
static final String ARRAY_SUFFIX = "[]"; //$NON-NLS-1$
- private static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", false); //$NON-NLS-1$
+ public static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", false); //$NON-NLS-1$
private static final boolean COMPARABLE_LOBS = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.comparableLobs", false); //$NON-NLS-1$
private static final boolean COMPARABLE_OBJECT = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.comparableObject", false); //$NON-NLS-1$
public static final boolean PAD_SPACE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.padSpace", false); //$NON-NLS-1$
- private static boolean valueCacheEnabled;
+ private static boolean valueCacheEnabled = USE_VALUE_CACHE;
private interface ValueCache<T> {
T getValue(T value);
View
7 engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
@@ -57,10 +57,11 @@ public int getCurrentIndex() {
BlockedException, TeiidProcessingException {
if (available() > 0) {
//if (forwardOnly) {
- if (batch == null || !batch.containsRow(currentRow)) {
- batch = getBatch(currentRow);
+ int row = getCurrentIndex();
+ if (batch == null || !batch.containsRow(row)) {
+ batch = getBatch(row);
}
- return batch.getTuple(currentRow);
+ return batch.getTuple(row);
//}
//TODO: determine if we should directly hold a soft reference here
//return getRow(currentRow);
View
25 engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
@@ -49,6 +49,7 @@
AbstractTupleSource {
private final boolean singleUse;
private boolean noBlocking;
+ private boolean reverse;
private TupleBufferTupleSource(boolean singleUse) {
this.singleUse = singleUse;
@@ -56,7 +57,7 @@ private TupleBufferTupleSource(boolean singleUse) {
@Override
protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingException {
- if(isFinal || noBlocking) {
+ if(isFinal || noBlocking || reverse) {
return null;
}
if (rowSourceLock == null) {
@@ -76,7 +77,10 @@ private TupleBufferTupleSource(boolean singleUse) {
@Override
public int available() {
- return rowCount - getCurrentIndex() + 1;
+ if (!reverse) {
+ return rowCount - getCurrentIndex() + 1;
+ }
+ return getCurrentIndex();
}
@Override
@@ -95,6 +99,19 @@ public void closeSource() {
public void setNoBlocking(boolean noBlocking) {
this.noBlocking = noBlocking;
}
+
+ public void setReverse(boolean reverse) {
+ this.reverse = reverse;
+ }
+
+ @Override
+ public int getCurrentIndex() {
+ if (!reverse) {
+ return super.getCurrentIndex();
+ }
+ return getRowCount() - super.getCurrentIndex() + 1;
+ }
+
}
/**
@@ -448,8 +465,8 @@ public void truncateTo(int rowLimit) throws TeiidComponentException {
/**
* Return a more accurate batch estimate or 0 if a new estimate is not available
*/
- public int getBatchMemorySizeEstimate() {
- return this.manager.getRowSizeEstimate()*this.batchSize;
+ public int getRowSizeEstimate() {
+ return this.manager.getRowSizeEstimate();
}
}
View
152 engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
@@ -94,15 +94,33 @@ public Cleaner(BufferManagerImpl bufferManagerImpl) {
@Override
public void run() {
- BufferManagerImpl impl = this.bufferRef.get();
- if (impl == null) {
- this.cancel();
- return;
- }
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get()); //$NON-NLS-1$
+ while (true) {
+ BufferManagerImpl impl = this.bufferRef.get();
+ if (impl == null) {
+ this.cancel();
+ return;
+ }
+ impl.cleaning.set(true);
+ try {
+ long evicted = impl.doEvictions(0, false, impl.initialEvictionQueue);
+ if (evicted != 0) {
+ continue;
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", evicted, impl.reserveBatchBytes.get(), impl.maxReserveBytes, impl.activeBatchBytes.get()); //$NON-NLS-1$
+ }
+ } catch (Throwable t) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, t, "Exception during cleaning run"); //$NON-NLS-1$
+ }
+ synchronized (this) {
+ impl.cleaning.set(false);
+ try {
+ this.wait(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
}
- impl.doEvictions(0, false);
}
}
@@ -283,6 +301,7 @@ public int getSizeEstimate(List<? extends List<?>> obj) {
}
if (!retain) {
removeFromCache(this.id, batch);
+ persistBatchReferences(ce.getSizeEstimate());
} else {
addMemoryEntry(ce, false);
}
@@ -342,7 +361,7 @@ public BatchSoftReference(CacheEntry referent,
//set to acceptable defaults for testing
private int maxProcessingBytes = 1 << 21;
private Integer maxProcessingBytesOrig;
- AtomicLong maxReserveBytes = new AtomicLong(1 << 28);
+ long maxReserveBytes = 1 << 28;;
AtomicLong reserveBatchBytes = new AtomicLong();
AtomicLong overheadBytes = new AtomicLong();
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
@@ -360,6 +379,7 @@ public BatchSoftReference(CacheEntry referent,
private AtomicLong readAttempts = new AtomicLong();
//TODO: consider the size estimate in the weighting function
LrfuEvictionQueue<CacheEntry> evictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
+ LrfuEvictionQueue<CacheEntry> initialEvictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
ConcurrentHashMap<Long, CacheEntry> memoryEntries = new ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
//limited size reference caches based upon the memory settings
@@ -392,10 +412,11 @@ protected boolean removeEldestEntry(Map.Entry<Long,BatchSoftReference> eldest) {
//TODO: this does not scale well with multiple embedded instances
private static final Timer timer = new Timer("BufferManager Cleaner", true); //$NON-NLS-1$
private Cleaner cleaner;
+ private AtomicBoolean cleaning = new AtomicBoolean();
public BufferManagerImpl() {
this.cleaner = new Cleaner(this);
- timer.schedule(cleaner, 1000, 100);
+ timer.schedule(cleaner, 100);
}
void clearSoftReference(BatchSoftReference bsr) {
@@ -536,10 +557,10 @@ public void setMaxProcessingKB(int maxProcessingKB) {
public void setMaxReserveKB(int maxReserveBatchKB) {
if (maxReserveBatchKB > -1) {
int maxReserve = maxReserveBatchKB<<10;
- this.maxReserveBytes.set(maxReserve);
+ this.maxReserveBytes = maxReserve;
this.reserveBatchBytes.set(maxReserve);
} else {
- this.maxReserveBytes.set(-1);
+ this.maxReserveBytes = -1;
}
}
@@ -548,15 +569,15 @@ public void initialize() throws TeiidComponentException {
long maxMemory = Runtime.getRuntime().maxMemory();
maxMemory = Math.max(0, maxMemory - (SYSTEM_OVERHEAD_MEGS << 20)); //assume an overhead for the AS/system stuff
if (getMaxReserveKB() < 0) {
- this.maxReserveBytes.set(0);
+ this.maxReserveBytes = 0;
int one_gig = 1 << 30;
if (maxMemory > one_gig) {
//assume 70% of the memory over the first gig
- this.maxReserveBytes.addAndGet((long)Math.max(0, (maxMemory - one_gig) * .7));
+ this.maxReserveBytes = (long)Math.max(0, (maxMemory - one_gig) * .7);
}
- this.maxReserveBytes.addAndGet(Math.max(0, Math.min(one_gig, maxMemory) >> 1));
+ this.maxReserveBytes += Math.max(0, Math.min(one_gig, maxMemory) >> 1);
}
- this.reserveBatchBytes.set(this.maxReserveBytes.get());
+ this.reserveBatchBytes.set(maxReserveBytes);
if (this.maxProcessingBytesOrig == null) {
//store the config value so that we can be reinitialized (this is not a clean approach)
this.maxProcessingBytesOrig = this.maxProcessingBytes;
@@ -572,7 +593,7 @@ public void initialize() throws TeiidComponentException {
weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(30, logSize));
}
this.maxSoftReferences = 1 << Math.min(30, logSize);
- this.nominalProcessingMemoryMax = (int)Math.max(Math.min(this.maxReserveBytes.get(), this.maxProcessingBytes), Math.min(Integer.MAX_VALUE, 2*this.maxReserveBytes.get()/(Math.max(1, maxActivePlans/2) + Runtime.getRuntime().availableProcessors())));
+ this.nominalProcessingMemoryMax = (int)Math.max(Math.min(this.maxReserveBytes, 2*this.maxProcessingBytes), Math.min(Integer.MAX_VALUE, 2*this.maxReserveBytes/maxActivePlans));
}
void setNominalProcessingMemoryMax(int nominalProcessingMemoryMax) {
@@ -609,7 +630,7 @@ private void releaseBuffers(long count, boolean updateContext) {
lock.lock();
try {
this.reserveBatchBytes.addAndGet(count);
- batchesFreed.signal();
+ batchesFreed.signalAll();
} finally {
lock.unlock();
}
@@ -621,19 +642,23 @@ public int reserveBuffers(int count, BufferReserveMode mode) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
}
CommandContext context = CommandContext.getThreadLocalContext();
+ int existing = 0;
+ if (context != null) {
+ existing = (int)Math.min(Integer.MAX_VALUE, context.addAndGetReservedBuffers(0));
+ }
int result = count;
if (mode == BufferReserveMode.FORCE) {
reserve(count, context);
} else {
lock.lock();
try {
- count = Math.min(count, nominalProcessingMemoryMax);
+ count = Math.min(count, nominalProcessingMemoryMax - existing);
result = noWaitReserve(count, false, context);
} finally {
lock.unlock();
}
}
- persistBatchReferences();
+ persistBatchReferences(result);
return result;
}
@@ -658,7 +683,7 @@ public int reserveBuffersBlocking(int count, long[] val, boolean force) throws B
CommandContext context = CommandContext.getThreadLocalContext();
long reserved = 0;
if (context != null) {
- context.addAndGetReservedBuffers(0);
+ reserved = context.addAndGetReservedBuffers(0);
//TODO: in theory we have to check the whole stack as we could be
//issuing embedded queries back to ourselves
}
@@ -717,7 +742,7 @@ public int reserveBuffersBlocking(int count, long[] val, boolean force) throws B
result = count_orig;
}
val[0] = 0;
- persistBatchReferences();
+ persistBatchReferences(result);
return result;
}
@@ -751,28 +776,56 @@ private int noWaitReserve(int count, boolean allOrNothing, CommandContext contex
return count;
}
- void persistBatchReferences() {
+ void persistBatchReferences(int max) {
+ if (max <= 0) {
+ return;
+ }
+ if (!cleaning.get()) {
+ synchronized (cleaner) {
+ cleaner.notify();
+ }
+ }
long activeBatch = activeBatchBytes.get() + overheadBytes.get();
long reserveBatch = reserveBatchBytes.get();
- if (activeBatch <= reserveBatch) {
- long memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
- if (DataTypeManager.isValueCacheEnabled()) {
- if (memoryCount < getMaxReserveKB() / 8) {
- DataTypeManager.setValueCacheEnabled(false);
- }
- } else if (memoryCount > getMaxReserveKB() / 2) {
- DataTypeManager.setValueCacheEnabled(true);
+ long memoryCount = activeBatch + maxReserveBytes - reserveBatch;
+ if (memoryCount <= maxReserveBytes) {
+ if (DataTypeManager.USE_VALUE_CACHE && DataTypeManager.isValueCacheEnabled() && memoryCount < maxReserveBytes / 8) {
+ DataTypeManager.setValueCacheEnabled(false);
}
return;
+ } else if (DataTypeManager.USE_VALUE_CACHE) {
+ DataTypeManager.setValueCacheEnabled(true);
+ }
+ //we delay work here as there should be excess vm space, we are using an overestimate, and we want the cleaner to do the work if possible
+ //TODO: track sizes held by each queue independently
+ long maxToFree = Math.min(max, memoryCount - maxReserveBytes);
+ LrfuEvictionQueue<CacheEntry> first = initialEvictionQueue;
+ LrfuEvictionQueue<CacheEntry> second = evictionQueue;
+ if (evictionQueue.getSize() > 2*initialEvictionQueue.getSize()) {
+ //attempt to evict from the non-initial queue first as these should essentially be cost "free" and hopefully the reference cache can mitigate
+ //the cost of rereading
+ first = evictionQueue;
+ second = initialEvictionQueue;
+ }
+ maxToFree -= doEvictions(maxToFree, true, first);
+ if (maxToFree > 0) {
+ maxToFree = Math.min(maxToFree, activeBatchBytes.get() + overheadBytes.get() - reserveBatchBytes.get());
+ if (maxToFree > 0) {
+ doEvictions(maxToFree, true, second);
+ }
}
- long maxToFree = Math.min(maxProcessingBytes, (activeBatch - reserveBatch)<<1);
- doEvictions(maxToFree, true);
}
-
- void doEvictions(long maxToFree, boolean checkActiveBatch) {
- int freed = 0;
- while (freed <= maxToFree && (!checkActiveBatch || (maxToFree == 0 && activeBatchBytes.get() + overheadBytes.get() > reserveBatchBytes.get() * .7) || (maxToFree > 0 && activeBatchBytes.get() + overheadBytes.get() > reserveBatchBytes.get() * .8))) {
- CacheEntry ce = evictionQueue.firstEntry(checkActiveBatch);
+
+ long doEvictions(long maxToFree, boolean checkActiveBatch, LrfuEvictionQueue<CacheEntry> queue) {
+ if (queue == evictionQueue) {
+ maxToFree = Math.min(maxToFree, this.maxProcessingBytes);
+ }
+ long freed = 0;
+ while (freed <= maxToFree && (
+ !checkActiveBatch //age out
+ || queue == evictionQueue && activeBatchBytes.get() + overheadBytes.get() + this.maxReserveBytes/2 > reserveBatchBytes.get() //nominal cleaning criterion
+ || queue != evictionQueue && activeBatchBytes.get() > 0)) { //assume that basically all initial batches will need to be written out at some point
+ CacheEntry ce = queue.firstEntry(checkActiveBatch);
if (ce == null) {
break;
}
@@ -802,12 +855,13 @@ void doEvictions(long maxToFree, boolean checkActiveBatch) {
freed += ce.getSizeEstimate();
}
activeBatchBytes.addAndGet(-ce.getSizeEstimate());
- evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
+ queue.remove(ce); //ensures that an intervening get will still be cleaned
checkActiveBatch = true;
}
}
}
}
+ return freed;
}
boolean evict(CacheEntry ce) throws Exception {
@@ -862,10 +916,17 @@ CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
//there is a minute chance the batch was evicted
//this call ensures that we won't leak
if (memoryEntries.containsKey(batch)) {
- evictionQueue.touch(ce);
+ if (ce.isPersistent()) {
+ evictionQueue.touch(ce);
+ } else {
+ initialEvictionQueue.touch(ce);
+ }
}
} else {
evictionQueue.remove(ce);
+ if (!ce.isPersistent()) {
+ initialEvictionQueue.remove(ce);
+ }
}
}
if (!retain) {
@@ -926,11 +987,11 @@ private void remove(CacheEntry ce, boolean inMemory) {
}
void addMemoryEntry(CacheEntry ce, boolean initial) {
- persistBatchReferences();
+ persistBatchReferences(ce.getSizeEstimate());
synchronized (ce) {
boolean added = memoryEntries.put(ce.getId(), ce) == null;
if (initial) {
- evictionQueue.add(ce);
+ initialEvictionQueue.add(ce);
} else if (added) {
evictionQueue.recordAccess(ce);
evictionQueue.add(ce);
@@ -1016,6 +1077,7 @@ public void shutdown() {
this.cache = null;
this.memoryEntries.clear();
this.evictionQueue.getEvictionQueue().clear();
+ this.initialEvictionQueue.getEvictionQueue().clear();
this.cleaner.cancel();
}
@@ -1156,7 +1218,7 @@ public void setInlineLobs(boolean inlineLobs) {
}
public int getMaxReserveKB() {
- return (int)maxReserveBytes.get()>>10;
+ return (int)maxReserveBytes>>10;
}
public void setCache(Cache cache) {
@@ -1176,6 +1238,10 @@ public boolean hasState(String stateId) {
return this.getTupleBuffer(stateId) != null;
}
+ public long getReferenceHits() {
+ return referenceHit.get();
+ }
+
@Override
public Streamable<?> persistLob(Streamable<?> lob, FileStore store,
byte[] bytes) throws TeiidComponentException {
View
21 engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.teiid.common.buffer.BaseCacheEntry;
@@ -48,6 +49,7 @@
protected AtomicLong clock;
protected long maxInterval;
protected long halfLife;
+ private AtomicInteger size = new AtomicInteger();
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
@@ -55,11 +57,19 @@ public LrfuEvictionQueue(AtomicLong clock) {
}
public boolean remove(V value) {
- return evictionQueue.remove(value.getKey()) != null;
+ if (evictionQueue.remove(value.getKey()) != null) {
+ size.addAndGet(-1);
+ return true;
+ }
+ return false;
}
public boolean add(V value) {
- return evictionQueue.put(value.getKey(), value) == null;
+ if (evictionQueue.put(value.getKey(), value) == null) {
+ size.addAndGet(1);
+ return true;
+ }
+ return false;
}
public void touch(V value) {
@@ -80,6 +90,9 @@ public V firstEntry(boolean poll) {
Map.Entry<CacheKey, V> entry = null;
if (poll) {
entry = evictionQueue.pollFirstEntry();
+ if (entry != null) {
+ size.addAndGet(-1);
+ }
} else {
entry = evictionQueue.firstEntry();
}
@@ -130,4 +143,8 @@ public void setHalfLife(long halfLife) {
this.maxInterval = 62*this.halfLife;
}
+ public int getSize() {
+ return size.get();
+ }
+
}
View
4 engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
@@ -105,8 +105,8 @@ public void sort() throws BlockedException,
}
}
List<Boolean> sortDirection = Collections.nCopies(sortSymbols.size(), OrderBy.ASC);
- this.sortUtility = new SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
- this.sortUtility.setSkipBuffer(true);
+ this.sortUtility = new SortUtility(null, sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
+ this.sortUtility.setWorkingBuffer(originalVs.getTupleBuffer());
}
dvs = new DependentValueSource(sortUtility.sort());
} else {
View
107 engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
@@ -112,7 +112,6 @@ public String toString() {
private static final int MERGE = 2;
private static final int DONE = 3;
private TupleBuffer workingBuffer;
- private boolean skipBuffer;
private long[] attempts = new long[2];
public SortUtility(TupleSource sourceID, List<OrderByItem> items, Mode mode, BufferManager bufferMgr,
@@ -227,7 +226,7 @@ private TupleBuffer createTupleBuffer() throws TeiidComponentException {
protected void initialSort(boolean onePass) throws TeiidComponentException, TeiidProcessingException {
outer: while (!doneReading) {
- if (!this.skipBuffer) {
+ if (this.source != null) {
//sub-phase 1 - build up a working buffer of tuples
if (this.workingBuffer == null) {
this.workingBuffer = createTupleBuffer();
@@ -275,46 +274,40 @@ protected void initialSort(boolean onePass) throws TeiidComponentException, Teii
int totalReservedBuffers = 0;
try {
int maxRows = this.batchSize;
- boolean resizable = true;
Collection<List<?>> workingTuples = null;
- TupleSource ts = source;
boolean done = false;
-
- if (this.workingBuffer != null) {
- /*
- * if we have a working buffer, then we can balance the work between the initial / multi-pass sort based upon the row count
- * and an updated estimate of the batch memory size
- */
- this.workingBuffer.close();
- schemaSize = Math.max(1, this.workingBuffer.getBatchMemorySizeEstimate());
- resizable = false;
- long memorySpaceNeeded = spaceNeeded(workingBuffer.getRowCount());
- if (onePass) {
- //one pass just needs small sub-lists
- memorySpaceNeeded = Math.min(memorySpaceNeeded, bufferManager.getMaxProcessingSize());
- }
- totalReservedBuffers = bufferManager.reserveBuffers(Math.min(bufferManager.getMaxProcessingSize(), (int)Math.min(memorySpaceNeeded, Integer.MAX_VALUE)), BufferReserveMode.FORCE);
-
- if (totalReservedBuffers != memorySpaceNeeded) {
- int processingSublists = Math.max(2, bufferManager.getMaxProcessingSize()/schemaSize);
- int desiredSpace = (int)Math.min(Integer.MAX_VALUE, spaceNeeded(workingBuffer.getRowCount()/processingSublists + (workingBuffer.getRowCount()%processingSublists)));
- if (desiredSpace > totalReservedBuffers) {
- totalReservedBuffers += bufferManager.reserveBuffers(desiredSpace - totalReservedBuffers, BufferReserveMode.NO_WAIT);
- //TODO: wait to force 2/3 pass processing
- } else if (memorySpaceNeeded <= Integer.MAX_VALUE) {
- totalReservedBuffers += bufferManager.reserveBuffers((int)memorySpaceNeeded - totalReservedBuffers, BufferReserveMode.NO_WAIT);
- }
- if (totalReservedBuffers > schemaSize) {
- int additional = totalReservedBuffers%schemaSize;
- totalReservedBuffers-=additional;
- //release any excess
- bufferManager.releaseBuffers(additional);
- }
- }
- ts = workingBuffer.createIndexedTupleSource(true);
- processed+=this.workingBuffer.getRowCount();
- maxRows = Math.max(1, (totalReservedBuffers/schemaSize))*batchSize;
- }
+ /*
+ * we can balance the work between the initial / multi-pass sort based upon the row count
+ * and an updated estimate of the batch memory size
+ */
+ this.workingBuffer.close();
+ schemaSize = Math.max(1, this.workingBuffer.getRowSizeEstimate()*this.batchSize);
+ long memorySpaceNeeded = workingBuffer.getRowCount()*this.workingBuffer.getRowSizeEstimate();
+ if (onePass) {
+ //one pass just needs small sub-lists
+ memorySpaceNeeded = Math.min(memorySpaceNeeded, bufferManager.getMaxProcessingSize());
+ }
+ totalReservedBuffers = bufferManager.reserveBuffers(Math.min(bufferManager.getMaxProcessingSize(), (int)Math.min(memorySpaceNeeded, Integer.MAX_VALUE)), BufferReserveMode.FORCE);
+ if (totalReservedBuffers != memorySpaceNeeded) {
+ int processingSublists = Math.max(2, bufferManager.getMaxProcessingSize()/schemaSize);
+ int desiredSpace = (int)Math.min(Integer.MAX_VALUE, (workingBuffer.getRowCount()/processingSublists + (workingBuffer.getRowCount()%processingSublists))*(long)this.workingBuffer.getRowSizeEstimate());
+ if (desiredSpace > totalReservedBuffers) {
+ totalReservedBuffers += bufferManager.reserveBuffers(desiredSpace - totalReservedBuffers, BufferReserveMode.NO_WAIT);
+ //TODO: wait to force 2/3 pass processing
+ } else if (memorySpaceNeeded <= Integer.MAX_VALUE) {
+ totalReservedBuffers += bufferManager.reserveBuffers((int)memorySpaceNeeded - totalReservedBuffers, BufferReserveMode.NO_WAIT);
+ }
+ if (totalReservedBuffers > schemaSize) {
+ int additional = totalReservedBuffers%schemaSize;
+ totalReservedBuffers-=additional;
+ //release any excess
+ bufferManager.releaseBuffers(additional);
+ }
+ }
+ TupleBufferTupleSource ts = workingBuffer.createIndexedTupleSource(source != null);
+ ts.setReverse(workingBuffer.getRowCount() > this.batchSize);
+ processed+=this.workingBuffer.getRowCount();
+ maxRows = Math.max(1, (totalReservedBuffers/schemaSize))*batchSize;
if (mode == Mode.SORT) {
workingTuples = new ArrayList<List<?>>();
} else {
@@ -323,17 +316,7 @@ protected void initialSort(boolean onePass) throws TeiidComponentException, Teii
outer: while (!done) {
while(!done) {
if (workingTuples.size() >= maxRows) {
- if (!resizable) {
- break;
- }
- //attempt to reserve more working memory incrementally
- int reserved = bufferManager.reserveBuffers(schemaSize,
- (totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingSize())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
- totalReservedBuffers += reserved;
- if (reserved != schemaSize) {
- break;
- }
- maxRows += this.batchSize;
+ break;
}
List<?> tuple = ts.nextTuple();
@@ -358,23 +341,15 @@ protected void initialSort(boolean onePass) throws TeiidComponentException, Teii
}
workingTuples.clear();
sublist.saveBatch();
- if (resizable) {
- schemaSize = Math.max(1, sublist.getBatchMemorySizeEstimate());
- resizable = false;
- maxRows = Math.max(batchSize, totalReservedBuffers/schemaSize);
- if (totalReservedBuffers > schemaSize) {
- int additional = totalReservedBuffers%schemaSize;
- totalReservedBuffers -= additional;
- bufferManager.releaseBuffers(additional);
- }
- }
}
} catch (BlockedException e) {
Assertion.failed("should not block during memory sublist sorting"); //$NON-NLS-1$
} finally {
bufferManager.releaseBuffers(totalReservedBuffers);
if (this.workingBuffer != null) {
- this.workingBuffer.remove();
+ if (this.source != null) {
+ this.workingBuffer.remove();
+ }
this.workingBuffer = null;
}
}
@@ -385,12 +360,8 @@ protected void initialSort(boolean onePass) throws TeiidComponentException, Teii
this.phase = MERGE;
}
- private long spaceNeeded(int rows) {
- return (rows/batchSize + ((rows%batchSize)!=0?1:0))*(long)schemaSize;
- }
-
- public void setSkipBuffer(boolean skipBuffer) {
- this.skipBuffer = skipBuffer;
+ public void setWorkingBuffer(TupleBuffer workingBuffer) {
+ this.workingBuffer = workingBuffer;
}
protected void mergePhase() throws TeiidComponentException, TeiidProcessingException {
View
5 engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
@@ -126,8 +126,9 @@ public Object getResult(CommandContext commandContext)
// Sort
if (sortUtility == null) {
- sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(true), sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName, collectionBuffer.getSchema());
- this.sortUtility.setSkipBuffer(true);
+ sortUtility = new SortUtility(null, sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName, collectionBuffer.getSchema());
+ collectionBuffer.setForwardOnly(true);
+ this.sortUtility.setWorkingBuffer(collectionBuffer);
}
TupleBuffer sorted = sortUtility.sort();
sorted.setForwardOnly(true);
View
8 engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
@@ -280,16 +280,12 @@ public void sort(SortOption sortOption) throws TeiidComponentException, TeiidPro
}
if (this.sortUtility == null) {
TupleSource ts = null;
- boolean skipBuffer = false;
if (this.buffer != null) {
this.buffer.setForwardOnly(true);
if (this.prefetch != null) {
this.prefetch.setPosition(1);
this.prefetch.disableSave();
ts = this.prefetch;
- } else {
- ts = this.buffer.createIndexedTupleSource();
- skipBuffer = true;
}
} else {
ts = new BatchIterator(this.source);
@@ -297,7 +293,9 @@ public void sort(SortOption sortOption) throws TeiidComponentException, TeiidPro
this.sortUtility = new SortUtility(ts, expressions, Collections.nCopies(expressions.size(), OrderBy.ASC),
sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT, this.source.getBufferManager(), this.source.getConnectionID(), source.getElements());
this.markDistinct(sortOption == SortOption.SORT_DISTINCT && expressions.size() == this.getOuterVals().size());
- this.sortUtility.setSkipBuffer(skipBuffer);
+ if (ts == null) {
+ this.sortUtility.setWorkingBuffer(this.buffer);
+ }
}
if (sortOption == SortOption.NOT_SORTED) {
this.buffers = sortUtility.onePassSort();
View
4 engine/src/test/java/org/teiid/common/buffer/TestSTree.java
@@ -134,8 +134,8 @@
}
@Test public void testSearch() throws TeiidComponentException, TeiidProcessingException {
- BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
- bm.setProcessorBatchSize(1);
+ //due to buffering changes we need to hold this in memory directly rather than serialize it out as that will lead to GC overhead errors
+ BufferManagerImpl bm = BufferManagerFactory.getTestBufferManager(Integer.MAX_VALUE, 1);
ElementSymbol e1 = new ElementSymbol("x");
e1.setType(Integer.class);
View
16 engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
@@ -31,6 +31,7 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.common.buffer.TupleBuffer.TupleBufferTupleSource;
import org.teiid.core.types.ClobType;
import org.teiid.core.types.DataTypeManager;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -60,6 +61,21 @@
assertEquals(2, batch.getBeginRow());
}
+ @Test public void testReverseIteration() throws Exception {
+ ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
+ x.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+ List<ElementSymbol> schema = Arrays.asList(x);
+ TupleBuffer tb = BufferManagerFactory.getStandaloneBufferManager().createTupleBuffer(schema, "x", TupleSourceType.PROCESSOR); //$NON-NLS-1$
+ tb.addTuple(Arrays.asList(1));
+ tb.addTuple(Arrays.asList(2));
+ TupleBufferTupleSource tbts = tb.createIndexedTupleSource();
+ tbts.setReverse(true);
+ assertTrue(tbts.hasNext());
+ assertEquals(2, tbts.nextTuple().get(0));
+ assertEquals(1, tbts.nextTuple().get(0));
+ assertFalse(tbts.hasNext());
+ }
+
@Test public void testTruncate() throws Exception {
ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
x.setType(DataTypeManager.DefaultDataClasses.INTEGER);
View
22 engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
@@ -809,19 +809,19 @@ public void helpTestRepeatedMerge(boolean indexDistinct) throws Exception {
this.leftTuples[11] = Arrays.asList((Integer)null);
expected = new List[] {
- Arrays.asList(64, 64),
- Arrays.asList(36, 36),
- Arrays.asList(8, 8),
- Arrays.asList(48, 48),
- Arrays.asList(20, 20),
- Arrays.asList(60, 60),
- Arrays.asList(32, 32),
- Arrays.asList(4, 4),
- Arrays.asList(16, 16),
- Arrays.asList(56, 56),
- Arrays.asList(28, 28),
Arrays.asList(0, 0),
Arrays.asList(0, 0),
+ Arrays.asList(28, 28),
+ Arrays.asList(56, 56),
+ Arrays.asList(16, 16),
+ Arrays.asList(4, 4),
+ Arrays.asList(32, 32),
+ Arrays.asList(20, 20),
+ Arrays.asList(60, 60),
+ Arrays.asList(48, 48),
+ Arrays.asList(8, 8),
+ Arrays.asList(36, 36),
+ Arrays.asList(64, 64),
};
helpCreateJoin();
EnhancedSortMergeJoinStrategy psj = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
View
2  test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
@@ -131,7 +131,7 @@ protected void checkResult(String testName, String query) throws Exception {
}
@Test public void testColumns() throws Exception {
- checkResult("testColumns", "select* from SYS.Columns order by Name"); //$NON-NLS-1$ //$NON-NLS-2$
+ checkResult("testColumns", "select* from SYS.Columns order by Name, uid"); //$NON-NLS-1$ //$NON-NLS-2$
}
@Test public void testTableType() throws Exception {
View
92 test-integration/common/src/test/resources/TestSystemVirtualModel/testColumns.expected
46 additions, 46 deletions not shown
View
29 test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
@@ -34,6 +34,7 @@
import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -44,6 +45,7 @@
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
+import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.api.exception.query.QueryParserException;
@@ -82,7 +84,10 @@
@SuppressWarnings("nls")
public class TestEnginePerformance {
+ private static boolean debug = false;
+
private static BufferManagerImpl bm;
+ private static BufferFrontedFileStoreCache cache;
private static ExecutorService es;
private static Random r = new Random(0);
@@ -191,7 +196,7 @@ public Void call() throws Exception {
for (int i = 0; i < rowCount; i++) {
data[i] = Arrays.asList(i, String.valueOf(i));
}
- //Collections.shuffle(Arrays.asList(data), r);
+ Collections.shuffle(Arrays.asList(data), r);
return data;
}
@@ -260,11 +265,11 @@ public Void call() throws Exception {
bm = new BufferManagerImpl();
bm.setMaxProcessingKB(1<<12);
- bm.setMaxReserveKB((1<<19)-(1<<17));
+ bm.setMaxReserveKB((1<<18)-(1<<16));
bm.setMaxActivePlans(20);
- BufferFrontedFileStoreCache cache = new BufferFrontedFileStoreCache();
- cache.setMemoryBufferSpace(1<<27);
+ cache = new BufferFrontedFileStoreCache();
+ cache.setMemoryBufferSpace(1<<26);
FileStorageManager fsm = new FileStorageManager();
fsm.setStorageDirectory(UnitTestUtil.getTestScratchPath() + "/data");
cache.setStorageManager(fsm);
@@ -275,6 +280,12 @@ public Void call() throws Exception {
es = Executors.newCachedThreadPool();
}
+ @After public void tearDown() throws Exception {
+ if (debug) {
+ showStats();
+ }
+ }
+
private void helpTestXMLTable(int iterations, int threadCount, String file, int expectedRowCount) throws QueryParserException,
TeiidException, InterruptedException, Exception {
String sql = "select * from xmltable('/root/child' passing xmlparse(document cast(? as clob) wellformed) columns x integer path '@id', y long path 'gc2') as x"; //$NON-NLS-1$
@@ -529,6 +540,16 @@ public Object clone() {
helpTestLargeSort(2, 4, 100000);
}
+ private static void showStats() {
+ System.out.println(bm.getBatchesAdded());
+ System.out.println(bm.getReferenceHits());
+ System.out.println(bm.getReadAttempts());
+ System.out.println(bm.getReadCount());
+ System.out.println(bm.getWriteCount());
+ System.out.println(cache.getStorageReads());
+ System.out.println(cache.getStorageWrites());
+ }
+
/**
* Generates a 5 MB document
*/
Please sign in to comment.
Something went wrong with that request. Please try again.