Skip to content

Commit

Permalink
TEIID-3390 improving sql/xml generation
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Mar 18, 2015
1 parent f32128f commit adcbc36
Show file tree
Hide file tree
Showing 25 changed files with 269 additions and 60 deletions.
Expand Up @@ -79,6 +79,7 @@ public void setCharset(Charset charset) {
}

public void free() {
//we don't actually free the underlying streamFactory as this could be a caching scenario
this.streamFactory = null;
}

Expand Down
Expand Up @@ -155,7 +155,8 @@ protected int primaryHash(String value) {
};

public static final int MAX_STRING_LENGTH = PropertiesUtils.getIntProperty(System.getProperties(), "org.teiid.maxStringLength", 4000); //$NON-NLS-1$
public static final int MAX_LOB_MEMORY_BYTES = Math.max(nextPowOf2(2*MAX_STRING_LENGTH), 1<<13);
public static final int MAX_VARBINARY_BYTES = Math.max(nextPowOf2(2*MAX_STRING_LENGTH), 1<<13);
public static final int MAX_LOB_MEMORY_BYTES = Math.max(nextPowOf2(8*MAX_STRING_LENGTH), 1<<15);

public static int nextPowOf2(int val) {
int result = 1;
Expand Down
Expand Up @@ -94,7 +94,7 @@ private JDBCSQLTypeInfo() {}
addType(DataTypeManager.DefaultDataTypes.TIMESTAMP, 29, 29, DataTypeManager.DefaultDataClasses.TIMESTAMP.getName(), Types.TIMESTAMP);
addType(DataTypeManager.DefaultDataTypes.XML, Integer.MAX_VALUE, Integer.MAX_VALUE, SQLXML.class.getName(), Types.SQLXML);
addType(DataTypeManager.DefaultDataTypes.NULL, 4, 1, null, Types.NULL);
addType(DataTypeManager.DefaultDataTypes.VARBINARY, DataTypeManager.MAX_LOB_MEMORY_BYTES, DataTypeManager.MAX_LOB_MEMORY_BYTES, byte[].class.getName(), Types.VARBINARY, Types.BINARY);
addType(DataTypeManager.DefaultDataTypes.VARBINARY, DataTypeManager.MAX_VARBINARY_BYTES, DataTypeManager.MAX_VARBINARY_BYTES, byte[].class.getName(), Types.VARBINARY, Types.BINARY);

TypeInfo typeInfo = new TypeInfo(Integer.MAX_VALUE, 0, "ARRAY", Array.class.getName(), new int[Types.ARRAY]); //$NON-NLS-1$
CLASSNAME_TO_TYPEINFO.put(Array.class.getName(), typeInfo);
Expand Down
13 changes: 12 additions & 1 deletion common-core/src/main/java/org/teiid/core/types/Streamable.java
Expand Up @@ -143,7 +143,14 @@ public void writeExternal(ObjectOutput out) throws IOException {
MultiArrayOutputStream baos = null;
if (referenceStreamId == null) {
//TODO: detect when this buffering is not necessary
baos = new MultiArrayOutputStream(DataTypeManager.MAX_LOB_MEMORY_BYTES);
if (length > Integer.MAX_VALUE) {
throw new AssertionError("Should not inline a lob of length " + length); //$NON-NLS-1$
}
if (length > 0) {
baos = new MultiArrayOutputStream((int)length);
} else {
baos = new MultiArrayOutputStream(256);
}
DataOutputStream dataOutput = new DataOutputStream(baos);
try {
writeReference(dataOutput);
Expand All @@ -160,6 +167,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
}
}

protected boolean isBinary() {
return true;
}

protected abstract void writeReference(DataOutput out) throws IOException;

}
Expand Up @@ -48,7 +48,7 @@ public Object transformDirect(Object value) throws TransformationException {
BlobType source = (BlobType)value;

try {
byte[] bytes = ObjectConverterUtil.convertToByteArray(source.getBinaryStream(), DataTypeManager.MAX_LOB_MEMORY_BYTES, true);
byte[] bytes = ObjectConverterUtil.convertToByteArray(source.getBinaryStream(), DataTypeManager.MAX_VARBINARY_BYTES, true);
return new BinaryType(bytes);
} catch (SQLException e) {
throw new TransformationException(CorePlugin.Event.TEIID10080, e, CorePlugin.Util.gs(CorePlugin.Event.TEIID10080, new Object[] {getSourceType().getName(), getTargetType().getName()}));
Expand Down
Expand Up @@ -61,7 +61,7 @@ public void cleanup() {
public static PhantomReference<Object> setCleanupReference(Object o, Removable r) {
PhantomCleanupReference ref = new PhantomCleanupReference(o, r);
REFERENCES.add(ref);
doCleanup();
doCleanup(true);
return ref;
}

Expand All @@ -73,8 +73,9 @@ public static void removeCleanupReference(PhantomReference<Object> ref) {
ref.clear();
}

public static void doCleanup() {
for (int i = 0; i < 10; i++) {
public static void doCleanup(boolean quick) {
int max = quick?10:Integer.MAX_VALUE;
for (int i = 0; i < max; i++) {
PhantomCleanupReference ref = (PhantomCleanupReference)QUEUE.poll();
if (ref == null) {
break;
Expand Down
14 changes: 11 additions & 3 deletions engine/src/main/java/org/teiid/common/buffer/FileStore.java
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.teiid.common.buffer.AutoCleanupUtil.Removable;
import org.teiid.core.types.DataTypeManager;

public abstract class FileStore implements Removable {

Expand All @@ -43,9 +44,11 @@ public final class FileStoreOutputStream extends OutputStream {
private boolean bytesWritten;
private boolean closed;
private byte[] singleByte = new byte[1];
private int maxSize;

public FileStoreOutputStream(int size) {
this.buffer = new byte[size];
this.maxSize = size;
this.buffer = new byte[Math.min(size, 1<<8)];
}

@Override
Expand All @@ -57,11 +60,16 @@ public void write(int b) throws IOException {
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkOpen();
if (len > buffer.length) {
if (len > maxSize) {
flushBuffer();
writeDirect(b, off, len);
return;
}
if (!bytesWritten && buffer.length < maxSize && count + len > buffer.length) {
byte[] nextBuffer = new byte[Math.min(maxSize, DataTypeManager.nextPowOf2(count + len))];
System.arraycopy(buffer, 0, nextBuffer, 0, count);
buffer = nextBuffer;
}
int bufferedLength = Math.min(len, buffer.length - count);
if (count < buffer.length) {
System.arraycopy(b, off, buffer, count, bufferedLength);
Expand Down Expand Up @@ -90,7 +98,7 @@ public void flushBuffer() throws IOException {

/**
* Return the buffer. Can be null if closed and the underlying filestore
* has been writen to.
* has been written to.
* @return
*/
public byte[] getBuffer() {
Expand Down
Expand Up @@ -51,18 +51,29 @@ public FileStoreInputStreamFactory(FileStore lobBuffer, String encoding) {

@Override
public InputStream getInputStream() {
return getInputStream(0);
return getInputStream(0, -1);
}

public InputStream getInputStream(long start) {
public InputStream getInputStream(long start, long len) {
if (fsos != null && !fsos.bytesWritten()) {
if (start > Integer.MAX_VALUE) {
throw new AssertionError("Invalid start " + start); //$NON-NLS-1$
}
int s = (int)start;
return new ByteArrayInputStream(fsos.getBuffer(), s, fsos.getCount() - s);
int intLen = fsos.getCount() - s;
if (len >= 0) {
intLen = (int)Math.min(len, len);
}
return new ByteArrayInputStream(fsos.getBuffer(), s, intLen);
}
return lobBuffer.createInputStream(start, length);
}

public byte[] getMemoryBytes() {
if (fsos != null && !fsos.bytesWritten() && fsos.getBuffer().length == fsos.getCount()) {
return fsos.getBuffer();
}
return lobBuffer.createInputStream(start);
throw new IllegalStateException("In persistent mode or not closed for writing"); //$NON-NLS-1$
}

@Override
Expand Down
Expand Up @@ -305,6 +305,6 @@ public int getLobCount() {

public void remove() {
this.lobReferences.clear();
//we don't remove the filestore as there could be local connection references to the lob objects
this.lobStore.remove();
}
}
Expand Up @@ -1080,7 +1080,7 @@ int evictFromMemoryBuffer(boolean acquire) {
try {
for (int i = 0; i < EVICTION_SCANS && next == EMPTY_ADDRESS; i++) {
//doing a cleanup may trigger the purging of resources
AutoCleanupUtil.doCleanup();
AutoCleanupUtil.doCleanup(true);
//scan the eviction queue looking for a victim
Iterator<PhysicalInfo> iter = memoryBufferEntries.getEvictionQueue().iterator();
while (((!acquire && lowBlocks(false)) || (acquire && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
Expand Down
Expand Up @@ -104,6 +104,7 @@ public void run() {
this.cancel();
return;
}
AutoCleanupUtil.doCleanup(false);
impl.cleaning.set(true);
try {
long evicted = impl.doEvictions(impl.maxProcessingBytes, true, impl.initialEvictionQueue);
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.teiid.common.buffer.AutoCleanupUtil;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.StorageManager;
import org.teiid.core.TeiidComponentException;
Expand Down Expand Up @@ -159,7 +160,12 @@ private void setLength(RandomAccessFile fileAccess, long newLength, boolean trun
//this is a weak check, concurrent access may push us over the max. we are just trying to prevent large overage allocations
long used = usedBufferSpace.get() + bytesUsed;
if (used > maxBufferSpace) {
throw new OutOfDiskException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", bytesUsed, used, maxBufferSpace)); //$NON-NLS-1$
System.gc(); //attempt a last ditch effort to cleanup
AutoCleanupUtil.doCleanup(false);
used = usedBufferSpace.get() + bytesUsed;
if (used > maxBufferSpace) {
throw new OutOfDiskException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", bytesUsed, used, maxBufferSpace)); //$NON-NLS-1$
}
}
}
fileAccess.setLength(newLength);
Expand All @@ -168,9 +174,14 @@ private void setLength(RandomAccessFile fileAccess, long newLength, boolean trun
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "sampling bytes used:", used); //$NON-NLS-1$
}
if (bytesUsed > 0 && used > maxBufferSpace) {
fileAccess.setLength(currentLength);
usedBufferSpace.addAndGet(-bytesUsed);
throw new OutOfDiskException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", bytesUsed, used, maxBufferSpace)); //$NON-NLS-1$
System.gc(); //attempt a last ditch effort to cleanup
AutoCleanupUtil.doCleanup(false);
used = usedBufferSpace.get();
if (used > maxBufferSpace) {
fileAccess.setLength(currentLength);
usedBufferSpace.addAndGet(-bytesUsed);
throw new OutOfDiskException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", bytesUsed, used, maxBufferSpace)); //$NON-NLS-1$
}
}
}

Expand Down
Expand Up @@ -80,7 +80,7 @@ public int getBytes() {
private static Set<Class<?>> VARIABLE_SIZE_TYPES = new HashSet<Class<?>>();
static {
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100, Math.max(100, DataTypeManager.nextPowOf2(DataTypeManager.MAX_STRING_LENGTH/16))});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.VARBINARY, new int[] {100, Math.max(100, DataTypeManager.MAX_LOB_MEMORY_BYTES/32)});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.VARBINARY, new int[] {100, Math.max(100, DataTypeManager.MAX_VARBINARY_BYTES/32)});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DATE, new int[] {20, 28});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIME, new int[] {20, 28});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIMESTAMP, new int[] {20, 28});
Expand Down
Expand Up @@ -67,7 +67,7 @@ public InputStream getInputStream() throws IOException {
SaveOnReadInputStream.this.fsos.flush();
long start = SaveOnReadInputStream.this.fsisf.getLength();
SaveOnReadInputStream.this.close(); //force the pending read
InputStream is = SaveOnReadInputStream.this.fsisf.getInputStream(start);
InputStream is = SaveOnReadInputStream.this.fsisf.getInputStream(start, -1);
sis.setIn(is);
}
return fsisf.getInputStream();
Expand Down
4 changes: 2 additions & 2 deletions engine/src/main/java/org/teiid/query/eval/Evaluator.java
Expand Up @@ -940,11 +940,11 @@ private Object evaluateXMLQuery(List<?> tuple, XMLQuery xmlQuery, boolean exists
}
type = Type.CONTENT;
}
XMLType val = rp.concat.close();
XMLType val = rp.concat.close(context);
val.setType(rp.type);
return val;
}
return xmlQuery.getXQueryExpression().createXMLType(result.iter, this.context.getBufferManager(), emptyOnEmpty);
return xmlQuery.getXQueryExpression().createXMLType(result.iter, this.context.getBufferManager(), emptyOnEmpty, context);
} catch (TeiidProcessingException e) {
throw new FunctionExecutionException(QueryPlugin.Event.TEIID30333, e, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30333, e.getMessage()));
} catch (XPathException e) {
Expand Down
Expand Up @@ -68,7 +68,7 @@ public Object getResult(CommandContext commandContext) throws TeiidComponentExce
if (concat == null) {
return null;
}
result = concat.close();
result = concat.close(commandContext);
concat = null;
}
return result;
Expand Down

0 comments on commit adcbc36

Please sign in to comment.