Skip to content

Commit

Permalink
Remove redundant sync & callback based applys
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 30, 2016
1 parent 7fb2d38 commit e132607
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
*/
void prepare();

/**
* Executes the tasks in the group.
* <p>
* the order of execution of tasks ensure that a task gets selected for execution only after
* the execution of all the tasks it depends on
* @throws Exception the exception
*/
void execute() throws Exception;

/**
* Executes the tasks in the group asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,8 @@ public void prepare() {
}
}

@Override
public void execute() throws Exception {
DAGNode<U> nextNode = dag.getNext();
while (nextNode != null) {
nextNode.data().execute();
this.dag().reportedCompleted(nextNode);
nextNode = dag.getNext();
}
}

@Override
public Observable<T> executeAsync() {
return executeReadyTasksAsync().last();
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
}

/**
* Executes all runnable tasks, a task is runnable when all the tasks its depends
* on are finished running.
*/
private Observable<T> executeReadyTasksAsync() {
DAGNode<U> nextNode = dag.getNext();
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
Expand All @@ -95,12 +72,17 @@ public Observable<T> call(T t) {
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
return executeReadyTasksAsync();
return executeAsync();
}
}
}));
nextNode = dag.getNext();
}
return Observable.merge(observables);
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ public interface TaskItem<U> {
*/
U result();

/**
* Executes the task.
* <p>
* once executed the result will be available through result getter
*
* @throws Exception exception
*/
void execute() throws Exception;

/**
* Executes the task asynchronously.
* <p>
Expand Down

0 comments on commit e132607

Please sign in to comment.