Skip to content

Commit

Permalink
Merge pull request Azure#819 from jianghaolu/async
Browse files Browse the repository at this point in the history
Basic prototyping of async in fluent
  • Loading branch information
Martin Sawicki committed Jun 17, 2016
2 parents 999fb18 + 46a53cd commit 57288f4
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;

/**
* Represents a group of related tasks.
* <p>
Expand Down Expand Up @@ -39,6 +42,11 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
*/
void merge(TaskGroup<T, U> parentTaskGroup);

/**
* Prepare the graph for execution.
*/
void prepare();

/**
* Executes the tasks in the group.
* <p>
Expand All @@ -48,6 +56,14 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
*/
void execute() throws Exception;

/**
* Executes the tasks in the group asynchronously.
*
* @param callback the callback to call on failure or success
* @return the handle to the REST call
*/
ServiceCall executeAsync(ServiceCallback<Void> callback);

/**
* Gets the result of execution of a task in the group.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,30 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;

/**
* The base implementation of TaskGroup interface.
*
* @param <T> the result type of the tasks in the group
* @param <U> type representing task in the group
*/
public abstract class TaskGroupBase<T, U extends TaskItem<T>>
implements TaskGroup<T, U> {
private DAGraph<U, DAGNode<U>> dag;
public abstract class TaskGroupBase<T>
implements TaskGroup<T, TaskItem<T>> {
private DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag;

/**
* Creates TaskGroupBase.
*
* @param rootTaskItemId the id of the root task in this task group
* @param rootTaskItem the root task
*/
public TaskGroupBase(String rootTaskItemId, U rootTaskItem) {
public TaskGroupBase(String rootTaskItemId, TaskItem<T> rootTaskItem) {
this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem));
}

@Override
public DAGraph<U, DAGNode<U>> dag() {
public DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag() {
return dag;
}

Expand All @@ -38,27 +40,51 @@ public boolean isRoot() {
}

@Override
public void merge(TaskGroup<T, U> parentTaskGroup) {
public void merge(TaskGroup<T, TaskItem<T>> parentTaskGroup) {
dag.merge(parentTaskGroup.dag());
}

@Override
public void execute() throws Exception {
public void prepare() {
if (isRoot()) {
dag.prepare();
DAGNode<U> nextNode = dag.getNext();
while (nextNode != null) {
if (dag.isRootNode(nextNode)) {
executeRootTask(nextNode.data());
} else {
// TaskGroupBase::execute will be called both in update and create
// scenarios, so run the task only if it not not executed already.
if (nextNode.data().result() == null) {
nextNode.data().execute();
}
}
dag.reportedCompleted(nextNode);
nextNode = dag.getNext();
}
}

@Override
public void execute() throws Exception {
DAGNode<TaskItem<T>> nextNode = dag.getNext();
if (nextNode == null) {
return;
}

if (dag.isRootNode(nextNode)) {
executeRootTask(nextNode.data());
} else {
// TaskGroupBase::execute will be called both in update and create
// scenarios, so run the task only if it not not executed already.
if (nextNode.data().result() == null) {
nextNode.data().execute(this, nextNode);
}
}
}

@Override
public ServiceCall executeAsync(final ServiceCallback<Void> callback) {
final DAGNode<TaskItem<T>> nextNode = dag.getNext();
if (nextNode == null) {
return null;
}

if (dag.isRootNode(nextNode)) {
return executeRootTaskAsync(nextNode.data(), callback);
} else {
// TaskGroupBase::execute will be called both in update and create
// scenarios, so run the task only if it not not executed already.
if (nextNode.data().result() == null) {
return nextNode.data().executeAsync(this, nextNode, callback);
} else {
return null;
}
}
}
Expand All @@ -78,5 +104,18 @@ public T taskResult(String taskId) {
* @param task the root task in this group
* @throws Exception the exception
*/
public abstract void executeRootTask(U task) throws Exception;
public abstract void executeRootTask(TaskItem<T> task) throws Exception;

/**
* executes the root task in this group asynchronously.
* <p>
* this method will be invoked when all the task dependencies of the root task are finished
* executing, at this point root task can be executed by consuming the result of tasks it
* depends on.
*
* @param task the root task in this group
* @param callback the callback when the task fails or succeeds
* @return the handle to the REST call
*/
public abstract ServiceCall executeRootTaskAsync(TaskItem<T> task, ServiceCallback<Void> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;

/**
* Type representing a task in a task group {@link TaskGroup}.
*
Expand All @@ -22,7 +25,22 @@ public interface TaskItem<U> {
* Executes the task.
* <p>
* once executed the result will be available through result getter
*
* @param taskGroup the task group dispatching tasks
* @param node the node the task item is associated with
* @throws Exception exception
*/
void execute() throws Exception;
void execute(TaskGroup<U, TaskItem<U>> taskGroup, DAGNode<TaskItem<U>> node) throws Exception;

/**
* Executes the task asynchronously.
* <p>
* once executed the result will be available through result getter
* @param taskGroup the task group dispatching tasks
* @param node the node the task item is associated with
* @param callback callback to call on success or failure
* @return the handle of the REST call
*/
ServiceCall executeAsync(TaskGroup<U, TaskItem<U>> taskGroup, DAGNode<TaskItem<U>> node, ServiceCallback<Void> callback);
}

0 comments on commit 57288f4

Please sign in to comment.