Skip to content

Commit

Permalink
Merge commit '9934126e3d2115bedd890f8226cdf1c2c6d39a48' into unwrap
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Sep 6, 2016
2 parents ac146e8 + 670b55e commit c78b9c5
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ public PagedList() {
*/
public PagedList(Page<E> page) {
this();
items.addAll(page.getItems());
List<E> retrievedItems = page.getItems();
if (retrievedItems != null && retrievedItems.size() != 0) {
items.addAll(retrievedItems);
}
nextPageLink = page.getNextPageLink();
currentPage = page;
}
Expand Down Expand Up @@ -138,14 +141,17 @@ public boolean hasNext() {
public E next() {
if (!itemsListItr.hasNext()) {
if (!hasNextPage()) {
throw new NoSuchElementException();
throw new NoSuchElementException();
} else {
int size = items.size();
loadNextPage();
itemsListItr = items.listIterator(size);
}
}
return itemsListItr.next();
if (itemsListItr.hasNext()) {
return itemsListItr.next();
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
*/
void prepare();

/**
* Executes the tasks in the group.
* <p>
* the order of execution of tasks ensure that a task gets selected for execution only after
* the execution of all the tasks it depends on
* @throws Exception the exception
*/
void execute() throws Exception;

/**
* Executes the tasks in the group asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,8 @@ public void prepare() {
}
}

@Override
public void execute() throws Exception {
DAGNode<U> nextNode = dag.getNext();
while (nextNode != null) {
nextNode.data().execute();
this.dag().reportedCompleted(nextNode);
nextNode = dag.getNext();
}
}

@Override
public Observable<T> executeAsync() {
return executeReadyTasksAsync().last();
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
}

/**
* Executes all runnable tasks, a task is runnable when all the tasks its depends
* on are finished running.
*/
private Observable<T> executeReadyTasksAsync() {
DAGNode<U> nextNode = dag.getNext();
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
Expand All @@ -95,12 +72,17 @@ public Observable<T> call(T t) {
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
return executeReadyTasksAsync();
return executeAsync();
}
}
}));
nextNode = dag.getNext();
}
return Observable.merge(observables);
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ public interface TaskItem<U> {
*/
U result();

/**
* Executes the task.
* <p>
* once executed the result will be available through result getter
*
* @throws Exception exception
*/
void execute() throws Exception;

/**
* Executes the task asynchronously.
* <p>
Expand Down

0 comments on commit c78b9c5

Please sign in to comment.