Skip to content

Commit

Permalink
Tracing support improvements (#1819)
Browse files Browse the repository at this point in the history
feat: support tracing on signals, updates, queries
  • Loading branch information
siavashkavousi committed Sep 25, 2023
1 parent 18162b7 commit d50d274
Show file tree
Hide file tree
Showing 25 changed files with 460 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ public enum SpanOperationType {
START_CHILD_WORKFLOW("StartChildWorkflow"),
START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"),
START_ACTIVITY("StartActivity"),
RUN_ACTIVITY("RunActivity");
RUN_ACTIVITY("RunActivity"),
SIGNAL_EXTERNAL_WORKFLOW("SignalExternalWorkflow"),
QUERY_WORKFLOW("QueryWorkflow"),
SIGNAL_WORKFLOW("SignalWorkflow"),
UPDATE_WORKFLOW("UpdateWorkflow"),
HANDLE_QUERY("HandleQuery"),
HANDLE_SIGNAL("HandleSignal"),
HANDLE_UPDATE("HandleUpdate");

private final String defaultPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,20 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
case RUN_WORKFLOW:
case START_ACTIVITY:
case RUN_ACTIVITY:
case SIGNAL_EXTERNAL_WORKFLOW:
case SIGNAL_WORKFLOW:
case UPDATE_WORKFLOW:
case QUERY_WORKFLOW:
case HANDLE_SIGNAL:
case HANDLE_UPDATE:
String runId = context.getRunId();
Preconditions.checkNotNull(
runId, "runId is expected to be not null for span operation type %s", operationType);
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case HANDLE_QUERY:
return ImmutableMap.of();
}
throw new IllegalArgumentException("Unknown span operation type provided");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
}
}

@Override
public WorkflowSignalOutput signal(WorkflowSignalInput input) {
Span workflowSignalSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createWorkflowSignalSpan(
tracer,
input.getSignalName(),
input.getWorkflowExecution().getWorkflowId(),
input.getWorkflowExecution().getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) {
return super.signal(input);
} finally {
workflowSignalSpan.finish();
}
}

@Override
public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
Expand All @@ -76,6 +97,48 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
}
}

@Override
public <R> QueryOutput<R> query(QueryInput<R> input) {
Span workflowQuerySpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createWorkflowQuerySpan(
tracer,
input.getQueryType(),
input.getWorkflowExecution().getWorkflowId(),
input.getWorkflowExecution().getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) {
return super.query(input);
} finally {
workflowQuerySpan.finish();
}
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
Span workflowStartUpdateSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createWorkflowStartUpdateSpan(
tracer,
input.getUpdateName(),
input.getWorkflowExecution().getWorkflowId(),
input.getWorkflowExecution().getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(workflowStartUpdateSpan)) {
return super.startUpdate(input);
} finally {
workflowStartUpdateSpan.finish();
}
}

private Tracer.SpanBuilder createWorkflowStartSpanBuilder(
WorkflowStartInput input, SpanOperationType operationType) {
return spanFactory.createWorkflowStartSpan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public WorkflowOutput execute(WorkflowInput input) {
Workflow.getInfo().getRunId(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) {
try (Scope ignored = tracer.scopeManager().activate(workflowRunSpan)) {
return super.execute(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
Expand All @@ -82,4 +82,83 @@ public WorkflowOutput execute(WorkflowInput input) {
workflowRunSpan.finish();
}
}

@Override
public void handleSignal(SignalInput input) {
Tracer tracer = options.getTracer();
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
Span workflowSignalSpan =
spanFactory
.createWorkflowHandleSignalSpan(
tracer,
input.getSignalName(),
Workflow.getInfo().getWorkflowId(),
Workflow.getInfo().getRunId(),
rootSpanContext)
.start();
try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) {
super.handleSignal(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
spanFactory.logEviction(workflowSignalSpan);
} else {
spanFactory.logFail(workflowSignalSpan, t);
}
throw t;
} finally {
workflowSignalSpan.finish();
}
}

@Override
public QueryOutput handleQuery(QueryInput input) {
Tracer tracer = options.getTracer();
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
Span workflowQuerySpan =
spanFactory
.createWorkflowHandleQuerySpan(tracer, input.getQueryName(), rootSpanContext)
.start();
try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) {
return super.handleQuery(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
spanFactory.logEviction(workflowQuerySpan);
} else {
spanFactory.logFail(workflowQuerySpan, t);
}
throw t;
} finally {
workflowQuerySpan.finish();
}
}

@Override
public UpdateOutput executeUpdate(UpdateInput input) {
Tracer tracer = options.getTracer();
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
Span workflowSignalSpan =
spanFactory
.createWorkflowExecuteUpdateSpan(
tracer,
input.getUpdateName(),
Workflow.getInfo().getWorkflowId(),
Workflow.getInfo().getRunId(),
rootSpanContext)
.start();
try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) {
return super.executeUpdate(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
spanFactory.logEviction(workflowSignalSpan);
} else {
spanFactory.logFail(workflowSignalSpan, t);
}
throw t;
} finally {
workflowSignalSpan.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,32 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
}
}

@Override
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
if (!WorkflowUnsafe.isReplaying()) {
WorkflowInfo workflowInfo = Workflow.getInfo();
Span childWorkflowStartSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createExternalWorkflowSignalSpan(
tracer,
input.getSignalName(),
workflowInfo.getWorkflowId(),
workflowInfo.getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
return super.signalExternalWorkflow(input);
} finally {
childWorkflowStartSpan.finish();
}
} else {
return super.signalExternalWorkflow(input);
}
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,46 @@ public Tracer.SpanBuilder createChildWorkflowStartSpan(
return createSpan(context, tracer, null, References.CHILD_OF);
}

public Tracer.SpanBuilder createExternalWorkflowSignalSpan(
Tracer tracer, String signalName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.SIGNAL_EXTERNAL_WORKFLOW)
.setActionName(signalName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowSignalSpan(
Tracer tracer, String signalName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW)
.setActionName(signalName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowHandleSignalSpan(
Tracer tracer,
String signalName,
String workflowId,
String runId,
SpanContext workflowSignalSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.HANDLE_SIGNAL)
.setActionName(signalName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createContinueAsNewWorkflowStartSpan(
Tracer tracer, String continueAsNewWorkflowType, String workflowId, String parentRunId) {
SpanCreationContext context =
Expand Down Expand Up @@ -133,6 +173,56 @@ public Tracer.SpanBuilder createActivityRunSpan(
return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowStartUpdateSpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.UPDATE_WORKFLOW)
.setActionName(updateName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowExecuteUpdateSpan(
Tracer tracer,
String updateName,
String workflowId,
String runId,
SpanContext workflowUpdateSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.HANDLE_UPDATE)
.setActionName(updateName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, workflowUpdateSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowQuerySpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.QUERY_WORKFLOW)
.setActionName(updateName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowHandleQuerySpan(
Tracer tracer, String queryName, SpanContext workflowQuerySpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.HANDLE_QUERY)
.setActionName(queryName)
.build();
return createSpan(context, tracer, workflowQuerySpanContext, References.FOLLOWS_FROM);
}

@SuppressWarnings("deprecation")
public void logFail(Span toSpan, Throwable failReason) {
toSpan.setTag(StandardTagNames.FAILED, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,24 @@ public void signalWithStart() {
assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId());
assertEquals("SignalWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName());

List<MockSpan> workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan);
assertEquals(1, workflowRunSpans.size());

MockSpan workflowRunSpan = workflowRunSpans.get(0);
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
if (SDKTestWorkflowRule.useExternalService) {
List<MockSpan> workflowSpans = spansHelper.getByParentSpan(workflowStartSpan);
assertEquals(2, workflowSpans.size());

MockSpan workflowSignalSpan = workflowSpans.get(0);
assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId());
assertEquals("HandleSignal:signal", workflowSignalSpan.operationName());

MockSpan workflowRunSpan = workflowSpans.get(1);
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
} else {
List<MockSpan> workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan);
assertEquals(1, workflowRunSpans.size());

MockSpan workflowRunSpan = workflowRunSpans.get(0);
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
}
}
}

0 comments on commit d50d274

Please sign in to comment.