From da04baca61d79bc46afb4648f9daae86c7186784 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 21 Sep 2023 07:12:27 -0400 Subject: [PATCH] Allow Presto clients to receive query results in binary format --- .../facebook/presto/cli/AbstractCliTest.java | 1 + .../facebook/presto/client/QueryResults.java | 18 ++++- .../main/sphinx/develop/client-protocol.rst | 5 ++ .../presto/jdbc/TestProgressMonitor.java | 1 + .../protocol/ExecutingStatementResource.java | 4 +- .../presto/server/protocol/Query.java | 75 +++++++++++++----- .../server/protocol/QueryResourceUtil.java | 1 + .../protocol/QueuedStatementResource.java | 50 +++++++----- .../facebook/presto/server/TestServer.java | 79 ++++++++++++++++++- 9 files changed, 190 insertions(+), 44 deletions(-) diff --git a/presto-cli/src/test/java/com/facebook/presto/cli/AbstractCliTest.java b/presto-cli/src/test/java/com/facebook/presto/cli/AbstractCliTest.java index 80b77846a93d..97df67ae1449 100644 --- a/presto-cli/src/test/java/com/facebook/presto/cli/AbstractCliTest.java +++ b/presto-cli/src/test/java/com/facebook/presto/cli/AbstractCliTest.java @@ -96,6 +96,7 @@ protected QueryResults createMockQueryResults() null, ImmutableList.of(new Column("_col0", BigintType.BIGINT)), ImmutableList.of(ImmutableList.of(123)), + null, StatementStats.builder().setState("FINISHED").build(), null, ImmutableList.of(), diff --git a/presto-client/src/main/java/com/facebook/presto/client/QueryResults.java b/presto-client/src/main/java/com/facebook/presto/client/QueryResults.java index b6da91b86f82..58ae6f50dbfd 100644 --- a/presto-client/src/main/java/com/facebook/presto/client/QueryResults.java +++ b/presto-client/src/main/java/com/facebook/presto/client/QueryResults.java @@ -41,6 +41,7 @@ public class QueryResults private final URI nextUri; private final List columns; private final Iterable> data; + private final Iterable binaryData; private final StatementStats stats; private final QueryError error; private final List warnings; @@ -55,6 +56,7 @@ public QueryResults( @JsonProperty("nextUri") URI nextUri, @JsonProperty("columns") List columns, @JsonProperty("data") List> data, + @JsonProperty("binaryData") List binaryData, @JsonProperty("stats") StatementStats stats, @JsonProperty("error") QueryError error, @JsonProperty("warnings") List warnings, @@ -68,6 +70,7 @@ public QueryResults( nextUri, columns, fixData(columns, data), + binaryData, stats, error, firstNonNull(warnings, ImmutableList.of()), @@ -82,6 +85,7 @@ public QueryResults( URI nextUri, List columns, Iterable> data, + Iterable binaryData, StatementStats stats, QueryError error, List warnings, @@ -94,7 +98,8 @@ public QueryResults( this.nextUri = nextUri; this.columns = (columns != null) ? ImmutableList.copyOf(columns) : null; this.data = (data != null) ? unmodifiableIterable(data) : null; - checkArgument(data == null || columns != null, "data present without columns"); + this.binaryData = (binaryData != null) ? unmodifiableIterable(binaryData) : null; + checkArgument((data == null && binaryData == null) || columns != null, "data present without columns"); this.stats = requireNonNull(stats, "stats is null"); this.error = error; this.warnings = ImmutableList.copyOf(requireNonNull(warnings, "warnings is null")); @@ -169,6 +174,16 @@ public Iterable> getData() return data; } + /** + * Returns an iterator to the payload (results) in binary format + */ + @Nullable + @JsonProperty + public Iterable getBinaryData() + { + return binaryData; + } + /** * Returns cumulative statistics on the query being executed * @return {@link com.facebook.presto.client.StatementStats} @@ -237,6 +252,7 @@ public String toString() .add("nextUri", nextUri) .add("columns", columns) .add("hasData", data != null) + .add("hasBinaryData", binaryData != null) .add("stats", stats) .add("error", error) .add("updateType", updateType) diff --git a/presto-docs/src/main/sphinx/develop/client-protocol.rst b/presto-docs/src/main/sphinx/develop/client-protocol.rst index 57fde6bfd1ce..c5ebed6765ee 100644 --- a/presto-docs/src/main/sphinx/develop/client-protocol.rst +++ b/presto-docs/src/main/sphinx/develop/client-protocol.rst @@ -35,6 +35,11 @@ the columns returned by the query. Most of the response headers should be treat browser cookies by the client, and echoed back as request headers in subsequent client requests, as documented below. +To request the results in binary format, include binaryResults=true query parameter in the initial +``/v1/statement`` ``POST`` request. The response JSON document will contain ``binaryData`` field +with a list of base64-encoded pages in :doc:`SerializedPage ` format. The +``data`` field will not be present. + If the JSON document returned by the ``POST`` to ``/v1/statement`` does not contain a ``nextUri`` link, the query has completed, either successfully or unsuccessfully, and no additional requests need to be made. If the ``nextUri`` link is present in the document, there are more query results to be fetched. The client should loop executing a ``GET`` request diff --git a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java index 5adb1d59fd88..93e319e0b5c9 100644 --- a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java +++ b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java @@ -90,6 +90,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List< nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(), responseColumns, data, + null, StatementStats.builder() .setState(state) .setWaitingForPrerequisites(state.equals("WAITING_FOR_PREREQUISITES")) diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java index 97cdb16608df..b6f951bc58d7 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java @@ -29,6 +29,7 @@ import javax.annotation.security.RolesAllowed; import javax.inject.Inject; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; @@ -99,6 +100,7 @@ public void getQueryResults( @QueryParam("slug") String slug, @QueryParam("maxWait") Duration maxWait, @QueryParam("targetResultSize") DataSize targetResultSize, + @DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults, @HeaderParam(X_FORWARDED_PROTO) String proto, @HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl, @Context UriInfo uriInfo, @@ -125,7 +127,7 @@ public void getQueryResults( acquirePermitAsync, acquirePermitTimeSeconds -> { queryRateLimiter.addRateLimiterBlockTime(new Duration(acquirePermitTimeSeconds, SECONDS)); - return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize); + return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize, binaryResults); }, responseExecutor); ListenableFuture queryResultsFuture = transform( diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/Query.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/Query.java index c9c5ef905140..0cc94459c2ee 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/Query.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/Query.java @@ -50,6 +50,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.DynamicSliceOutput; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -61,6 +62,7 @@ import javax.ws.rs.core.UriInfo; import java.net.URI; +import java.util.Base64; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -81,6 +83,7 @@ import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES; import static com.facebook.presto.server.protocol.QueryResourceUtil.toStatementStats; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage; import static com.facebook.presto.util.Failures.toFailure; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; @@ -94,6 +97,7 @@ class Query { private static final Logger log = Logger.get(Query.class); + private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); private final QueryManager queryManager; private final TransactionManager transactionManager; @@ -307,7 +311,7 @@ public synchronized Set getRemovedSessionFunctions() return removedSessionFunctions; } - public synchronized ListenableFuture waitForResults(long token, UriInfo uriInfo, String scheme, Duration wait, DataSize targetResultSize) + public synchronized ListenableFuture waitForResults(long token, UriInfo uriInfo, String scheme, Duration wait, DataSize targetResultSize, boolean binaryResults) { // before waiting, check if this request has already been processed and cached Optional cachedResult = getCachedResult(token); @@ -323,7 +327,7 @@ public synchronized ListenableFuture waitForResults(long token, Ur timeoutExecutor); // when state changes, fetch the next result - return Futures.transform(futureStateChange, ignored -> getNextResultWithRetry(token, uriInfo, scheme, targetResultSize), resultsProcessorExecutor); + return Futures.transform(futureStateChange, ignored -> getNextResultWithRetry(token, uriInfo, scheme, targetResultSize, binaryResults), resultsProcessorExecutor); } private synchronized ListenableFuture getFutureStateChange() @@ -376,9 +380,9 @@ private synchronized Optional getCachedResult(long token) return Optional.empty(); } - private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize) + private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults) { - QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize); + QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize, binaryResults); if (queryResults.getError() == null || !queryResults.getError().isRetriable()) { return queryResults; } @@ -410,6 +414,7 @@ private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uri createRetryUri(scheme, uriInfo), queryResults.getColumns(), null, + null, StatementStats.builder() .setState(WAITING_FOR_PREREQUISITES.toString()) .setWaitingForPrerequisites(true) @@ -420,7 +425,7 @@ private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uri queryResults.getUpdateCount()); } - private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize) + private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults) { // check if the result for the token have already been created Optional cachedResult = getCachedResult(token); @@ -442,25 +447,51 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Str // last page is removed. If another thread observes this state before the response is cached // the pages will be lost. Iterable> data = null; + List binaryData = null; try { - ImmutableList.Builder pages = ImmutableList.builder(); - long bytes = 0; long rows = 0; + long bytes = 0; long targetResultBytes = targetResultSize.toBytes(); - while (bytes < targetResultBytes) { - SerializedPage serializedPage = exchangeClient.pollPage(); - if (serializedPage == null) { - break; + if (binaryResults) { + ImmutableList.Builder pages = ImmutableList.builder(); + while (bytes < targetResultBytes) { + SerializedPage serializedPage = exchangeClient.pollPage(); + if (serializedPage == null) { + break; + } + + rows += serializedPage.getPositionCount(); + bytes += serializedPage.getSizeInBytes(); + + DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1000); + writeSerializedPage(sliceOutput, serializedPage); + + String encodedPage = BASE64_ENCODER.encodeToString(sliceOutput.slice().byteArray()); + pages.add(encodedPage); + } + if (rows > 0) { + binaryData = pages.build(); + } + } + else { + ImmutableList.Builder pages = ImmutableList.builder(); + while (bytes < targetResultBytes) { + SerializedPage serializedPage = exchangeClient.pollPage(); + if (serializedPage == null) { + break; + } + + Page page = serde.deserialize(serializedPage); + bytes += page.getLogicalSizeInBytes(); + rows += page.getPositionCount(); + pages.add(new RowIterable(session.toConnectorSession(), types, page)); + } + if (rows > 0) { + // client implementations do not properly handle empty list of data + data = Iterables.concat(pages.build()); } - - Page page = serde.deserialize(serializedPage); - bytes += page.getLogicalSizeInBytes(); - rows += page.getPositionCount(); - pages.add(new RowIterable(session.toConnectorSession(), types, page)); } if (rows > 0) { - // client implementations do not properly handle empty list of data - data = Iterables.concat(pages.build()); hasProducedResult = true; } } @@ -508,7 +539,7 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Str URI nextResultsUri = null; if (nextToken.isPresent()) { - nextResultsUri = createNextResultsUri(scheme, uriInfo, nextToken.getAsLong()); + nextResultsUri = createNextResultsUri(scheme, uriInfo, nextToken.getAsLong(), binaryResults); } // update catalog, schema, and path @@ -542,6 +573,7 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Str nextResultsUri, columns, data, + binaryData, toStatementStats(queryInfo), toQueryError(queryInfo), queryInfo.getWarnings(), @@ -596,7 +628,7 @@ private ListenableFuture queryDoneFuture(QueryState currentState) return Futures.transformAsync(queryManager.getStateChange(queryId, currentState), this::queryDoneFuture, directExecutor()); } - private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, long nextToken) + private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, long nextToken, boolean binaryResults) { UriBuilder uri = uriInfo.getBaseUriBuilder() .scheme(scheme) @@ -605,6 +637,9 @@ private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, lo .path(String.valueOf(nextToken)) .replaceQuery("") .queryParam("slug", this.slug); + if (binaryResults) { + uri.queryParam("binaryResults", "true"); + } Optional targetResultSize = getTargetResultSize(session); if (targetResultSize.isPresent()) { uri = uri.queryParam("targetResultSize", targetResultSize.get()); diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java index 11ad9251ae35..d417402eae91 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java @@ -158,6 +158,7 @@ public static Response toResponse(Query query, QueryResults queryResults, String prependUri(queryResults.getNextUri(), xPrestoPrefixUri), queryResults.getColumns(), prepareJsonData(queryResults.getColumns(), queryResults.getData()), + queryResults.getBinaryData(), queryResults.getStats(), queryResults.getError(), queryResults.getWarnings(), diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java index 67f4eefcef8d..fa57238d6735 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java @@ -47,6 +47,7 @@ import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; @@ -60,6 +61,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import java.net.URI; @@ -129,7 +131,6 @@ public class QueuedStatementResource private final SessionPropertyManager sessionPropertyManager; // We may need some system default session property values at early query stage even before session is created. private final QueryBlockingRateLimiter queryRateLimiter; - private final TimeStat queuedRateLimiterBlockTime = new TimeStat(); @Inject public QueuedStatementResource( @@ -199,6 +200,7 @@ public void stop() @Produces(APPLICATION_JSON) public Response postStatement( String statement, + @DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults, @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, @HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl, @Context HttpServletRequest servletRequest, @@ -220,7 +222,7 @@ public Response postStatement( Query query = new Query(statement, sessionContext, dispatchManager, queryResultsProvider, 0); queries.put(query.getQueryId(), query); - return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl)), compressionEnabled).build(); + return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); } /** @@ -235,6 +237,7 @@ public Response postStatement( @Produces(APPLICATION_JSON) public Response retryFailedQuery( @PathParam("queryId") QueryId queryId, + @DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults, @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, @HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl, @Context UriInfo uriInfo) @@ -268,7 +271,7 @@ public Response retryFailedQuery( } } - return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl)), compressionEnabled).build(); + return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); } /** @@ -289,6 +292,7 @@ public void getStatus( @PathParam("token") long token, @QueryParam("slug") String slug, @QueryParam("maxWait") Duration maxWait, + @DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults, @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, @HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl, @Context UriInfo uriInfo, @@ -314,7 +318,7 @@ public void getStatus( // when state changes, fetch the next result ListenableFuture queryResultsFuture = transformAsync( futureStateChange, - ignored -> query.toResponse(token, uriInfo, xForwardedProto, xPrestoPrefixUrl, WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait), compressionEnabled), + ignored -> query.toResponse(token, uriInfo, xForwardedProto, xPrestoPrefixUrl, WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait), compressionEnabled, binaryResults), responseExecutor); bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor); } @@ -371,16 +375,19 @@ private static URI getQueryHtmlUri(QueryId queryId, UriInfo uriInfo, String xFor return QueryResourceUtil.prependUri(uri, xPrestoPrefixUrl); } - private static URI getQueuedUri(QueryId queryId, String slug, long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl) + private static URI getQueuedUri(QueryId queryId, String slug, long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, boolean binaryResults) { - URI uri = uriInfo.getBaseUriBuilder() + UriBuilder uriBuilder = uriInfo.getBaseUriBuilder() .scheme(getScheme(xForwardedProto, uriInfo)) - .replacePath("/v1/statement/queued/") + .replacePath("/v1/statement/queued") .path(queryId.toString()) .path(String.valueOf(token)) .replaceQuery("") - .queryParam("slug", slug) - .build(); + .queryParam("slug", slug); + if (binaryResults) { + uriBuilder.queryParam("binaryResults", "true"); + } + URI uri = uriBuilder.build(); return QueryResourceUtil.prependUri(uri, xPrestoPrefixUrl); } @@ -408,6 +415,7 @@ private static QueryResults createQueryResults( nextUri, null, null, + null, StatementStats.builder() .setState(state.toString()) .setWaitingForPrerequisites(state == WAITING_FOR_PREREQUISITES) @@ -542,7 +550,7 @@ private ListenableFuture waitForDispatched() * @param xForwardedProto Forwarded protocol (http or https) * @return {@link com.facebook.presto.client.QueryResults} */ - public synchronized QueryResults getInitialQueryResults(UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl) + public synchronized QueryResults getInitialQueryResults(UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, boolean binaryResults) { verify(lastToken.get() == 0); verify(querySubmissionFuture == null); @@ -551,10 +559,11 @@ public synchronized QueryResults getInitialQueryResults(UriInfo uriInfo, String uriInfo, xForwardedProto, xPrestoPrefixUrl, - DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION)); + DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION), + binaryResults); } - public ListenableFuture toResponse(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, Duration maxWait, boolean compressionEnabled) + public ListenableFuture toResponse(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, Duration maxWait, boolean compressionEnabled, boolean binaryResults) { long lastToken = this.lastToken.get(); // token should be the last token or the next token @@ -572,7 +581,8 @@ public ListenableFuture toResponse(long token, UriInfo uriInfo, String uriInfo, xForwardedProto, xPrestoPrefixUrl, - DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION)); + DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION), + binaryResults); return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled).build()); } } @@ -586,7 +596,7 @@ public ListenableFuture toResponse(long token, UriInfo uriInfo, String } if (!waitForDispatched().isDone()) { - return immediateFuture(withCompressionConfiguration(Response.ok(createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get())), compressionEnabled).build()); + return immediateFuture(withCompressionConfiguration(Response.ok(createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled).build()); } com.facebook.presto.server.protocol.Query query; @@ -594,12 +604,12 @@ public ListenableFuture toResponse(long token, UriInfo uriInfo, String query = queryProvider.getQuery(queryId, slug); } catch (WebApplicationException e) { - return immediateFuture(withCompressionConfiguration(Response.ok(createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get())), compressionEnabled).build()); + return immediateFuture(withCompressionConfiguration(Response.ok(createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled).build()); } // If this future completes successfully, the next URI will redirect to the executing statement endpoint. // Hence it is safe to hardcode the token to be 0. return transform( - query.waitForResults(0, uriInfo, getScheme(xForwardedProto, uriInfo), maxWait, TARGET_RESULT_SIZE), + query.waitForResults(0, uriInfo, getScheme(xForwardedProto, uriInfo), maxWait, TARGET_RESULT_SIZE, binaryResults), results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled), directExecutor()); } @@ -609,9 +619,9 @@ public synchronized void cancel() querySubmissionFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor()); } - private QueryResults createQueryResults(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo) + private QueryResults createQueryResults(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults) { - URI nextUri = getNextUri(token, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo); + URI nextUri = getNextUri(token, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo, binaryResults); Optional queryError = dispatchInfo.getFailureInfo() .map(this::toQueryError); @@ -628,13 +638,13 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor dispatchInfo.getWaitingForPrerequisitesTime()); } - private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo) + private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults) { // if failed, query is complete if (dispatchInfo.getFailureInfo().isPresent()) { return null; } - return getQueuedUri(queryId, slug, token, uriInfo, xForwardedProto, xPrestoPrefixUrl); + return getQueuedUri(queryId, slug, token, uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults); } private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestServer.java b/presto-main/src/test/java/com/facebook/presto/server/TestServer.java index 93e8990d51cb..5f05e67c8c22 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestServer.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestServer.java @@ -23,18 +23,27 @@ import com.facebook.airlift.testing.Closeables; import com.facebook.presto.client.QueryError; import com.facebook.presto.client.QueryResults; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.BlockEncodingManager; import com.facebook.presto.common.type.TimeZoneNotSupportedException; +import com.facebook.presto.execution.buffer.PagesSerdeFactory; import com.facebook.presto.server.testing.TestingPrestoServer; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.function.SqlFunctionId; import com.facebook.presto.spi.function.SqlInvokedFunction; +import com.facebook.presto.spi.page.PagesSerde; +import com.facebook.presto.spi.page.SerializedPage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.slice.BasicSliceInput; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.net.URI; +import java.util.Base64; import java.util.List; import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; @@ -59,10 +68,12 @@ import static com.facebook.presto.client.PrestoHeaders.PRESTO_TIME_ZONE; import static com.facebook.presto.client.PrestoHeaders.PRESTO_TRANSACTION_ID; import static com.facebook.presto.client.PrestoHeaders.PRESTO_USER; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.server.TestHttpRequestSessionContext.createFunctionAdd; import static com.facebook.presto.server.TestHttpRequestSessionContext.createSqlFunctionIdAdd; import static com.facebook.presto.server.TestHttpRequestSessionContext.urlEncode; import static com.facebook.presto.spi.StandardErrorCode.INCOMPATIBLE_CLIENT; +import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; @@ -71,6 +82,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; @Test(singleThreaded = true) public class TestServer @@ -135,6 +147,63 @@ public void testServerStarts() assertEquals(response.getStatusCode(), OK.getStatusCode()); } + @Test + public void testBinaryResults() + { + // start query + URI uri = HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()) + .replacePath("/v1/statement") + .replaceParameter("binaryResults", "true") + .build(); + Request request = preparePost() + .setUri(uri) + .setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8)) + .setHeader(PRESTO_USER, "user") + .setHeader(PRESTO_SOURCE, "source") + .setHeader(PRESTO_CATALOG, "catalog") + .setHeader(PRESTO_SCHEMA, "schema") + .setHeader(PRESTO_CLIENT_INFO, "{\"clientVersion\":\"testVersion\"}") + .build(); + + QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + + ImmutableList.Builder data = ImmutableList.builder(); + while (queryResults.getNextUri() != null) { + Request nextRequest = prepareGet() + .setUri(queryResults.getNextUri()) + .build(); + queryResults = client.execute(nextRequest, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + + assertNull(queryResults.getData()); + if (queryResults.getBinaryData() != null) { + data.addAll(queryResults.getBinaryData()); + } + } + + if (queryResults.getError() != null) { + fail(queryResults.getError().toString()); + } + + List encodedPages = data.build(); + + assertEquals(1, encodedPages.size()); + byte[] decodedPage = Base64.getDecoder().decode((String) encodedPages.get(0)); + + BlockEncodingManager blockEncodingSerde = new BlockEncodingManager(); + PagesSerde pagesSerde = new PagesSerdeFactory(blockEncodingSerde, false, false).createPagesSerde(); + BasicSliceInput pageInput = new BasicSliceInput(Slices.wrappedBuffer(decodedPage, 0, decodedPage.length)); + SerializedPage serializedPage = readSerializedPage(pageInput); + + Page page = pagesSerde.deserialize(serializedPage); + + assertEquals(1, page.getChannelCount()); + assertEquals(1, page.getPositionCount()); + + // only the system catalog exists by default + Slice slice = VARCHAR.getSlice(page.getBlock(0), 0); + assertEquals(slice.toStringUtf8(), "system"); + } + @Test public void testQuery() { @@ -163,7 +232,10 @@ public void testQuery() data.addAll(queryResults.getData()); } } - assertNull(queryResults.getError()); + + if (queryResults.getError() != null) { + fail(queryResults.getError().toString()); + } // get the query info BasicQueryInfo queryInfo = server.getQueryManager().getQueryInfo(new QueryId(queryResults.getId())); @@ -214,7 +286,10 @@ public void testTransactionSupport() } queryResults = client.execute(prepareGet().setUri(queryResults.getValue().getNextUri()).build(), createFullJsonResponseHandler(QUERY_RESULTS_CODEC)); } - assertNull(queryResults.getValue().getError()); + + if (queryResults.getValue().getError() != null) { + fail(queryResults.getValue().getError().toString()); + } assertNotNull(queryResults.getHeader(PRESTO_STARTED_TRANSACTION_ID)); }