Skip to content

Commit

Permalink
Merge pull request #683 from uwescience/profile-subqueries
Browse files Browse the repository at this point in the history
profiling of multi-subquery queries
  • Loading branch information
dhalperi committed Feb 5, 2015
2 parents 5dbcfd1 + 67555e1 commit 6573fca
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 164 deletions.
17 changes: 9 additions & 8 deletions src/edu/washington/escience/myria/MyriaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ public final class MyriaConstants {
/**
* The schema of the {@link #EVENT_PROFILING_RELATION}.
*/
public static final Schema EVENT_PROFILING_SCHEMA = Schema.ofFields(Type.LONG_TYPE, Type.INT_TYPE, Type.INT_TYPE,
Type.LONG_TYPE, Type.LONG_TYPE, Type.LONG_TYPE, "queryId", "fragmentId", "opId", "startTime", "endTime",
"numTuples");
public static final Schema EVENT_PROFILING_SCHEMA = Schema.ofFields("queryId", Type.LONG_TYPE, "subQueryId",
Type.INT_TYPE, "fragmentId", Type.INT_TYPE, "opId", Type.INT_TYPE, "startTime", Type.LONG_TYPE, "endTime",
Type.LONG_TYPE, "numTuples", Type.LONG_TYPE);

/**
* The relation that stores profiling information about sent tuples.
Expand All @@ -259,8 +259,9 @@ public final class MyriaConstants {
/**
* The schema of the {@link #SENT_PROFILING_RELATION}.
*/
public static final Schema SENT_PROFILING_SCHEMA = Schema.ofFields(Type.LONG_TYPE, Type.INT_TYPE, Type.LONG_TYPE,
Type.LONG_TYPE, Type.INT_TYPE, "queryId", "fragmentId", "nanoTime", "numTuples", "destWorkerId");
public static final Schema SENT_PROFILING_SCHEMA = Schema.ofFields("queryId", Type.LONG_TYPE, "subQueryId",
Type.INT_TYPE, "fragmentId", Type.INT_TYPE, "nanoTime", Type.LONG_TYPE, "numTuples", Type.LONG_TYPE,
"destWorkerId", Type.INT_TYPE);

/**
* The relation that stores resource profiling information.
Expand All @@ -270,9 +271,9 @@ public final class MyriaConstants {
/**
* The schema of the {@link #RESOURCE_PROFILING_RELATION}.
*/
public static final Schema RESOURCE_PROFILING_SCHEMA = Schema.ofFields(Type.LONG_TYPE, Type.INT_TYPE,
Type.STRING_TYPE, Type.LONG_TYPE, Type.LONG_TYPE, Type.LONG_TYPE, "timestamp", "opId", "measurement", "value",
"queryId", "subqueryId");
// Columns are given as alternating (name, type) pairs.
// NOTE(review): this schema spells its last column "subqueryId" and types it LONG_TYPE, whereas
// EVENT_PROFILING_SCHEMA and SENT_PROFILING_SCHEMA use "subQueryId" with INT_TYPE -- confirm the
// difference is intentional before relating these tables on that column.
public static final Schema RESOURCE_PROFILING_SCHEMA = Schema.ofFields("timestamp", Type.LONG_TYPE, "opId",
Type.INT_TYPE, "measurement", Type.STRING_TYPE, "value", Type.LONG_TYPE, "queryId", Type.LONG_TYPE, "subqueryId",
Type.LONG_TYPE);

/**
* For how long cached versions of the profiling data should be valid.
Expand Down
51 changes: 35 additions & 16 deletions src/edu/washington/escience/myria/api/LogResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.parallel.Server;
import edu.washington.escience.myria.parallel.SubQueryId;

/**
* Class that handles logs.
Expand All @@ -43,6 +44,7 @@ public final class LogResource {
* Get profiling logs of a query.
*
* @param queryId query id.
* @param subqueryId subquery id.
* @param fragmentId the fragment id.
* @param start the earliest time where we need data
* @param end the latest time
Expand All @@ -55,6 +57,7 @@ public final class LogResource {
@GET
@Path("profiling")
public Response getProfileLogs(@QueryParam("queryId") final Long queryId,
@DefaultValue("0") @QueryParam("subqueryId") final long subqueryId,
@QueryParam("fragmentId") final Long fragmentId, @QueryParam("start") final Long start,
@QueryParam("end") final Long end, @DefaultValue("0") @QueryParam("minLength") final Long minLength,
@DefaultValue("false") @QueryParam("onlyRootOp") final boolean onlyRootOp, @Context final Request request)
Expand All @@ -66,7 +69,8 @@ public Response getProfileLogs(@QueryParam("queryId") final Long queryId,
Preconditions.checkArgument(end != null, "Missing required field end.");
Preconditions.checkArgument(minLength >= 0, "MinLength has to be greater than or equal to 0");

EntityTag eTag = new EntityTag(Integer.toString(Joiner.on('-').join("profiling", queryId, fragmentId).hashCode()));
EntityTag eTag =
new EntityTag(Integer.toString(Joiner.on('-').join("profiling", queryId, subqueryId, fragmentId).hashCode()));
Object obj = checkAndAddCache(request, eTag);
if (obj instanceof Response) {
return (Response) obj;
Expand All @@ -88,14 +92,16 @@ public Response getProfileLogs(@QueryParam("queryId") final Long queryId,
PipedStreamingOutput entity = new PipedStreamingOutput(input);
response.entity(entity);

server.startLogDataStream(queryId, fragmentId, start, end, minLength, onlyRootOp, writer);
server.startLogDataStream(new SubQueryId(queryId, subqueryId), fragmentId, start, end, minLength, onlyRootOp,
writer);
return response.build();
}

/**
* Get contribution of each operator to runtime.
*
* @param queryId query id.
* @param subqueryId subquery id.
* @param fragmentId the fragment id, default is all.
* @param request the current request.
* @return the contributions across all workers
Expand All @@ -104,13 +110,14 @@ public Response getProfileLogs(@QueryParam("queryId") final Long queryId,
@GET
@Path("contribution")
public Response getContributions(@QueryParam("queryId") final Long queryId,
@DefaultValue("0") @QueryParam("subqueryId") final long subqueryId,
@DefaultValue("-1") @QueryParam("fragmentId") final Long fragmentId, @Context final Request request)
throws DbException {

Preconditions.checkArgument(queryId != null, "Missing required field queryId.");

EntityTag eTag =
new EntityTag(Integer.toString(Joiner.on('-').join("contribution", queryId, fragmentId).hashCode()));
new EntityTag(Integer.toString(Joiner.on('-').join("contribution", queryId, subqueryId, fragmentId).hashCode()));
Object obj = checkAndAddCache(request, eTag);
if (obj instanceof Response) {
return (Response) obj;
Expand All @@ -132,7 +139,7 @@ public Response getContributions(@QueryParam("queryId") final Long queryId,
PipedStreamingOutput entity = new PipedStreamingOutput(input);
response.entity(entity);

server.startContributionsStream(queryId, fragmentId, writer);
server.startContributionsStream(new SubQueryId(queryId, subqueryId), fragmentId, writer);

return response.build();
}
Expand All @@ -141,6 +148,7 @@ public Response getContributions(@QueryParam("queryId") final Long queryId,
* Get information about where tuples were sent.
*
* @param queryId query id.
* @param subqueryId subquery id.
* @param fragmentId the fragment id. < 0 means all
* @param request the current request.
* @return the profiling logs of the query across all workers
Expand All @@ -149,12 +157,14 @@ public Response getContributions(@QueryParam("queryId") final Long queryId,
@GET
@Path("sent")
public Response getSentLogs(@QueryParam("queryId") final Long queryId,
@DefaultValue("0") @QueryParam("subqueryId") final long subqueryId,
@DefaultValue("-1") @QueryParam("fragmentId") final long fragmentId, @Context final Request request)
throws DbException {

Preconditions.checkArgument(queryId != null, "Missing required field queryId.");

EntityTag eTag = new EntityTag(Integer.toString(Joiner.on('-').join("sent", queryId, fragmentId).hashCode()));
EntityTag eTag =
new EntityTag(Integer.toString(Joiner.on('-').join("sent", queryId, subqueryId, fragmentId).hashCode()));
Object obj = checkAndAddCache(request, eTag);
if (obj instanceof Response) {
return (Response) obj;
Expand All @@ -176,7 +186,7 @@ public Response getSentLogs(@QueryParam("queryId") final Long queryId,
PipedStreamingOutput entity = new PipedStreamingOutput(input);
response.entity(entity);

server.startSentLogDataStream(queryId, fragmentId, writer);
server.startSentLogDataStream(new SubQueryId(queryId, subqueryId), fragmentId, writer);

return response.build();
}
Expand All @@ -185,18 +195,21 @@ public Response getSentLogs(@QueryParam("queryId") final Long queryId,
* Get aggregated summary of all data sent.
*
* @param queryId query id.
* @param subqueryId subquery id.
* @param request the current request.
* @return the profiling logs of the query across all workers
* @throws DbException if there is an error in the database.
*/
@GET
@Path("aggregated_sent")
public Response getAggregatedSentLogs(@QueryParam("queryId") final Long queryId, @Context final Request request)
public Response getAggregatedSentLogs(@QueryParam("queryId") final Long queryId,
@DefaultValue("0") @QueryParam("subqueryId") final long subqueryId, @Context final Request request)
throws DbException {

Preconditions.checkArgument(queryId != null, "Missing required field queryId.");

EntityTag eTag = new EntityTag(Integer.toString(Joiner.on('-').join("aggregated-sent", queryId).hashCode()));
EntityTag eTag =
new EntityTag(Integer.toString(Joiner.on('-').join("aggregated-sent", queryId, subqueryId).hashCode()));
Object obj = checkAndAddCache(request, eTag);
if (obj instanceof Response) {
return (Response) obj;
Expand All @@ -218,27 +231,30 @@ public Response getAggregatedSentLogs(@QueryParam("queryId") final Long queryId,
PipedStreamingOutput entity = new PipedStreamingOutput(input);
response.entity(entity);

server.startAggregatedSentLogDataStream(queryId, writer);
server.startAggregatedSentLogDataStream(new SubQueryId(queryId, subqueryId), writer);

return response.build();
}

/**
* @param queryId query id.
* @param subqueryId subquery id.
* @param fragmentId the fragment id.
* @param request the current request.
* @return the range for which we have profiling info
* @throws DbException if there is an error in the database.
*/
@GET
@Path("range")
public Response getRange(@QueryParam("queryId") final Long queryId, @QueryParam("fragmentId") final Long fragmentId,
@Context final Request request) throws DbException {
public Response getRange(@QueryParam("queryId") final Long queryId,
@DefaultValue("0") @QueryParam("subqueryId") final long subqueryId,
@QueryParam("fragmentId") final Long fragmentId, @Context final Request request) throws DbException {

Preconditions.checkArgument(queryId != null, "Missing required field queryId.");
Preconditions.checkArgument(fragmentId != null, "Missing required field fragmentId.");

EntityTag eTag = new EntityTag(Integer.toString(Joiner.on('-').join("range", queryId, fragmentId).hashCode()));
EntityTag eTag =
new EntityTag(Integer.toString(Joiner.on('-').join("range", subqueryId, queryId, fragmentId).hashCode()));
Object obj = checkAndAddCache(request, eTag);
if (obj instanceof Response) {
return (Response) obj;
Expand All @@ -260,7 +276,7 @@ public Response getRange(@QueryParam("queryId") final Long queryId, @QueryParam(
PipedStreamingOutput entity = new PipedStreamingOutput(input);
response.entity(entity);

server.startRangeDataStream(queryId, fragmentId, writer);
server.startRangeDataStream(new SubQueryId(queryId, subqueryId), fragmentId, writer);

return response.build();
}
Expand All @@ -269,6 +285,7 @@ public Response getRange(@QueryParam("queryId") final Long queryId, @QueryParam(
* Get the number of workers working on a fragment based on profiling logs of a query for the root operators.
*
* @param queryId query id.
* @param subqueryId subquery id.
* @param fragmentId the fragment id.
* @param start the start of the range
* @param end the end of the range
Expand All @@ -281,6 +298,7 @@ public Response getRange(@QueryParam("queryId") final Long queryId, @QueryParam(
@GET
@Path("histogram")
public Response getHistogram(@QueryParam("queryId") final Long queryId,
@DefaultValue("0") @QueryParam("subqueryId") final long subqueryId,
@QueryParam("fragmentId") final Long fragmentId, @QueryParam("start") final Long start,
@QueryParam("end") final Long end, @QueryParam("step") final Long step,
@DefaultValue("true") @QueryParam("onlyRootOp") final boolean onlyRootOp, @Context final Request request)
Expand All @@ -293,8 +311,8 @@ public Response getHistogram(@QueryParam("queryId") final Long queryId,
Preconditions.checkArgument(step != null, "Missing required field step.");

EntityTag eTag =
new EntityTag(Integer.toString(Joiner.on('-').join("histogram", queryId, fragmentId, start, end, step,
onlyRootOp).hashCode()));
new EntityTag(Integer.toString(Joiner.on('-').join("histogram", queryId, subqueryId, fragmentId, start, end,
step, onlyRootOp).hashCode()));

Object obj = checkAndAddCache(request, eTag);
if (obj instanceof Response) {
Expand All @@ -317,7 +335,8 @@ public Response getHistogram(@QueryParam("queryId") final Long queryId,
PipedStreamingOutput entity = new PipedStreamingOutput(input);
response.entity(entity);

server.startHistogramDataStream(queryId, fragmentId, start, end, step, onlyRootOp, writer);
server.startHistogramDataStream(new SubQueryId(queryId, subqueryId), fragmentId, start, end, step, onlyRootOp,
writer);

return response.build();
}
Expand Down
25 changes: 25 additions & 0 deletions src/edu/washington/escience/myria/api/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.api.encoding.QueryEncoding;
import edu.washington.escience.myria.api.encoding.QuerySearchResults;
import edu.washington.escience.myria.api.encoding.QueryStatusEncoding;
import edu.washington.escience.myria.coordinator.catalog.CatalogException;
import edu.washington.escience.myria.parallel.QueryFuture;
import edu.washington.escience.myria.parallel.Server;
import edu.washington.escience.myria.parallel.SubQueryId;

/**
* Class that handles queries.
Expand Down Expand Up @@ -151,6 +153,29 @@ public Response getQueryStatus(@PathParam("queryId") final long queryId, @Contex
return response.build();
}

/**
 * Return the execution plan that the server has saved for one subquery of a query.
 *
 * Responds with 200 and the plan as the entity when the server returns a plan, or with 404 and an
 * explanatory message when the server returns {@code null} for the requested subquery.
 *
 * @param queryId the query id.
 * @param subQueryId the subquery id within that query.
 * @param uriInfo the URL of the current request; its absolute path is echoed back in the response headers.
 * @return the saved execution plan for the specified subquery, or a NOT_FOUND response if there is none.
 * @throws DbException if the plan lookup on the server fails.
 */
@GET
@Path("query-{queryId:\\d+}/subquery-{subQueryId:\\d+}")
public Response getQueryPlan(@PathParam("queryId") final long queryId,
    @PathParam("subQueryId") final long subQueryId, @Context final UriInfo uriInfo) throws DbException {
  final SubQueryId planKey = new SubQueryId(queryId, subQueryId);
  final String savedPlan = server.getQueryPlan(planKey);
  final URI requestUri = uriInfo.getAbsolutePath();

  if (savedPlan != null) {
    /* Plan found: hand it back verbatim, pointing Location at the request URI. */
    return Response.ok().location(requestUri).entity(savedPlan).build();
  }

  /* No plan stored for this (query, subquery) pair. */
  final String notFoundMessage = "Query " + queryId + "." + subQueryId + " has no saved execution plan.";
  return Response.status(Status.NOT_FOUND).contentLocation(requestUri).entity(notFoundMessage).build();
}

/**
* Cancel a running query.
*
Expand Down
Loading

0 comments on commit 6573fca

Please sign in to comment.