diff --git a/CHANGELOG.md b/CHANGELOG.md index cfc553e8..6de7e0f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ +v5.1.2 +------ + v5.1.1 ------ +* Add "transformWith" to Task API which performs transformation by executing a task v5.1.0 ------ diff --git a/gradle.properties b/gradle.properties index 132e0243..86e1e8e3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=5.1.0 +version=5.1.1 group=com.linkedin.parseq org.gradle.parallel=true diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java index 7e275703..fcd8b186 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java @@ -845,6 +845,82 @@ default Task transform(final Function1, Try> func) { return transform("transform: " + _taskDescriptor.getDescription(func.getClass().getName()), func); } + /** + * Create a new task that will transform the result of this task. + * Returned task will complete with value calculated by a task returned by the function. + * + * This is similar to {@link #transform(String, Function1)} except the transformation is done by executing the task returned + * by the function. + * + *
+   * boolean writeToDB(String content) {...}
+   *
+   * Task{@code } pictureBase64= ...
+   *
+   * // this task will complete with either complete successfully
+   * // with uploadResult being true or false, or fail with  MyLibException
+   * Task{@code } uploadResult = pictureBase64.transformWith("transformUsingATask", t {@code ->} {
+   *   if (!t.isFailed()) {
+   *     return Task.blocking(() -> writeToDB(t.get()), executor));
+   *   }
+   *   return Task.failure(new MyLibException(t.getError());
+   * });
+   * 
+   *
+   * @param desc description
+   * @param func function to be applied to the result of this task which returns new task
+   *    to be executed
+   * @param  value type of the returned task returned by function func<
+   * @return a new task which will apply given function on result of either successful and failed completion of this task
+   *     to get instance of a task which will be executed next
+   */
+  default  Task transformWith(final String desc, final Function1, Task> func) {
+    ArgumentUtil.requireNotNull(func, "function");
+    final Task that = this;
+    Task transformWithTask = async(desc, context -> {
+      final SettablePromise result = Promises.settable();
+      final Task transform = async("transform", ctx -> {
+        final SettablePromise transformResult = Promises.settable();
+        if (that.isFailed() && (Exceptions.isCancellation(that.getError()))) {
+          //cancellations will not be propagated as other errors to the function to get the task to execute.
+          transformResult.fail(that.getError());
+        }
+        else {
+          final Try tryT = Promises.toTry(that);
+          try {
+            Task r = func.apply(tryT);
+            if (r == null) {
+              throw new RuntimeException(desc + " returned null");
+            }
+            Promises.propagateResult(r, transformResult);
+            ctx.run(r);
+          } catch (Throwable t) {
+            transformResult.fail(t);
+          }
+        }
+        return transformResult;
+      });
+      transform.getShallowTraceBuilder().setSystemHidden(true);
+      transform.getShallowTraceBuilder().setTaskType(TaskType.TRANSFORM.getName());
+      Promises.propagateResult(transform, result);
+      context.after(that).run(transform);
+      context.run(that);
+      return result;
+    });
+    transformWithTask.getShallowTraceBuilder().setTaskType(TaskType.WITH_TRANSFORM.getName());
+    return transformWithTask;
+
+  }
+
+  /**
+   * Equivalent to {@code transformWith("transformWith", func)}.
+   * @see #transformWith(String, Function1)
+   */
+  default  Task transformWith(final Function1, Task> func) {
+    return transformWith("transform: " + _taskDescriptor.getDescription(func.getClass().getName()), func);
+  }
+
+
   /**
    * Creates a new task that will handle failure of this task.
    * Early completion due to cancellation is not considered to be a failure so it will not be recovered.
diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java
index 4e385e9a..d8b63ac4 100644
--- a/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java
+++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java
@@ -16,6 +16,8 @@ public enum TaskType {
   WITH_TIMEOUT ("withTimeout"),
   RECOVER ("recover"),
   WITH_RECOVER ("withRecover"),
+  TRANSFORM ("transform"),
+  WITH_TRANSFORM ("withTransform"),
   WITH_DELAY ("withDelay");
 
   private final String _name;
diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/doc-files/transformWith-1.png b/subprojects/parseq/src/main/java/com/linkedin/parseq/doc-files/transformWith-1.png
new file mode 100644
index 00000000..bc51bd78
Binary files /dev/null and b/subprojects/parseq/src/main/java/com/linkedin/parseq/doc-files/transformWith-1.png differ
diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/AbstractTaskTest.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/AbstractTaskTest.java
index dedfe288..afaafa19 100644
--- a/subprojects/parseq/src/test/java/com/linkedin/parseq/AbstractTaskTest.java
+++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/AbstractTaskTest.java
@@ -439,6 +439,62 @@ public void testTransformFailureToFailure() {
     assertSame(transformed.getError(), failureReason);
   }
 
+  @Test
+  public void testTransformWith_SuccessToFailure() {
+    String msg = "transform failed";
+    Task success = getSuccessTask();
+    Task transformed = success.transformWith(tryT ->
+      Task.callable(() -> {throw new RuntimeException(msg);}));
+    try {
+      runAndWait("AbstractTaskTest.testTransformWith_SuccessToFailure", transformed);
+    } catch (Exception ex) {
+      assertEquals(ex.getCause().getMessage(), msg);
+    }
+  }
+
+  @Test
+  public void testTransformWith_SuccessToSuccess() {
+    Task success = getSuccessTask();
+    Task transformed = success.transformWith(tryT ->
+        Task.callable(() -> tryT.get().length()));
+    runAndWait("AbstractTaskTest.testTransformWith_SuccessToSuccess", transformed);
+    assertEquals(transformed.get().intValue(), success.get().length());
+  }
+
+  @Test
+  public void testTransformWith_FailureToSuccess() {
+    int returnValue = 100;
+    Task failed = getFailureTask();
+    Task transformed = failed.transformWith(tryT ->
+        Task.callable(() -> returnValue));
+    runAndWait("AbstractTaskTest.testTransformWith_FailureToSuccess", transformed);
+  }
+
+  @Test
+  public void testTransformWith_FailureToFailure() {
+    String msg = "transform failed";
+    Task failed = getFailureTask();
+    Task transformed = failed.transformWith(tryT ->
+        Task.callable(() -> {throw new RuntimeException(msg);}));
+    try {
+      runAndWait("AbstractTaskTest.testTransformWith_FailureToFailure", transformed);
+    } catch (Exception ex) {
+      assertEquals(ex.getCause().getMessage(), msg);
+    }
+  }
+
+  @Test
+  public void testTransformWith_Cancelled() {
+    Task cancelled = getCancelledTask().recoverWith(e -> Task.callable("recover success", () -> "recovered"));
+    try {
+      runAndWait("AbstractTaskTest.testTransformWith_Cancelled", cancelled);
+      fail("should have failed");
+    } catch (Exception ex) {
+      assertTrue(cancelled.isFailed());
+      assertTrue(Exceptions.isCancellation(cancelled.getError()));
+    }
+  }
+
   @Test
   public void testFlatten() {
     Task> nested = Task.callable(() -> getSuccessTask());