Skip to content

Commit

Permalink
Add async signal to untypedstub (#527)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Sep 18, 2020
1 parent 7c0e2ab commit d98d33f
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 17 deletions.
5 changes: 5 additions & 0 deletions src/main/java/com/uber/cadence/client/WorkflowStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ static <T> WorkflowStub fromTyped(T typed) {

void signal(String signalName, Object... args);

CompletableFuture<Void> signalAsync(String signalName, Object... args);

CompletableFuture<Void> signalAsyncWithTimeout(
long timeout, TimeUnit unit, String signalName, Object... args);

WorkflowExecution start(Object... args);

CompletableFuture<WorkflowExecution> startAsync(Object... args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ CompletableFuture<WorkflowExecution> startWorkflowAsync(

void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);

CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters);

CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis);

WorkflowExecution signalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionParameters parameters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,8 @@ private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {

@Override
public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters) {
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
request.setDomain(domain);
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);

request.setInput(signalParameters.getInput());
request.setSignalName(signalParameters.getSignalName());
WorkflowExecution execution = new WorkflowExecution();
execution.setRunId(signalParameters.getRunId());
execution.setWorkflowId(signalParameters.getWorkflowId());
request.setWorkflowExecution(execution);
try {
Retryer.retry(
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
Expand All @@ -287,6 +280,55 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
}
}

@Override
public CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters) {
return signalWorkflowExecutionAsync(signalParameters, Long.MAX_VALUE);
}

@Override
public CompletableFuture<Void> signalWorkflowExecutionAsync(
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
return Retryer.retryWithResultAsync(
getRetryOptionsWithExpiration(
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
() -> {
CompletableFuture<Void> result = new CompletableFuture<>();
try {
service.SignalWorkflowExecution(
request,
new AsyncMethodCallback() {
@Override
public void onComplete(Object response) {
result.complete(null);
}

@Override
public void onError(Exception exception) {
result.completeExceptionally(exception);
}
});
} catch (TException e) {
result.completeExceptionally(e);
}
return result;
});
}

private SignalWorkflowExecutionRequest getSignalRequest(
SignalExternalWorkflowParameters signalParameters) {
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
request.setDomain(domain);
request.setInput(signalParameters.getInput());
request.setSignalName(signalParameters.getSignalName());
WorkflowExecution execution = new WorkflowExecution();
execution.setRunId(signalParameters.getRunId());
execution.setWorkflowId(signalParameters.getWorkflowId());
request.setWorkflowExecution(execution);
return request;
}

@Override
public WorkflowExecution signalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionParameters parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,15 @@ public void SignalWorkflowExecution(
impl.SignalWorkflowExecution(signalRequest, resultHandler);
}

@Override
public void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException {
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
}

@Override
public void SignalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,15 @@ public void SignalWorkflowExecution(
impl.SignalWorkflowExecution(signalRequest, resultHandler);
}

@Override
public void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException {
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
}

@Override
public void SignalWithStartWorkflowExecution(
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,
Expand Down Expand Up @@ -806,6 +815,17 @@ public void signal(String signalName, Object... args) {
next.signal(signalName, args);
}

@Override
public CompletableFuture<Void> signalAsync(String signalName, Object... args) {
return next.signalAsync(signalName, args);
}

@Override
public CompletableFuture<Void> signalAsyncWithTimeout(
long timeout, TimeUnit unit, String signalName, Object... args) {
return next.signalAsyncWithTimeout(timeout, unit, signalName, args);
}

@Override
public WorkflowExecution start(Object... args) {
return next.start(args);
Expand Down
28 changes: 23 additions & 5 deletions src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ class WorkflowStubImpl implements WorkflowStub {

@Override
public void signal(String signalName, Object... input) {
SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input);
try {
genericClient.signalWorkflowExecution(p);
} catch (Exception e) {
throw new WorkflowServiceException(execution.get(), workflowType, e);
}
}

@Override
public CompletableFuture<Void> signalAsync(String signalName, Object... input) {
return signalAsyncWithTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS, signalName, input);
}

@Override
public CompletableFuture<Void> signalAsyncWithTimeout(
long timeout, TimeUnit unit, String signalName, Object... input) {
SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input);
return genericClient.signalWorkflowExecutionAsync(p, unit.toMillis(timeout));
}

private SignalExternalWorkflowParameters getSignalExternalWorkflowParameters(
String signalName, Object... input) {
checkStarted();
SignalExternalWorkflowParameters p = new SignalExternalWorkflowParameters();
p.setInput(dataConverter.toData(input));
Expand All @@ -105,11 +127,7 @@ public void signal(String signalName, Object... input) {
// TODO: Deal with signaling started workflow only, when requested
// Commented out to support signaling workflows that called continue as new.
// p.setRunId(execution.getRunId());
try {
genericClient.signalWorkflowExecution(p);
} catch (Exception e) {
throw new WorkflowServiceException(execution.get(), workflowType, e);
}
return p;
}

private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,24 @@ public void RequestCancelWorkflowExecution(
public void SignalWorkflowExecution(
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler)
throws TException {
throw new UnsupportedOperationException("not implemented");
SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, null);
}

@Override
public void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException {
forkJoinPool.execute(
() -> {
try {
SignalWorkflowExecution(signalRequest);
resultHandler.onComplete(null);
} catch (TException e) {
resultHandler.onError(e);
}
});
}

@Override
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.SignalWorkflowExecutionRequest;
import com.uber.cadence.StartWorkflowExecutionRequest;
import com.uber.cadence.WorkflowService.AsyncIface;
import com.uber.cadence.WorkflowService.Iface;
Expand Down Expand Up @@ -69,4 +70,18 @@ void GetWorkflowExecutionHistoryWithTimeout(
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException;
/**
* SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with
* timeout
*
* @param signalRequest
* @param resultHandler
* @param timeoutInMillis
* @throws TException
*/
void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2551,9 +2551,82 @@ public void RequestCancelWorkflowExecution(

@Override
public void SignalWorkflowExecution(
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler)
throws TException {
throw new UnsupportedOperationException("not implemented");
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler) {
signalWorkflowExecution(signalRequest, resultHandler, null);
}

@Override
public void SignalWorkflowExecutionWithTimeout(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis) {
signalWorkflowExecution(signalRequest, resultHandler, timeoutInMillis);
}

private void signalWorkflowExecution(
SignalWorkflowExecutionRequest signalRequest,
AsyncMethodCallback resultHandler,
Long timeoutInMillis) {

timeoutInMillis = validateAndUpdateTimeout(timeoutInMillis, options.getRpcTimeoutMillis());
ThriftRequest<WorkflowService.SignalWorkflowExecution_args> request =
buildThriftRequest(
"SignalWorkflowExecution",
new WorkflowService.SignalWorkflowExecution_args(signalRequest),
timeoutInMillis);
CompletableFuture<ThriftResponse<WorkflowService.SignalWorkflowExecution_result>> response =
doRemoteCallAsync(request);
response
.whenComplete(
(r, e) -> {
try {
if (e != null) {
resultHandler.onError(CheckedExceptionWrapper.wrap(e));
return;
}
WorkflowService.SignalWorkflowExecution_result result =
r.getBody(WorkflowService.SignalWorkflowExecution_result.class);
if (r.getResponseCode() == ResponseCode.OK) {
resultHandler.onComplete(null);
return;
}
if (result.isSetBadRequestError()) {
resultHandler.onError(result.getBadRequestError());
return;
}
if (result.isSetEntityNotExistError()) {
resultHandler.onError(result.getEntityNotExistError());
return;
}
if (result.isSetServiceBusyError()) {
resultHandler.onError(result.getServiceBusyError());
return;
}
if (result.isSetDomainNotActiveError()) {
resultHandler.onError(result.getDomainNotActiveError());
return;
}
if (result.isSetLimitExceededError()) {
resultHandler.onError(result.getLimitExceededError());
return;
}
if (result.isSetClientVersionNotSupportedError()) {
resultHandler.onError(result.getClientVersionNotSupportedError());
return;
}
resultHandler.onError(
new TException("SignalWorkflowExecution failed with unknown error:" + result));
} finally {
if (r != null) {
r.release();
}
}
})
.exceptionally(
(e) -> {
log.error("Unexpected error in SignalWorkflowExecution", e);
return null;
});
}

@Override
Expand Down

0 comments on commit d98d33f

Please sign in to comment.