Skip to content

Commit

Permalink
Making DAG to work correctly to handle more cases
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy committed Jun 22, 2016
1 parent 45c3d88 commit ac633da
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.microsoft.azure;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -18,6 +19,7 @@
public class DAGNode<T> extends Node<T> {
private List<String> dependentKeys;
private int toBeResolved;
private boolean isPreparer;

/**
* Creates a DAG node.
Expand All @@ -34,7 +36,7 @@ public DAGNode(String key, T data) {
* @return a list of keys of nodes in {@link DAGraph} those are dependents on this node
*/
List<String> dependentKeys() {
return this.dependentKeys;
return Collections.unmodifiableList(this.dependentKeys);
}

/**
Expand Down Expand Up @@ -70,10 +72,27 @@ public boolean hasDependencies() {
}

/**
* prepare the node for traversal.
* Mark or un-mark this node as being preparer.
*
* @param isPreparer <tt>true</tt> if this node needs to be marked as preparer, <tt>false</tt> otherwise.
*/
public void setPreparer(boolean isPreparer) {
this.isPreparer = isPreparer;
}

/**
* @return <tt>true</tt> if this node is marked as preparer
*/
public boolean isPreparer() {
return isPreparer;
}

/**
* initialize the node so that traversal can be performed on the parent DAG.
*/
public void prepare() {
public void initialize() {
this.toBeResolved = this.dependencyKeys().size();
this.dependentKeys.clear();
}

/**
Expand Down
40 changes: 26 additions & 14 deletions azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DAGraph<T, U extends DAGNode<T>> extends Graph<T, U> {
public DAGraph(U rootNode) {
this.rootNode = rootNode;
this.queue = new ArrayDeque<>();
this.rootNode.setPreparer(true);
this.addNode(rootNode);
}

Expand All @@ -52,6 +53,14 @@ public boolean isRootNode(U node) {
return this.rootNode == node;
}

/**
* @return <tt>true</tt> if this dag is the preparer responsible for
* preparing the DAG for traversal.
*/
public boolean isPreparer() {
return this.rootNode.isPreparer();
}

/**
* Merge this DAG with another DAG.
* <p>
Expand All @@ -63,7 +72,6 @@ public boolean isRootNode(U node) {
public void merge(DAGraph<T, U> parent) {
this.hasParent = true;
parent.rootNode.addDependency(this.rootNode.key());
this.rootNode.addDependent(parent.rootNode.key());
for (Map.Entry<String, U> entry: graph.entrySet()) {
String key = entry.getKey();
if (!parent.graph.containsKey(key)) {
Expand All @@ -77,13 +85,15 @@ public void merge(DAGraph<T, U> parent) {
* in the DAG with no dependencies.
*/
public void prepare() {
for (Map.Entry<String, U> entry: graph.entrySet()) {
entry.getValue().prepare();
}

initializeQueue();
if (queue.isEmpty()) {
throw new RuntimeException("Found circular dependency");
if (isPreparer()) {
for (Map.Entry<String, U> entry : graph.entrySet()) {
// Prepare each node for traversal
entry.getValue().initialize();
// Mark other sub-DAGs are non-preparer
entry.getValue().setPreparer(false);
}
initializeDependentKeys();
initializeQueue();
}
}

Expand Down Expand Up @@ -115,6 +125,7 @@ public T getNodeData(String key) {
* @param completed the node ready to be consumed
*/
public void reportedCompleted(U completed) {
completed.setPreparer(true);
String dependency = completed.key();
for (String dependentKey : graph.get(dependency).dependentKeys()) {
DAGNode<T> dependent = graph.get(dependentKey);
Expand All @@ -126,27 +137,25 @@ public void reportedCompleted(U completed) {
}

/**
* populate dependents of all nodes.
* Initializes dependents of all nodes.
* <p>
* the DAG will be explored in DFS order and all node's dependents will be identified,
* this prepares the DAG for traversal using getNext method, each call to getNext returns next node
* in the DAG with no dependencies.
*/
public void populateDependentKeys() {
this.queue.clear();
private void initializeDependentKeys() {
visit(new Visitor<U>() {
// This 'visit' will be called only once per each node.
@Override
public void visit(U node) {
if (node.dependencyKeys().isEmpty()) {
queue.add(node.key());
return;
}

String dependentKey = node.key();
for (String dependencyKey : node.dependencyKeys()) {
graph.get(dependencyKey)
.dependentKeys()
.add(dependentKey);
.addDependent(dependentKey);
}
}
});
Expand All @@ -163,5 +172,8 @@ private void initializeQueue() {
this.queue.add(entry.getKey());
}
}
if (queue.isEmpty()) {
throw new RuntimeException("Found circular dependency");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.microsoft.azure;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -57,7 +58,7 @@ public boolean hasChildren() {
* @return children (neighbours) of this node
*/
public List<String> children() {
return this.children;
return Collections.unmodifiableList(this.children);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
*/
DAGraph<U, DAGNode<U>> dag();

/**
* @return <tt>true</tt> if this is a root (parent) task group composing other task groups.
*/
boolean isRoot();

/**
* Merges this task group with parent task group.
* <p>
Expand All @@ -42,6 +37,12 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
*/
void merge(TaskGroup<T, U> parentTaskGroup);

/**
* @return <tt>true</tt> if the group is responsible for preparing execution of original task in
* this group and all tasks belong other task group it composes.
*/
boolean isPreparer();

/**
* Prepare the graph for execution.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag() {
}

@Override
public boolean isRoot() {
return !dag.hasParent();
public boolean isPreparer() {
return dag.isPreparer();
}

@Override
Expand All @@ -46,7 +46,7 @@ public void merge(TaskGroup<T, TaskItem<T>> parentTaskGroup) {

@Override
public void prepare() {
if (isRoot()) {
if (isPreparer()) {
dag.prepare();
}
}
Expand All @@ -61,11 +61,7 @@ public void execute() throws Exception {
if (dag.isRootNode(nextNode)) {
executeRootTask(nextNode.data());
} else {
// TaskGroupBase::execute will be called both in update and create
// scenarios, so run the task only if it not not executed already.
if (nextNode.data().result() == null) {
nextNode.data().execute(this, nextNode);
}
nextNode.data().execute(this, nextNode);
}
}

Expand All @@ -79,13 +75,7 @@ public ServiceCall executeAsync(final ServiceCallback<Void> callback) {
if (dag.isRootNode(nextNode)) {
return executeRootTaskAsync(nextNode.data(), callback);
} else {
// TaskGroupBase::execute will be called both in update and create
// scenarios, so run the task only if it not not executed already.
if (nextNode.data().result() == null) {
return nextNode.data().executeAsync(this, nextNode, callback);
} else {
return null;
}
return nextNode.data().executeAsync(this, nextNode, callback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testDAGraphGetNext() {
dag.addNode(nodeH);
dag.addNode(nodeI);

dag.populateDependentKeys();
dag.prepare();
DAGNode<String> nextNode = dag.getNext();
int i = 0;
while (nextNode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testDAGraphGetNext() {
dag.addNode(nodeH);
dag.addNode(nodeI);

dag.populateDependentKeys();
dag.prepare();
DAGNode<String> nextNode = dag.getNext();
int i = 0;
while (nextNode != null) {
Expand Down

0 comments on commit ac633da

Please sign in to comment.