Skip to content

Commit

Permalink
Add deadlock detection to queue manager
Browse files Browse the repository at this point in the history
  • Loading branch information
haozhun committed Mar 23, 2015
1 parent 465a7e0 commit d5c25d9
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -627,6 +627,12 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
<version>0.9.0</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
5 changes: 5 additions & 0 deletions presto-main/pom.xml
Expand Up @@ -243,6 +243,11 @@
<artifactId>asm-all</artifactId>
</dependency>

<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.testng</groupId>
Expand Down
Expand Up @@ -50,6 +50,11 @@ public String getExpandedTemplate(Session session)
return SOURCE_PATTERN.matcher(expanded).replaceAll(nullToEmpty(session.getSource()));
}

String getTemplate()
{
return template;
}

public int getMaxConcurrent()
{
return maxConcurrent;
Expand Down
Expand Up @@ -85,4 +85,9 @@ public List<QueryQueueDefinition> match(Session session)

return queues;
}

List<QueryQueueDefinition> getQueues()
{
return queues;
}
}
Expand Up @@ -19,9 +19,16 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.jgrapht.DirectedGraph;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.FloydWarshallShortestPaths;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DirectedPseudograph;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.ObjectNames;

Expand All @@ -45,6 +52,7 @@
import static com.facebook.presto.spi.StandardErrorCode.USER_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

@ThreadSafe
public class SqlQueryQueueManager
Expand Down Expand Up @@ -86,6 +94,61 @@ public SqlQueryQueueManager(QueryManagerConfig config, ObjectMapper mapper, MBea
}
}
this.rules = rules.build();
checkIsDAG(this.rules);
}

private void checkIsDAG(List<QueryQueueRule> rules)
{
DirectedPseudograph<String, DefaultEdge> graph = new DirectedPseudograph<String, DefaultEdge>(DefaultEdge.class);
for (QueryQueueRule rule : rules) {
String lastQueueName = null;
for (QueryQueueDefinition queue : rule.getQueues()) {
String currentQueueName = queue.getTemplate();
graph.addVertex(currentQueueName);
if (lastQueueName != null) {
graph.addEdge(lastQueueName, currentQueueName);
}
lastQueueName = currentQueueName;
}
}

List<String> shortestCycle = shortestCycle(graph);

if (shortestCycle != null) {
String s = Joiner.on(", ").join(shortestCycle);
throw new IllegalArgumentException(format("Queues must not contain a cycle. The shortest cycle found is [%s]", s));
}
}

private static List<String> shortestCycle(DirectedGraph<String, DefaultEdge> graph)
{
FloydWarshallShortestPaths<String, DefaultEdge> floyd = new FloydWarshallShortestPaths<>(graph);
int minDistance = Integer.MAX_VALUE;
String minSource = null;
String minDestination = null;
for (DefaultEdge edge : graph.edgeSet()) {
String src = graph.getEdgeSource(edge);
String dst = graph.getEdgeTarget(edge);
int dist = (int) Math.round(floyd.shortestDistance(dst, src)); // from dst to src
if (dist < 0) {
continue;
}
if (dist < minDistance) {
minDistance = dist;
minSource = src;
minDestination = dst;
}
}
if (minSource == null) {
return null;
}
GraphPath<String, DefaultEdge> shortestPath = floyd.getShortestPath(minDestination, minSource);
List<String> pathVertexList = Graphs.getPathVertexList(shortestPath);
// note: pathVertexList will be [a, a] instead of [a] when the shortest path is a loop edge
if (shortestPath.getStartVertex() != shortestPath.getEndVertex()) {
pathVertexList.add(pathVertexList.get(0));
}
return pathVertexList;
}

@Override
Expand Down
Expand Up @@ -18,15 +18,38 @@
import org.weakref.jmx.MBeanExporter;

import java.lang.management.ManagementFactory;
import java.util.regex.Pattern;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestSqlQueryQueueManager
{
@Test
public void testJsonParsing()
{
String path = this.getClass().getClassLoader().getResource("queue_config.json").getPath();
parse("queue_config.json");
assertFails("queue_config_bad_cycle.json", "Queues must not contain a cycle. The shortest cycle found is \\[q(.), q., q., q., q\\1\\]");
assertFails("queue_config_bad_selfcycle.json", "Queues must not contain a cycle. The shortest cycle found is \\[q1, q1\\]");
}

private void parse(String fileName)
{
String path = this.getClass().getClassLoader().getResource(fileName).getPath();
QueryManagerConfig config = new QueryManagerConfig();
config.setQueueConfigFile(path);
new SqlQueryQueueManager(config, new ObjectMapperProvider().get(), new MBeanExporter(ManagementFactory.getPlatformMBeanServer()));
}

private void assertFails(String fileName, String expectedPattern)
{
try {
parse(fileName);
fail("Expected to throw an IllegalArgumentException with message " + expectedPattern);
}
catch (IllegalArgumentException e) {
assertTrue(Pattern.matches(expectedPattern, e.getMessage()),
"\nExpected (re) :" + expectedPattern + "\nActual :" + e.getMessage());
}
}
}
47 changes: 47 additions & 0 deletions presto-main/src/test/resources/queue_config_bad_cycle.json
@@ -0,0 +1,47 @@
{
"queues": {
"q0": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q1": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q2": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q3": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q4": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q5": {
"maxConcurrent": 5,
"maxQueued": 20
}
},
"rules": [
{
"user": "bob",
"queues": [
"q0",
"q1",
"q2",
"q3",
"q4"
]
},
{
"queues": [
"q4",
"q5",
"q2"
]
}
]
}
16 changes: 16 additions & 0 deletions presto-main/src/test/resources/queue_config_bad_selfcycle.json
@@ -0,0 +1,16 @@
{
"queues": {
"q1": {
"maxConcurrent": 5,
"maxQueued": 20
}
},
"rules": [
{
"queues": [
"q1",
"q1"
]
}
]
}

0 comments on commit d5c25d9

Please sign in to comment.