Skip to content

Commit

Permalink
TEIID-2628 fixing issues with using the buffer from xmltablenode
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Aug 19, 2013
1 parent 0a1ebf7 commit f41c485
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 60 deletions.
21 changes: 3 additions & 18 deletions engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@

public class TupleBuffer {

public interface RowSource {
void finalRow() throws TeiidComponentException, TeiidProcessingException;
}

public final class TupleBufferTupleSource extends
public class TupleBufferTupleSource extends
AbstractTupleSource {
private final boolean singleUse;
private boolean noBlocking;
Expand All @@ -63,17 +59,11 @@ protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingExce
if(isFinal || noBlocking || reverse) {
return null;
}
if (rowSourceLock == null) {
throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
}
synchronized (rowSourceLock) {
rowSourceLock.finalRow();
return getCurrentTuple();
}
throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
}

@Override
public int available() {
protected int available() {
if (!reverse) {
return rowCount - getCurrentIndex() + 1;
}
Expand Down Expand Up @@ -144,7 +134,6 @@ public static String[] getTypeNames(List<? extends Expression> expressions) {

private LobManager lobManager;
private String uuid;
private RowSource rowSourceLock;

public TupleBuffer(BatchManager manager, String id, List<? extends Expression> schema, LobManager lobManager, int batchSize) {
this.manager = manager;
Expand All @@ -154,10 +143,6 @@ public TupleBuffer(BatchManager manager, String id, List<? extends Expression> s
this.batchSize = batchSize;
}

public void setRowSourceLock(RowSource rowSourceLock) {
this.rowSourceLock = rowSourceLock;
}

public void setInlineLobs(boolean inline) {
if (this.lobManager != null) {
this.lobManager.setInlineLobs(inline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
import org.teiid.query.processor.relational.SortUtility.Mode;
Expand Down Expand Up @@ -193,6 +194,7 @@ protected void prefetch(boolean limit) throws TeiidComponentException, TeiidProc
}
if (this.source.hasFinalBuffer()) {
this.buffer = this.source.getFinalBuffer(-1);
Assertion.assertTrue(this.buffer.isFinal());
return;
}
createPrefetch();
Expand Down Expand Up @@ -256,6 +258,7 @@ private TupleBuffer getTupleBuffer(boolean full) throws TeiidComponentException,
}
if (source.hasFinalBuffer()) {
this.buffer = source.getFinalBuffer(-1);
Assertion.assertTrue(this.buffer.isFinal());
return this.buffer;
}
this.implicitBuffer = ImplicitBuffer.FULL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
* When streaming the results will be fully built and stored in a buffer
* before being returned
*/
public class XMLTableNode extends SubqueryAwareRelationalNode implements RowProcessor, TupleBuffer.RowSource {
public class XMLTableNode extends SubqueryAwareRelationalNode implements RowProcessor {

private static Map<Class<?>, BuiltInAtomicType> typeMapping = new HashMap<Class<?>, BuiltInAtomicType>();

Expand Down Expand Up @@ -127,7 +127,7 @@ public synchronized void closeDirect() {
}

@Override
public void reset() {
public synchronized void reset() {
super.reset();
if (this.result != null) {
result.close();
Expand Down Expand Up @@ -224,13 +224,11 @@ private void evaluate(final boolean useFinalBuffer) throws TeiidComponentExcepti
public void run() {
try {
XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
} catch (TeiidException e) {
asynchException = new TeiidRuntimeException(e);
} catch (TeiidRuntimeException e) {
if (e != EARLY_TERMINATION) {
asynchException = e;
}
} catch (RuntimeException e) {
} catch (Throwable e) {
asynchException = new TeiidRuntimeException(e);
} finally {
synchronized (XMLTableNode.this) {
Expand All @@ -247,7 +245,6 @@ public void run() {
}
}
};
this.buffer.setRowSourceLock(this);
this.getContext().getExecutor().execute(r);
return;
}
Expand All @@ -258,21 +255,6 @@ public void run() {
}
}

@Override
public synchronized void finalRow() throws TeiidComponentException,
TeiidProcessingException {
while (state == State.BUILDING) {
try {
this.wait();
} catch (InterruptedException e) {
throw new TeiidRuntimeException(e);
}
}
if (this.asynchException != null) {
unwrapException(this.asynchException);
}
}

private List<?> processRow() throws ExpressionEvaluationException, BlockedException,
TeiidComponentException, TeiidProcessingException {
List<Object> tuple = new ArrayList<Object>(projectedColumns.size());
Expand Down Expand Up @@ -346,24 +328,6 @@ private Object getValue(AtomicValue value) throws XPathException {
return Value.convertToJava(value);
}

@Override
public boolean hasFinalBuffer() {
return this.table.getXQueryExpression().isStreaming();
}

@Override
public synchronized TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
this.rowLimit = maxRows;
evaluate(true);
usingOutput = true;
TupleBuffer finalBuffer = this.buffer;
if (!this.table.getXQueryExpression().isStreaming()) {
close();
}
return finalBuffer;
}

@Override
public synchronized void processRow(NodeInfo row) {
if (isClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.Test;
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.client.util.ResultsFuture;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.BinaryType;
import org.teiid.core.types.BlobImpl;
Expand Down Expand Up @@ -463,7 +464,23 @@ public class TestSQLXMLProcessing {
Arrays.asList("<b xmlns=\"\" x=\"2\"/>", 1)
};

executeStreaming(sql, expected);
executeStreaming(sql, expected, -1);
}

@Test public void testXmlTableStreamingMultibatch() throws Throwable {
String sql = "select t.* from (select xmlelement(a, xmlagg(xmlelement(b, e1))) doc from pm1.g1) as x, xmltable('/a/b' passing doc columns x string path '.') as t"; //$NON-NLS-1$

final List<?>[] expected = new List<?>[] {
Arrays.asList("a"),
Arrays.asList(""),
Arrays.asList("a"),
Arrays.asList("c"),
Arrays.asList("b"),
Arrays.asList("a")
};

dataManager.setBlockOnce();
executeStreaming(sql, expected, 2);
}

@Test(expected=TeiidProcessingException.class) public void testXmlTableStreamingTimingWithError() throws Throwable {
Expand All @@ -474,12 +491,15 @@ public class TestSQLXMLProcessing {
Arrays.asList(2, 1)
};

executeStreaming(sql, expected);
executeStreaming(sql, expected, -1);
}

private void executeStreaming(String sql, final List<?>[] expected)
private void executeStreaming(String sql, final List<?>[] expected, int batchSize)
throws Throwable {
final CommandContext cc = createCommandContext();
if (batchSize != -1) {
cc.setBufferManager(BufferManagerFactory.getTestBufferManager(0, 1));
}
final ResultsFuture<Runnable> r = new ResultsFuture<Runnable>();
Executor ex = new Executor() {

Expand Down

0 comments on commit f41c485

Please sign in to comment.