Skip to content

Commit

Permalink
TEIID-5093 adding explicit streaming to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Oct 24, 2017
1 parent cf1478f commit b1c4496
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 25 deletions.
Expand Up @@ -253,6 +253,11 @@ private void addGetTextFileMethod(MetadataFactory metadataFactory) {
param = metadataFactory.addProcedureParameter("encryptionkey", TypeFacility.RUNTIME_NAMES.STRING, Type.In, p); //$NON-NLS-1$
param.setAnnotation("Server side encryption key to decrypt the object"); //$NON-NLS-1$
param.setNullType(NullType.Nullable);

param = metadataFactory.addProcedureParameter("stream", TypeFacility.RUNTIME_NAMES.BOOLEAN, Type.In, p); //$NON-NLS-1$
param.setAnnotation("If the result should be streamed."); //$NON-NLS-1$
param.setNullType(NullType.Nullable);
param.setDefaultValue("false"); //$NON-NLS-1$

metadataFactory.addProcedureResultSetColumn("file", TypeFacility.RUNTIME_NAMES.CLOB, p); //$NON-NLS-1$
metadataFactory.addProcedureResultSetColumn("endpoint", TypeFacility.RUNTIME_NAMES.STRING, p); //$NON-NLS-1$
Expand All @@ -278,6 +283,11 @@ private void addGetFileMethod(MetadataFactory metadataFactory) {
param.setAnnotation("Server side encryption key to decrypt the object"); //$NON-NLS-1$
param.setNullType(NullType.Nullable);

param = metadataFactory.addProcedureParameter("stream", TypeFacility.RUNTIME_NAMES.BOOLEAN, Type.In, p); //$NON-NLS-1$
param.setAnnotation("If the result should be streamed."); //$NON-NLS-1$
param.setNullType(NullType.Nullable);
param.setDefaultValue("false"); //$NON-NLS-1$

metadataFactory.addProcedureResultSetColumn("file", TypeFacility.RUNTIME_NAMES.BLOB, p); //$NON-NLS-1$
metadataFactory.addProcedureResultSetColumn("endpoint", TypeFacility.RUNTIME_NAMES.STRING, p); //$NON-NLS-1$
metadataFactory.addProcedureResultSetColumn("lastModified", TypeFacility.RUNTIME_NAMES.TIMESTAMP, p); //$NON-NLS-1$
Expand All @@ -287,7 +297,7 @@ private void addGetFileMethod(MetadataFactory metadataFactory) {

@Override
public boolean areLobsUsableAfterClose() {
return true;
return false;
}

}
Expand Up @@ -42,6 +42,7 @@
import org.teiid.core.types.BlobType;
import org.teiid.core.types.ClobImpl;
import org.teiid.core.types.ClobType;
import org.teiid.core.types.InputStreamFactory;
import org.teiid.core.types.InputStreamFactory.BlobInputStreamFactory;
import org.teiid.core.types.XMLType;
import org.teiid.core.util.ObjectConverterUtil;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class S3ProcedureExecution implements ProcedureExecution {
private BinaryWSProcedureExecution execution = null;
boolean isText = false;
boolean isList = false;
boolean streaming = false;

public S3ProcedureExecution(Call command, S3ExecutionFactory ef, RuntimeMetadata metadata, ExecutionContext ec,
WSConnection conn) {
Expand Down Expand Up @@ -197,7 +199,7 @@ private BinaryWSProcedureExecution saveFile(List<Argument> arguments) throws Tra
headers.put("Content-Type", "application/octet-stream");

LogManager.logDetail(LogConstants.CTX_WS, "Saving", endpoint); //$NON-NLS-1$
return invokeHTTP("PUT", endpoint, new BlobType(contents), headers,true);
return invokeHTTP("PUT", endpoint, new BlobType(contents), headers);
} catch (SQLException | IOException e) {
throw new TranslatorException(e);
}
Expand All @@ -213,6 +215,11 @@ private BinaryWSProcedureExecution getFile(List<Argument> arguments) throws Tran

String encryption = (String)arguments.get(6).getArgumentValue().getValue();
String encryptionKey = (String)arguments.get(7).getArgumentValue().getValue();
Boolean isStreaming = (Boolean)arguments.get(8).getArgumentValue().getValue();

if (isStreaming != null) {
this.streaming = isStreaming;
}

if (bucket == null) {
bucket = this.ef.getBucket();
Expand Down Expand Up @@ -263,7 +270,7 @@ private BinaryWSProcedureExecution getFile(List<Argument> arguments) throws Tran
}

LogManager.logDetail(LogConstants.CTX_WS, "Getting", endpoint); //$NON-NLS-1$
return invokeHTTP("GET", endpoint, null, headers,true);
return invokeHTTP("GET", endpoint, null, headers);
} catch (MalformedURLException | NoSuchAlgorithmException e) {
throw new TranslatorException(e);
}
Expand Down Expand Up @@ -313,7 +320,7 @@ private BinaryWSProcedureExecution deleteFile(List<Argument> arguments) throws T
}
headers.put("Content-Type", "text/plain");
LogManager.logDetail(LogConstants.CTX_WS, "Deleting", endpoint); //$NON-NLS-1$
return invokeHTTP("DELETE", endpoint, null, headers,true);
return invokeHTTP("DELETE", endpoint, null, headers);
} catch (MalformedURLException e) {
throw new TranslatorException(e);
}
Expand Down Expand Up @@ -368,14 +375,14 @@ private BinaryWSProcedureExecution listBucket(List<Argument> arguments) throws T
this.isText = true;
this.isList = true;
LogManager.logDetail(LogConstants.CTX_WS, "Getting", endpoint); //$NON-NLS-1$
return invokeHTTP("GET", endpoint, null, headers, true);
return invokeHTTP("GET", endpoint, null, headers);
} catch (MalformedURLException e) {
throw new TranslatorException(e);
}
}

protected BinaryWSProcedureExecution invokeHTTP(String method,
String uri, Object payload, Map<String, String> headers, boolean streaming)
String uri, Object payload, Map<String, String> headers)
throws TranslatorException {

Map<String, List<String>> targetHeaders = new HashMap<String, List<String>>();
Expand All @@ -393,7 +400,7 @@ protected BinaryWSProcedureExecution invokeHTTP(String method,
parameters.add(new Argument(Direction.IN, new Literal(method, TypeFacility.RUNTIME_TYPES.STRING), null));
parameters.add(new Argument(Direction.IN, new Literal(payload, TypeFacility.RUNTIME_TYPES.OBJECT), null));
parameters.add(new Argument(Direction.IN, new Literal(uri, TypeFacility.RUNTIME_TYPES.STRING), null));
parameters.add(new Argument(Direction.IN, new Literal(streaming, TypeFacility.RUNTIME_TYPES.BOOLEAN), null));
parameters.add(new Argument(Direction.IN, new Literal(true, TypeFacility.RUNTIME_TYPES.BOOLEAN), null));
//the engine currently always associates out params at resolve time even if the
// values are not directly read by the call
parameters.add(new Argument(Direction.OUT, TypeFacility.RUNTIME_TYPES.STRING, null));
Expand Down Expand Up @@ -460,8 +467,15 @@ public List<?> next() throws TranslatorException, DataNotAvailableException {
ClobImpl clob = new ClobImpl(isf, -1);
clob.setCharset(Charset.forName(this.ef.getEncoding()));
value = new ClobType(clob);
if (!streaming) {
value = new InputStreamFactory.ClobInputStreamFactory(clob);
}
} else {
value = new BlobType(contents);
if (streaming) {
value = new BlobType(contents);
} else {
value = isf;
}
}

String lastModified = getHeader("Last-Modified");
Expand Down
Expand Up @@ -43,16 +43,8 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.BlobImpl;
import org.teiid.core.types.BlobType;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.InputStreamFactory;
import org.teiid.core.types.*;
import org.teiid.core.types.InputStreamFactory.StorageMode;
import org.teiid.core.types.SQLXMLImpl;
import org.teiid.core.types.StandardXMLTranslator;
import org.teiid.core.types.Streamable;
import org.teiid.core.types.TransformationException;
import org.teiid.core.types.XMLType;
import org.teiid.core.util.Assertion;
import org.teiid.core.util.ReaderInputStream;
import org.teiid.dqp.internal.process.SaveOnReadInputStream;
Expand Down Expand Up @@ -633,26 +625,31 @@ private List<?> correctTypes(List row) throws TeiidException {

static Object convertToRuntimeType(BufferManager bm, Object value, Class<?> desiredType, CommandContext context) throws TransformationException {
if (desiredType != DataTypeManager.DefaultDataClasses.XML || !(value instanceof Source)) {
if (value instanceof InputStreamFactory) {
return new BlobType(new BlobImpl((InputStreamFactory)value));
}
if (value instanceof DataSource) {
final DataSource ds = (DataSource)value;
try {
//Teiid uses the datasource interface in a degenerate way that
//reuses the stream, so we test for that here
InputStream initial = ds.getInputStream();
InputStream other = ds.getInputStream();
if (initial != other) {
InputStream other = null;
try {
other = ds.getInputStream();
} catch (IOException e) {
//likely streaming
}
if (other != null && initial != other) {
initial.close();
other.close();
return new BlobType(new BlobImpl(new InputStreamFactory() {
if (value instanceof InputStreamFactory) {
return asLob((InputStreamFactory)value, desiredType);
}
return asLob(new InputStreamFactory() {

@Override
public InputStream getInputStream() throws IOException {
return ds.getInputStream();
}
}));
}, desiredType);
}
FileStore fs = bm.createFileStore("bytes"); //$NON-NLS-1$
//TODO: guess at the encoding from the content type
Expand All @@ -662,11 +659,14 @@ public InputStream getInputStream() throws IOException {
if (context != null) {
context.addCreatedLob(fsisf);
}
return new BlobType(new BlobImpl(is.getInputStreamFactory()));
return asLob(is.getInputStreamFactory(), desiredType);
} catch (IOException e) {
throw new TransformationException(QueryPlugin.Event.TEIID30500, e, e.getMessage());
}
}
if (value instanceof InputStreamFactory) {
return asLob((InputStreamFactory)value, desiredType);
}
if (value instanceof GeometryInputSource) {
GeometryInputSource gis = (GeometryInputSource)value;
try {
Expand Down Expand Up @@ -736,6 +736,14 @@ public InputStream getInputStream() throws IOException {
return DataTypeManager.convertToRuntimeType(value, desiredType != DataTypeManager.DefaultDataClasses.OBJECT);
}

private static Object asLob(InputStreamFactory value, Class<?> desiredType) {
if (desiredType == DataTypeManager.DefaultDataClasses.CLOB) {
//assumes UTF-8
return new ClobType(new ClobImpl(value, -1));
}
return new BlobType(new BlobImpl(value));
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.sql.Clob;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -283,6 +284,21 @@ public Xid getXid() {
assertArrayEquals(bytes, ObjectConverterUtil.convertToByteArray(blob.getBinaryStream()));
}

@Test public void testTypeConversionClob() throws Exception {
BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();

String str = "hello world";

Clob clob = (Clob) ConnectorWorkItem.convertToRuntimeType(bm, new InputStreamFactory() {
@Override
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(str.getBytes(Streamable.CHARSET));
}
}, DataTypeManager.DefaultDataClasses.CLOB, null);

assertEquals(str, clob.getSubString(1, str.length()));
}

@Test public void testLobs() throws Exception {
BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
final List<Object> result = Arrays.asList(AutoGenDataService.CLOB_VAL);
Expand Down

0 comments on commit b1c4496

Please sign in to comment.