Skip to content

Commit

Permalink
Fix lost memory pool assignment requests
Browse files Browse the repository at this point in the history
If the coordinator restarts, workers ignore pool assignment requests
until the new sequence number exceeds the last one they had already seen.
This commit fixes that by including the coordinator ID in the request.
It also disables the cluster memory manager on the workers, since they
would otherwise race with the coordinator to increment the pool
assignment sequence number.
  • Loading branch information
cberner committed May 19, 2015
1 parent a35dedf commit 179a216
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/release/release-0.104.rst
Expand Up @@ -9,6 +9,7 @@ General Changes
* Fix CLI hang when server becomes unreachable during a query.
* Add :func:`covar_pop`, :func:`covar_samp`, :func:`corr`, :func:`regr_slope`,
and :func:`regr_intercept` functions.
* Fix potential deadlock in cluster memory manager.

Hive Changes
------------
Expand Down
Expand Up @@ -46,8 +46,7 @@ public class QueryIdGenerator
private static final long BASE_SYSTEM_TIME_MILLIS = System.currentTimeMillis();
private static final long BASE_NANO_TIME = System.nanoTime();

@VisibleForTesting
protected final String coordinatorId;
private final String coordinatorId;
@GuardedBy("this")
private long lastTimeInDays;
@GuardedBy("this")
Expand All @@ -66,6 +65,11 @@ public QueryIdGenerator()
this.coordinatorId = coordinatorId.toString();
}

/**
 * Returns the coordinator ID held by this generator.
 *
 * @return the value of the final {@code coordinatorId} field, which is
 *         assigned in the constructor
 */
public String getCoordinatorId()
{
return coordinatorId;
}

/**
* Generate next queryId using the following format:
* <tt>YYYYMMdd_HHmmss_index_coordId</tt>
Expand Down
Expand Up @@ -84,6 +84,8 @@ public class SqlTaskManager

@GuardedBy("this")
private long currentMemoryPoolAssignmentVersion;
@GuardedBy("this")
private String coordinatorId;

@Inject
public SqlTaskManager(
Expand Down Expand Up @@ -151,10 +153,14 @@ public SqlTask load(TaskId taskId)
@Override
public synchronized void updateMemoryPoolAssignments(MemoryPoolAssignmentsRequest assignments)
{
if (assignments.getVersion() <= currentMemoryPoolAssignmentVersion) {
if (coordinatorId != null && coordinatorId.equals(assignments.getCoordinatorId()) && assignments.getVersion() <= currentMemoryPoolAssignmentVersion) {
return;
}
currentMemoryPoolAssignmentVersion = assignments.getVersion();
if (coordinatorId != null && !coordinatorId.equals(assignments.getCoordinatorId())) {
log.warn("Switching coordinator affinity from " + coordinatorId + " to " + assignments.getCoordinatorId());
}
coordinatorId = assignments.getCoordinatorId();

for (MemoryPoolAssignment assignment : assignments.getAssignments()) {
queryContexts.getUnchecked(assignment.getQueryId()).setMemoryPool(localMemoryManager.getPool(assignment.getPoolId()));
Expand Down
Expand Up @@ -16,6 +16,8 @@
import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -60,6 +62,7 @@ public class ClusterMemoryManager
private final JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec;
private final DataSize maxQueryMemory;
private final boolean enabled;
private final String coordinatorId;
private final AtomicLong memoryPoolAssignmentsVersion = new AtomicLong();
private final AtomicLong clusterMemoryUsageBytes = new AtomicLong();
private final AtomicLong clusterMemoryBytes = new AtomicLong();
Expand All @@ -76,6 +79,8 @@ public ClusterMemoryManager(
MBeanExporter exporter,
JsonCodec<MemoryInfo> memoryInfoCodec,
JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec,
QueryIdGenerator queryIdGenerator,
ServerConfig serverConfig,
MemoryManagerConfig config)
{
requireNonNull(config, "config is null");
Expand All @@ -86,7 +91,8 @@ public ClusterMemoryManager(
this.memoryInfoCodec = requireNonNull(memoryInfoCodec, "memoryInfoCodec is null");
this.assignmentsRequestJsonCodec = requireNonNull(assignmentsRequestJsonCodec, "assignmentsRequestJsonCodec is null");
this.maxQueryMemory = config.getMaxQueryMemory();
this.enabled = config.isClusterMemoryManagerEnabled();
this.coordinatorId = queryIdGenerator.getCoordinatorId();
this.enabled = config.isClusterMemoryManagerEnabled() && serverConfig.isCoordinator();
}

public void process(Iterable<QueryExecution> queries)
Expand Down Expand Up @@ -151,7 +157,7 @@ private MemoryPoolAssignmentsRequest updateAssignments(Iterable<QueryExecution>
for (QueryExecution queryExecution : queries) {
assignments.add(new MemoryPoolAssignment(queryExecution.getQueryId(), queryExecution.getMemoryPool().getId()));
}
return new MemoryPoolAssignmentsRequest(version, assignments.build());
return new MemoryPoolAssignmentsRequest(coordinatorId, version, assignments.build());
}

private boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries)
Expand Down
Expand Up @@ -24,16 +24,24 @@

public class MemoryPoolAssignmentsRequest
{
private final String coordinatorId;
private final long version;
private final List<MemoryPoolAssignment> assignments;

@JsonCreator
public MemoryPoolAssignmentsRequest(@JsonProperty("version") long version, @JsonProperty("assignments") List<MemoryPoolAssignment> assignments)
public MemoryPoolAssignmentsRequest(@JsonProperty("coordinatorId") String coordinatorId, @JsonProperty("version") long version, @JsonProperty("assignments") List<MemoryPoolAssignment> assignments)
{
this.coordinatorId = requireNonNull(coordinatorId, "coordinatorId is null");
this.version = version;
this.assignments = ImmutableList.copyOf(requireNonNull(assignments, "assignments is null"));
}

/**
 * Returns the coordinator ID carried by this request.
 *
 * @return the coordinator ID; never {@code null}, because the constructor
 *         rejects a null value
 */
@JsonProperty
public String getCoordinatorId()
{
return coordinatorId;
}

@JsonProperty
public long getVersion()
{
Expand Down
Expand Up @@ -62,11 +62,6 @@ private static class TestIdGenerator
{
private long now;

/**
 * Returns the {@code coordinatorId} visible to this test generator.
 * NOTE(review): the field is not declared in this class as shown; presumably it
 * is inherited from the generator under test -- confirm against the full file.
 */
public String getCoordinatorId()
{
return coordinatorId;
}

public void setNow(long now)
{
this.now = now;
Expand Down

0 comments on commit 179a216

Please sign in to comment.