Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
v5.1.2
------

v5.1.1
------
* Add "transformWith" to Task API which performs transformation by executing a task

v5.1.0
------
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=5.1.0
version=5.1.1
group=com.linkedin.parseq
org.gradle.parallel=true
76 changes: 76 additions & 0 deletions subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,82 @@ default <R> Task<R> transform(final Function1<Try<T>, Try<R>> func) {
return transform("transform: " + _taskDescriptor.getDescription(func.getClass().getName()), func);
}

/**
* Create a new task that will transform the result of this task.
* Returned task will complete with value calculated by a task returned by the function.
*
* This is similar to {@link #transform(String, Function1)} except the transformation is done by executing the task returned
* by the function.
Comment thread
junchuanwang marked this conversation as resolved.
*
* <blockquote><pre>
* boolean writeToDB(String content) {...}
*
* Task{@code <String>} pictureBase64= ...
*
* // this task will complete with either complete successfully
* // with uploadResult being true or false, or fail with MyLibException
* Task{@code <Boolean>} uploadResult = pictureBase64.transformWith("transformUsingATask", t {@code ->} {
* if (!t.isFailed()) {
* return Task.blocking(() -> writeToDB(t.get()), executor));
* }
* return Task.failure(new MyLibException(t.getError());
* });
* <img src="doc-files/transformWith-1.png" height="90" width="296"/>
*
* @param desc description
* @param func function to be applied to the result of this task which returns new task
* to be executed
* @param <R> value type of the returned task returned by function <code>func<</code>
* @return a new task which will apply given function on result of either successful and failed completion of this task
* to get instance of a task which will be executed next
*/
default <R> Task<R> transformWith(final String desc, final Function1<Try<T>, Task<R>> func) {
ArgumentUtil.requireNotNull(func, "function");
final Task<T> that = this;
Task<R> transformWithTask = async(desc, context -> {
final SettablePromise<R> result = Promises.settable();
final Task<R> transform = async("transform", ctx -> {
final SettablePromise<R> transformResult = Promises.settable();
if (that.isFailed() && (Exceptions.isCancellation(that.getError()))) {
//cancellations will not be propagated as other errors to the function to get the task to execute.
transformResult.fail(that.getError());
}
else {
final Try<T> tryT = Promises.toTry(that);
try {
Task<R> r = func.apply(tryT);
if (r == null) {
throw new RuntimeException(desc + " returned null");
}
Promises.propagateResult(r, transformResult);
ctx.run(r);
} catch (Throwable t) {
transformResult.fail(t);
}
}
return transformResult;
});
transform.getShallowTraceBuilder().setSystemHidden(true);
transform.getShallowTraceBuilder().setTaskType(TaskType.TRANSFORM.getName());
Promises.propagateResult(transform, result);
context.after(that).run(transform);
context.run(that);
return result;
});
transformWithTask.getShallowTraceBuilder().setTaskType(TaskType.WITH_TRANSFORM.getName());
return transformWithTask;

}

/**
* Equivalent to {@code transformWith("transformWith", func)}.
* @see #transformWith(String, Function1)
*/
default <R> Task<R> transformWith(final Function1<Try<T>, Task<R>> func) {
return transformWith("transform: " + _taskDescriptor.getDescription(func.getClass().getName()), func);
}


/**
* Creates a new task that will handle failure of this task.
* Early completion due to cancellation is not considered to be a failure so it will not be recovered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public enum TaskType {
WITH_TIMEOUT ("withTimeout"),
RECOVER ("recover"),
WITH_RECOVER ("withRecover"),
TRANSFORM ("transform"),
WITH_TRANSFORM ("withTransform"),
WITH_DELAY ("withDelay");

private final String _name;
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,62 @@ public void testTransformFailureToFailure() {
assertSame(transformed.getError(), failureReason);
}

@Test
public void testTransformWith_SuccessToFailure() {
String msg = "transform failed";
Task<String> success = getSuccessTask();
Task<Integer> transformed = success.transformWith(tryT ->
Task.callable(() -> {throw new RuntimeException(msg);}));
try {
runAndWait("AbstractTaskTest.testTransformWith_SuccessToFailure", transformed);
} catch (Exception ex) {
assertEquals(ex.getCause().getMessage(), msg);
}
}

@Test
public void testTransformWith_SuccessToSuccess() {
Task<String> success = getSuccessTask();
Task<Integer> transformed = success.transformWith(tryT ->
Task.callable(() -> tryT.get().length()));
runAndWait("AbstractTaskTest.testTransformWith_SuccessToSuccess", transformed);
assertEquals(transformed.get().intValue(), success.get().length());
}

@Test
public void testTransformWith_FailureToSuccess() {
int returnValue = 100;
Task<String> failed = getFailureTask();
Task<Integer> transformed = failed.transformWith(tryT ->
Task.callable(() -> returnValue));
runAndWait("AbstractTaskTest.testTransformWith_FailureToSuccess", transformed);
}

@Test
public void testTransformWith_FailureToFailure() {
String msg = "transform failed";
Task<String> failed = getFailureTask();
Task<Integer> transformed = failed.transformWith(tryT ->
Task.callable(() -> {throw new RuntimeException(msg);}));
try {
runAndWait("AbstractTaskTest.testTransformWith_FailureToFailure", transformed);
} catch (Exception ex) {
assertEquals(ex.getCause().getMessage(), msg);
}
}

@Test
public void testTransformWith_Cancelled() {
Task<String> cancelled = getCancelledTask().recoverWith(e -> Task.callable("recover success", () -> "recovered"));
try {
runAndWait("AbstractTaskTest.testTransformWith_Cancelled", cancelled);
fail("should have failed");
} catch (Exception ex) {
assertTrue(cancelled.isFailed());
assertTrue(Exceptions.isCancellation(cancelled.getError()));
}
}

@Test
public void testFlatten() {
Task<Task<String>> nested = Task.callable(() -> getSuccessTask());
Expand Down