Skip to content

Commit

Permalink
Replace TaskId in output buffers with a generic OutputBufferId
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jul 7, 2016
1 parent f604b4d commit 26da1f0
Show file tree
Hide file tree
Showing 20 changed files with 172 additions and 115 deletions.
74 changes: 62 additions & 12 deletions presto-main/src/main/java/com/facebook/presto/OutputBuffers.java
Expand Up @@ -13,9 +13,9 @@
*/ */
package com.facebook.presto; package com.facebook.presto;


import com.facebook.presto.execution.TaskId;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;


import java.util.HashMap; import java.util.HashMap;
Expand All @@ -26,6 +26,7 @@
import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static java.lang.Integer.parseInt;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


public final class OutputBuffers public final class OutputBuffers
Expand All @@ -39,14 +40,14 @@ public static OutputBuffers createInitialEmptyOutputBuffers()


private final long version; private final long version;
private final boolean noMoreBufferIds; private final boolean noMoreBufferIds;
private final Map<TaskId, Integer> buffers; private final Map<OutputBufferId, Integer> buffers;


// Visible only for Jackson... Use the "with" methods instead // Visible only for Jackson... Use the "with" methods instead
@JsonCreator @JsonCreator
public OutputBuffers( public OutputBuffers(
@JsonProperty("version") long version, @JsonProperty("version") long version,
@JsonProperty("noMoreBufferIds") boolean noMoreBufferIds, @JsonProperty("noMoreBufferIds") boolean noMoreBufferIds,
@JsonProperty("buffers") Map<TaskId, Integer> buffers) @JsonProperty("buffers") Map<OutputBufferId, Integer> buffers)
{ {
this.version = version; this.version = version;
this.buffers = ImmutableMap.copyOf(requireNonNull(buffers, "buffers is null")); this.buffers = ImmutableMap.copyOf(requireNonNull(buffers, "buffers is null"));
Expand All @@ -66,7 +67,7 @@ public boolean isNoMoreBufferIds()
} }


@JsonProperty @JsonProperty
public Map<TaskId, Integer> getBuffers() public Map<OutputBufferId, Integer> getBuffers()
{ {
return buffers; return buffers;
} }
Expand All @@ -88,7 +89,7 @@ public void checkValidTransition(OutputBuffers newOutputBuffers)
} }


// assure we have not changed the buffer assignments // assure we have not changed the buffer assignments
for (Entry<TaskId, Integer> entry : buffers.entrySet()) { for (Entry<OutputBufferId, Integer> entry : buffers.entrySet()) {
if (!entry.getValue().equals(newOutputBuffers.buffers.get(entry.getKey()))) { if (!entry.getValue().equals(newOutputBuffers.buffers.get(entry.getKey()))) {
throw new IllegalArgumentException("newOutputBuffers has changed the assignment for task " + entry.getKey()); throw new IllegalArgumentException("newOutputBuffers has changed the assignment for task " + entry.getKey());
} }
Expand Down Expand Up @@ -126,7 +127,7 @@ public String toString()
.toString(); .toString();
} }


public OutputBuffers withBuffer(TaskId bufferId, int partition) public OutputBuffers withBuffer(OutputBufferId bufferId, int partition)
{ {
requireNonNull(bufferId, "bufferId is null"); requireNonNull(bufferId, "bufferId is null");


Expand All @@ -141,19 +142,19 @@ public OutputBuffers withBuffer(TaskId bufferId, int partition)
return new OutputBuffers( return new OutputBuffers(
version + 1, version + 1,
false, false,
ImmutableMap.<TaskId, Integer>builder() ImmutableMap.<OutputBufferId, Integer>builder()
.putAll(buffers) .putAll(buffers)
.put(bufferId, partition) .put(bufferId, partition)
.build()); .build());
} }


public OutputBuffers withBuffers(Map<TaskId, Integer> buffers) public OutputBuffers withBuffers(Map<OutputBufferId, Integer> buffers)
{ {
requireNonNull(buffers, "buffers is null"); requireNonNull(buffers, "buffers is null");


Map<TaskId, Integer> newBuffers = new HashMap<>(); Map<OutputBufferId, Integer> newBuffers = new HashMap<>();
for (Entry<TaskId, Integer> entry : buffers.entrySet()) { for (Entry<OutputBufferId, Integer> entry : buffers.entrySet()) {
TaskId bufferId = entry.getKey(); OutputBufferId bufferId = entry.getKey();
int partition = entry.getValue(); int partition = entry.getValue();


// it is ok to have a duplicate buffer declaration but it must have the same page partition // it is ok to have a duplicate buffer declaration but it must have the same page partition
Expand Down Expand Up @@ -188,12 +189,61 @@ public OutputBuffers withNoMoreBufferIds()
return new OutputBuffers(version + 1, true, buffers); return new OutputBuffers(version + 1, true, buffers);
} }


private void checkHasBuffer(TaskId bufferId, int partition) private void checkHasBuffer(OutputBufferId bufferId, int partition)
{ {
checkArgument(Objects.equals(buffers.get(bufferId), partition), checkArgument(Objects.equals(buffers.get(bufferId), partition),
"OutputBuffers already contains task %s, but partition is set to %s not %s", "OutputBuffers already contains task %s, but partition is set to %s not %s",
bufferId, bufferId,
buffers.get(bufferId), buffers.get(bufferId),
partition); partition);
} }

public static class OutputBufferId
{
// this is needed by JAX-RS
public static OutputBufferId fromString(String id)
{
return new OutputBufferId(parseInt(id));
}

private final int id;

@JsonCreator
public OutputBufferId(int id)
{
checkArgument(id >= 0, "id is negative");
this.id = id;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OutputBufferId that = (OutputBufferId) o;
return id == that.id;
}

@JsonValue
public int getId()
{
return id;
}

@Override
public int hashCode()
{
return id;
}

@Override
public String toString()
{
return String.valueOf(id);
}
}
} }
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution; package com.facebook.presto.execution;


import com.facebook.presto.OutputBuffers; import com.facebook.presto.OutputBuffers;
import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -70,7 +71,7 @@ public final class SqlQueryExecution
implements QueryExecution implements QueryExecution
{ {
private static final OutputBuffers ROOT_OUTPUT_BUFFERS = createInitialEmptyOutputBuffers() private static final OutputBuffers ROOT_OUTPUT_BUFFERS = createInitialEmptyOutputBuffers()
.withBuffer(new TaskId("output", "buffer", 0), BROADCAST_PARTITION_ID) .withBuffer(new OutputBufferId(0), BROADCAST_PARTITION_ID)
.withNoMoreBufferIds(); .withNoMoreBufferIds();


private final QueryStateMachine stateMachine; private final QueryStateMachine stateMachine;
Expand Down
Expand Up @@ -313,6 +313,8 @@ public synchronized Set<RemoteTask> scheduleSplits(Node node, int partition, Mul


private synchronized RemoteTask scheduleTask(Node node, int partition, Multimap<PlanNodeId, Split> sourceSplits) private synchronized RemoteTask scheduleTask(Node node, int partition, Multimap<PlanNodeId, Split> sourceSplits)
{ {
// The output buffer depends on the task id starting from 0 and being sequential, since each
// task is assigned a private buffer based on task id.
TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement()); TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());


ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder(); ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
Expand Down Expand Up @@ -363,7 +365,8 @@ public void recordGetSplitTime(long start)


private static Split createRemoteSplitFor(TaskId taskId, URI taskLocation) private static Split createRemoteSplitFor(TaskId taskId, URI taskLocation)
{ {
URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(taskId.toString()).build(); // Fetch the results from the buffer assigned to the task based on id
URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(String.valueOf(taskId.getId())).build();
return new Split("remote", new RemoteTransactionHandle(), new RemoteSplit(splitLocation)); return new Split("remote", new RemoteTransactionHandle(), new RemoteSplit(splitLocation));
} }


Expand Down
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution; package com.facebook.presto.execution;


import com.facebook.presto.OutputBuffers; import com.facebook.presto.OutputBuffers;
import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.TaskSource; import com.facebook.presto.TaskSource;
import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -320,20 +321,20 @@ public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, Lis
return getTaskInfo(); return getTaskInfo();
} }


public CompletableFuture<BufferResult> getTaskResults(TaskId outputName, long startingSequenceId, DataSize maxSize) public CompletableFuture<BufferResult> getTaskResults(OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
{ {
requireNonNull(outputName, "outputName is null"); requireNonNull(bufferId, "bufferId is null");
checkArgument(maxSize.toBytes() > 0, "maxSize must be at least 1 byte"); checkArgument(maxSize.toBytes() > 0, "maxSize must be at least 1 byte");


return outputBuffer.get(outputName, startingSequenceId, maxSize); return outputBuffer.get(bufferId, startingSequenceId, maxSize);
} }


public TaskInfo abortTaskResults(TaskId outputId) public TaskInfo abortTaskResults(OutputBufferId bufferId)
{ {
requireNonNull(outputId, "outputId is null"); requireNonNull(bufferId, "bufferId is null");


log.debug("Aborting task %s output %s", taskId, outputId); log.debug("Aborting task %s output %s", taskId, bufferId);
outputBuffer.abort(outputId); outputBuffer.abort(bufferId);


return getTaskInfo(); return getTaskInfo();
} }
Expand Down
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution; package com.facebook.presto.execution;


import com.facebook.presto.OutputBuffers; import com.facebook.presto.OutputBuffers;
import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.TaskSource; import com.facebook.presto.TaskSource;
import com.facebook.presto.event.query.QueryMonitor; import com.facebook.presto.event.query.QueryMonitor;
Expand Down Expand Up @@ -321,23 +322,23 @@ public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment
} }


@Override @Override
public CompletableFuture<BufferResult> getTaskResults(TaskId taskId, TaskId outputName, long startingSequenceId, DataSize maxSize) public CompletableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
{ {
requireNonNull(taskId, "taskId is null"); requireNonNull(taskId, "taskId is null");
requireNonNull(outputName, "outputName is null"); requireNonNull(bufferId, "bufferId is null");
Preconditions.checkArgument(startingSequenceId >= 0, "startingSequenceId is negative"); Preconditions.checkArgument(startingSequenceId >= 0, "startingSequenceId is negative");
requireNonNull(maxSize, "maxSize is null"); requireNonNull(maxSize, "maxSize is null");


return tasks.getUnchecked(taskId).getTaskResults(outputName, startingSequenceId, maxSize); return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize);
} }


@Override @Override
public TaskInfo abortTaskResults(TaskId taskId, TaskId outputId) public TaskInfo abortTaskResults(TaskId taskId, OutputBufferId bufferId)
{ {
requireNonNull(taskId, "taskId is null"); requireNonNull(taskId, "taskId is null");
requireNonNull(outputId, "outputId is null"); requireNonNull(bufferId, "bufferId is null");


return tasks.getUnchecked(taskId).abortTaskResults(outputId); return tasks.getUnchecked(taskId).abortTaskResults(bufferId);
} }


@Override @Override
Expand Down
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution; package com.facebook.presto.execution;


import com.facebook.presto.OutputBuffers; import com.facebook.presto.OutputBuffers;
import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.TaskSource; import com.facebook.presto.TaskSource;
import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -104,7 +105,7 @@ public interface TaskManager
* NOTE: this design assumes that only tasks and buffers that will * NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried. * eventually exist are queried.
*/ */
CompletableFuture<BufferResult> getTaskResults(TaskId taskId, TaskId outputName, long startingSequenceId, DataSize maxSize); CompletableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize);


/** /**
* Aborts a result buffer for a task. If the task or buffer has not been * Aborts a result buffer for a task. If the task or buffer has not been
Expand All @@ -114,7 +115,7 @@ public interface TaskManager
* NOTE: this design assumes that only tasks and buffers that will * NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried. * eventually exist are queried.
*/ */
TaskInfo abortTaskResults(TaskId taskId, TaskId outputId); TaskInfo abortTaskResults(TaskId taskId, OutputBufferId bufferId);


/** /**
* Adds a state change listener to the specified task. * Adds a state change listener to the specified task.
Expand Down
Expand Up @@ -13,7 +13,7 @@
*/ */
package com.facebook.presto.execution.buffer; package com.facebook.presto.execution.buffer;


import com.facebook.presto.execution.TaskId; import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;


Expand All @@ -25,7 +25,7 @@


public class BufferInfo public class BufferInfo
{ {
private final TaskId bufferId; private final OutputBufferId bufferId;
private final boolean finished; private final boolean finished;
private final int bufferedPages; private final int bufferedPages;


Expand All @@ -34,7 +34,7 @@ public class BufferInfo


@JsonCreator @JsonCreator
public BufferInfo( public BufferInfo(
@JsonProperty("bufferId") TaskId bufferId, @JsonProperty("bufferId") OutputBufferId bufferId,
@JsonProperty("finished") boolean finished, @JsonProperty("finished") boolean finished,
@JsonProperty("bufferedPages") int bufferedPages, @JsonProperty("bufferedPages") int bufferedPages,
@JsonProperty("pagesSent") long pagesSent, @JsonProperty("pagesSent") long pagesSent,
Expand All @@ -51,7 +51,7 @@ public BufferInfo(
} }


@JsonProperty @JsonProperty
public TaskId getBufferId() public OutputBufferId getBufferId()
{ {
return bufferId; return bufferId;
} }
Expand Down
Expand Up @@ -14,8 +14,8 @@
package com.facebook.presto.execution.buffer; package com.facebook.presto.execution.buffer;


import com.facebook.presto.OutputBuffers; import com.facebook.presto.OutputBuffers;
import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;
Expand Down Expand Up @@ -58,12 +58,12 @@ public interface OutputBuffer
* If the buffer result is marked as complete, the client must call abort to acknowledge * If the buffer result is marked as complete, the client must call abort to acknowledge
* receipt of the final state. * receipt of the final state.
*/ */
CompletableFuture<BufferResult> get(TaskId outputId, long token, DataSize maxSize); CompletableFuture<BufferResult> get(OutputBufferId bufferId, long token, DataSize maxSize);


/** /**
* Closes the specified output buffer. * Closes the specified output buffer.
*/ */
void abort(TaskId outputId); void abort(OutputBufferId bufferId);


/** /**
* Adds a page to an unpartitioned buffer. If no-more-pages has been set, the enqueue * Adds a page to an unpartitioned buffer. If no-more-pages has been set, the enqueue
Expand Down

0 comments on commit 26da1f0

Please sign in to comment.