Skip to content

Commit

Permalink
Merge pull request #77 from rareddy/TEIID-2513
Browse files Browse the repository at this point in the history
TEIID-2517: adding support for return of complex types from procedure ex...
  • Loading branch information
rareddy committed May 28, 2013
2 parents 82dee89 + 2be737e commit dcf7d54
Show file tree
Hide file tree
Showing 6 changed files with 431 additions and 332 deletions.
Expand Up @@ -3,17 +3,17 @@
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership. Some portions may be licensed
* to Red Hat, Inc. under one or more contributor license agreements.
*
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Expand Down Expand Up @@ -41,13 +41,18 @@
import org.odata4j.core.OEntity;
import org.odata4j.core.OError;
import org.odata4j.core.OProperty;
import org.odata4j.edm.EdmComplexType;
import org.odata4j.edm.EdmDataServices;
import org.odata4j.format.Entry;
import org.odata4j.format.Feed;
import org.odata4j.format.FormatParser;
import org.odata4j.format.FormatParserFactory;
import org.odata4j.format.FormatType;
import org.odata4j.format.Settings;
import org.odata4j.format.xml.AtomFeedFormatParser;
import org.odata4j.stax2.XMLEvent2;
import org.odata4j.stax2.XMLEventReader2;
import org.odata4j.stax2.util.StaxUtil;
import org.teiid.language.Argument;
import org.teiid.language.Argument.Direction;
import org.teiid.language.Call;
Expand All @@ -59,7 +64,6 @@
import org.teiid.translator.TranslatorException;
import org.teiid.translator.TypeFacility;
import org.teiid.translator.WSConnection;
import org.teiid.translator.odata.ODataSQLVisitor.Entity;
import org.teiid.translator.ws.BinaryWSProcedureExecution;

public class BaseQueryExecution {
Expand All @@ -68,14 +72,14 @@ public class BaseQueryExecution {
protected RuntimeMetadata metadata;
protected ExecutionContext executionContext;
protected EdmDataServices edsMetadata;

public BaseQueryExecution(ODataExecutionFactory translator, ExecutionContext executionContext,
RuntimeMetadata metadata, WSConnection connection, EdmDataServices edsMetadata) {
this.metadata = metadata;
this.executionContext = executionContext;
this.translator = translator;
this.connection = connection;
this.edsMetadata = edsMetadata;
this.edsMetadata = edsMetadata;
}

protected Feed parse(Blob blob, ODataVersion version, String entityTable) throws TranslatorException {
Expand All @@ -87,36 +91,77 @@ protected Feed parse(Blob blob, ODataVersion version, String entityTable) throws
return parser.parse(new InputStreamReader(blob.getBinaryStream()));
} catch (SQLException e) {
throw new TranslatorException(ODataPlugin.Event.TEIID17010, e, e.getMessage());
}
}
}

protected static ODataVersion getDataServiceVersion(String headerValue) {
ODataVersion version = ODataConstants.DATA_SERVICE_VERSION;
if (headerValue != null) {
String[] str = headerValue.split(";"); //$NON-NLS-1$
version = ODataVersion.parse(str[0]);
}
return version;
}
}

protected ODataEntitiesResponse executeWithReturnEntity(String method, String uri, String payload, String entityTable, String eTag, Status... expectedStatus) throws TranslatorException {
Map<String, List<String>> headers = getDefaultHeaders();
if (eTag != null) {
headers.put("If-Match", Arrays.asList(eTag)); //$NON-NLS-1$
}

if (payload != null) {
headers.put("Content-Type", Arrays.asList("application/atom+xml")); //$NON-NLS-1$ //$NON-NLS-2$
}

BinaryWSProcedureExecution execution = executeDirect(method, uri, payload, headers);
for (Status status:expectedStatus) {
if (status.getStatusCode() == execution.getResponseCode()) {
if (execution.getResponseCode() != Status.NO_CONTENT.getStatusCode()) {
Blob blob = (Blob)execution.getOutputParameterValues().get(0);
ODataVersion version = getDataServiceVersion((String)execution.getResponseHeader(ODataConstants.Headers.DATA_SERVICE_VERSION));
ODataVersion version = getDataServiceVersion((String)execution.getResponseHeader(ODataConstants.Headers.DATA_SERVICE_VERSION));
Feed feed = parse(blob, version, entityTable);
return new ODataEntitiesResponse(uri, feed, entityTable);
return new ODataEntitiesResponse(uri, feed, entityTable);
}
// this is success with no-data
return new ODataEntitiesResponse();
}
}
// throw an error
return new ODataEntitiesResponse(buildError(execution));
}

protected ODataEntitiesResponse executeWithComplexReturn(String method, String uri, String payload, String complexTypeName, String eTag, Status... expectedStatus) throws TranslatorException {
Map<String, List<String>> headers = getDefaultHeaders();
if (eTag != null) {
headers.put("If-Match", Arrays.asList(eTag)); //$NON-NLS-1$
}

if (payload != null) {
headers.put("Content-Type", Arrays.asList("application/atom+xml")); //$NON-NLS-1$ //$NON-NLS-2$
}

BinaryWSProcedureExecution execution = executeDirect(method, uri, payload, headers);
for (Status status:expectedStatus) {
if (status.getStatusCode() == execution.getResponseCode()) {
if (execution.getResponseCode() != Status.NO_CONTENT.getStatusCode()) {
Blob blob = (Blob)execution.getOutputParameterValues().get(0);
ODataVersion version = getDataServiceVersion((String)execution.getResponseHeader(ODataConstants.Headers.DATA_SERVICE_VERSION));

EdmComplexType complexType = null;
for (EdmComplexType ct:this.edsMetadata.getComplexTypes()) {
if (ct.getFullyQualifiedTypeName().equals(complexTypeName)) {
complexType = ct;
break;
}
}
if (complexType == null) {
throw new RuntimeException("Could not derive the complex name " + complexType);
}
try {
return parserComplex(StaxUtil.newXMLEventReader(new InputStreamReader(blob.getBinaryStream())), complexType);
} catch (SQLException e) {
throw new TranslatorException(ODataPlugin.Event.TEIID17010, e, e.getMessage());
}
}
// this is success with no-data
return new ODataEntitiesResponse();
Expand All @@ -126,6 +171,14 @@ protected ODataEntitiesResponse executeWithReturnEntity(String method, String ur
return new ODataEntitiesResponse(buildError(execution));
}

private ODataEntitiesResponse parserComplex(XMLEventReader2 reader, EdmComplexType complexType) {
XMLEvent2 event = reader.nextEvent();
while (!event.isStartElement()) {
event = reader.nextEvent();
}
return new ODataEntitiesResponse(AtomFeedFormatParser.parseProperties(reader, event.asStartElement(), this.edsMetadata, complexType).iterator());
}

protected TranslatorException buildError(BinaryWSProcedureExecution execution) {
// do some error handling
try {
Expand All @@ -138,7 +191,7 @@ protected TranslatorException buildError(BinaryWSProcedureExecution execution) {
return new TranslatorException(t);
}
}

protected BinaryWSProcedureExecution executeDirect(String method, String uri, String payload, Map<String, List<String>> headers) throws TranslatorException {
try {
LogManager.logDetail(LogConstants.CTX_ODATA, "Source-URL=", URLDecoder.decode(uri, "UTF-8")); //$NON-NLS-1$ //$NON-NLS-2$
Expand All @@ -152,60 +205,65 @@ protected BinaryWSProcedureExecution executeDirect(String method, String uri, St
parameters.add(new Argument(Direction.IN, new Literal(true, TypeFacility.RUNTIME_TYPES.BOOLEAN), TypeFacility.RUNTIME_TYPES.BOOLEAN, null));

Call call = this.translator.getLanguageFactory().createCall(ODataExecutionFactory.INVOKE_HTTP, parameters, null);

BinaryWSProcedureExecution execution = new BinaryWSProcedureExecution(call, this.metadata, this.executionContext, null, this.connection);
execution.setUseResponseContext(true);
execution.setAlwaysAllowPayloads(true);

for (String header:headers.keySet()) {
execution.addHeader(header, headers.get(header));
}
execution.execute();
execution.execute();
return execution;
}

protected Map<String, List<String>> getDefaultHeaders() {
Map<String, List<String>> headers = new HashMap<String, List<String>>();
headers.put("Accept", Arrays.asList(FormatType.ATOM.getAcceptableMediaTypes())); //$NON-NLS-1$
headers.put("Content-Type", Arrays.asList("application/xml")); //$NON-NLS-1$ //$NON-NLS-2$
return headers;
}
}

class ODataEntitiesResponse {
private Feed feed;
private String uri;
private Iterator<Entry> rowIter;
private String entityTypeName;
private TranslatorException exception;
private Status[] acceptedStatus;

private Iterator<OProperty<?>> complexValues;

public ODataEntitiesResponse(String uri, Feed feed, String entityTypeName, Status... accptedStatus) {
this.uri = uri;
this.feed = feed;
this.entityTypeName = entityTypeName;
this.rowIter = this.feed.getEntries().iterator();
this.acceptedStatus = accptedStatus;
}

public ODataEntitiesResponse(TranslatorException ex) {
this.exception = ex;
}

public ODataEntitiesResponse() {
}

}

public ODataEntitiesResponse(Iterator<OProperty<?>> complexValues) {
this.complexValues = complexValues;
}

public boolean hasRow() {
return (this.rowIter != null && this.rowIter.hasNext());
}

public boolean hasError() {
return exception != null;
return this.exception != null;
}

public TranslatorException getError() {
return this.exception;
}

public List<?> getNextRow(String[] columnNames, String[] embeddedColumnName, Class<?>[] expectedType) throws TranslatorException {
if (this.rowIter != null && this.rowIter.hasNext()) {
OEntity entity = this.rowIter.next().getEntity();
Expand All @@ -221,61 +279,73 @@ public List<?> getNextRow(String[] columnNames, String[] embeddedColumnName, Cla
}
}
}
results.add(translator.retrieveValue(value, expectedType[i]));
results.add(BaseQueryExecution.this.translator.retrieveValue(value, expectedType[i]));
}
fetchNextBatch(!this.rowIter.hasNext());
return results;
}

else if (this.complexValues != null) {
HashMap<String, Object> values = new HashMap<String, Object>();
while(this.complexValues.hasNext()) {
OProperty prop = this.complexValues.next();
values.put(prop.getName(), prop.getValue());
}
ArrayList results = new ArrayList();
for (int i = 0; i < columnNames.length; i++) {
results.add(BaseQueryExecution.this.translator.retrieveValue(values.get(columnNames[i]), expectedType[i]));
}
this.complexValues = null;
return results;
}
return null;
}

// TODO:there is possibility here to async execute this feed
private void fetchNextBatch(boolean fetch) throws TranslatorException {
if (!fetch) {
return;
}

String next = this.feed.getNext();
if (next == null) {
this.feed = null;
this.rowIter = null;
return;
}

int idx = next.indexOf("$skiptoken="); //$NON-NLS-1$
if (idx != -1) {

String skip = null;
try {
skip = next.substring(idx + 11);
skip = URLDecoder.decode(skip, Charsets.Upper.UTF_8);
} catch (UnsupportedEncodingException e) {
throw new TranslatorException(e);
}
String nextUri = uri;
if (uri.indexOf('?') == -1) {
nextUri = uri + "?$skiptoken="+skip; //$NON-NLS-1$

String nextUri = this.uri;
if (this.uri.indexOf('?') == -1) {
nextUri = this.uri + "?$skiptoken="+skip; //$NON-NLS-1$
}
else {
nextUri = uri + "&$skiptoken="+skip; //$NON-NLS-1$
nextUri = this.uri + "&$skiptoken="+skip; //$NON-NLS-1$
}
BinaryWSProcedureExecution execution = executeDirect("GET", nextUri, null, getDefaultHeaders()); //$NON-NLS-1$
validateResponse(execution);
Blob blob = (Blob)execution.getOutputParameterValues().get(0);
ODataVersion version = getDataServiceVersion((String)execution.getResponseHeader(ODataConstants.Headers.DATA_SERVICE_VERSION));
this.feed = parse(blob, version, this.entityTypeName);

this.feed = parse(blob, version, this.entityTypeName);
this.rowIter = this.feed.getEntries().iterator();

} else if (next.toLowerCase().startsWith("http")) { //$NON-NLS-1$
BinaryWSProcedureExecution execution = executeDirect("GET", next, null, getDefaultHeaders()); //$NON-NLS-1$
validateResponse(execution);
Blob blob = (Blob)execution.getOutputParameterValues().get(0);
ODataVersion version = getDataServiceVersion((String)execution.getResponseHeader(ODataConstants.Headers.DATA_SERVICE_VERSION));
this.feed = parse(blob, version, this.entityTypeName);

this.feed = parse(blob, version, this.entityTypeName);
this.rowIter = this.feed.getEntries().iterator();
} else {
throw new TranslatorException(ODataPlugin.Util.gs(ODataPlugin.Event.TEIID17001, next));
Expand All @@ -289,9 +359,9 @@ private void validateResponse(BinaryWSProcedureExecution execution) throws Trans
}
}
}

Iterator<Entry> getResultsIter(){
return this.rowIter;
}
}
}
}

0 comments on commit dcf7d54

Please sign in to comment.