Skip to content

Commit

Permalink
Add APM tracing for Watcher executions
Browse files Browse the repository at this point in the history
Part of elastic#97411. Add tracing to the Watcher execution service so that it
is possible to get a clearer view in the APM UI of how a watch is
executed.
  • Loading branch information
pugnascotia committed Jul 10, 2023
1 parent bb7dd06 commit d0b1367
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void startTrace(ThreadContext threadContext, SpanId spanId, String spanNa
spanBuilder.setParent(parentContext);
}

setSpanAttributes(threadContext, attributes, spanBuilder);
setSpanAttributes(spanName, threadContext, attributes, spanBuilder);

Instant startTime = threadContext.getTransient(Task.TRACE_START_TIME);
if (startTime != null) {
Expand Down Expand Up @@ -218,7 +218,7 @@ public void startTrace(String name, Map<String, Object> attributes) {
}

SpanBuilder spanBuilder = services.tracer.spanBuilder(name);
setSpanAttributes(attributes, spanBuilder);
setSpanAttributes(name, attributes, spanBuilder);
spanBuilder.startSpan();
}

Expand Down Expand Up @@ -290,7 +290,7 @@ public Releasable withScope(SpanId spanId) {
return () -> {};
}

private void setSpanAttributes(@Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
private void setSpanAttributes(String spanName, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
if (spanAttributes != null) {
for (Map.Entry<String, Object> entry : spanAttributes.entrySet()) {
final String key = entry.getKey();
Expand Down Expand Up @@ -319,7 +319,8 @@ private void setSpanAttributes(@Nullable Map<String, Object> spanAttributes, Spa
}

final boolean isHttpSpan = spanAttributes.keySet().stream().anyMatch(key -> key.startsWith("http."));
spanBuilder.setSpanKind(isHttpSpan ? SpanKind.SERVER : SpanKind.INTERNAL);
final boolean isWatcher = spanName.startsWith("watcher-");
spanBuilder.setSpanKind(isHttpSpan || isWatcher ? SpanKind.SERVER : SpanKind.INTERNAL);
} else {
spanBuilder.setSpanKind(SpanKind.INTERNAL);
}
Expand All @@ -328,8 +329,8 @@ private void setSpanAttributes(@Nullable Map<String, Object> spanAttributes, Spa
spanBuilder.setAttribute(org.elasticsearch.tracing.Tracer.AttributeKeys.CLUSTER_NAME, clusterName);
}

private void setSpanAttributes(ThreadContext threadContext, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
setSpanAttributes(spanAttributes, spanBuilder);
private void setSpanAttributes(String spanName, ThreadContext threadContext, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
setSpanAttributes(spanName, spanAttributes, spanBuilder);

final String xOpaqueId = threadContext.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER);
if (xOpaqueId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalArgument;

/**
* A representation class of a watch id, its execution time and a random UUID
* A representation class of a watch id, its execution time and a random UUID.
* This class exists to be able to store several events from the same possible execution time and the same watch
* in the triggered store index or the history store
*
* in the triggered store index or the history store.
* <p>
* One 'specialty' of this class is the handling of the underscore in the value. Nothing except the watchId should contain an
* underscore, otherwise this class will not be able to extract the proper watch id, when a a single string is handed over in its ctor
*
* This is also the reason why UUID.randomUUID() is used instead of UUIDs.base64UUID(), as the latter one contains underscores. Also this
* is not dependant on having time based uuids here, as the time is already included in the value
* underscore, otherwise this class will not be able to extract the proper watch id, when a single string is handed over in its ctor.
* <p>
* This is also the reason why UUID.randomUUID() is used instead of UUIDs.base64UUID(), as the latter one contains underscores. Also, this
* is not dependent on having time based uuids here, as the time is already included in the value.
*/
public class Wid {

Expand All @@ -32,7 +32,7 @@ public class Wid {

public Wid(String watchId, ZonedDateTime executionTime) {
this.watchId = watchId;
this.value = watchId + "_" + UUID.randomUUID().toString() + "-" + formatter.format(executionTime);
this.value = watchId + "_" + UUID.randomUUID() + "-" + formatter.format(executionTime);
}

public Wid(String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser, bulkProcessor);

final WatcherSearchTemplateService watcherSearchTemplateService = new WatcherSearchTemplateService(scriptService, xContentRegistry);
final WatchExecutor watchExecutor = getWatchExecutor(threadPool);
final WatchExecutor watchExecutor = getWatchExecutor(threadPool, tracer);
final WatchParser watchParser = new WatchParser(triggerService, registry, inputRegistry, cryptoService, getClock());

final ExecutionService executionService = new ExecutionService(
Expand All @@ -518,7 +518,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
watchParser,
clusterService,
client,
threadPool.generic()
threadPool.generic(),
tracer
);

final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
Expand Down Expand Up @@ -563,8 +564,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock);
}

protected WatchExecutor getWatchExecutor(ThreadPool threadPool) {
return new InternalWatchExecutor(threadPool);
protected WatchExecutor getWatchExecutor(ThreadPool threadPool, Tracer tracer) {
return new InternalWatchExecutor(threadPool, tracer);
}

protected Consumer<Iterable<TriggerEvent>> getTriggerEngineListener(ExecutionService executionService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.tracing.SpanId;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand Down Expand Up @@ -108,6 +110,7 @@ public class ExecutionService {
private final Client client;
private final WatchExecutor executor;
private final ExecutorService genericExecutor;
private final Tracer tracer;

private AtomicReference<CurrentExecutions> currentExecutions = new AtomicReference<>();
private final AtomicBoolean paused = new AtomicBoolean(false);
Expand All @@ -121,7 +124,8 @@ public ExecutionService(
WatchParser parser,
ClusterService clusterService,
Client client,
ExecutorService genericExecutor
ExecutorService genericExecutor,
Tracer tracer
) {
this.historyStore = historyStore;
this.triggeredWatchStore = triggeredWatchStore;
Expand All @@ -135,6 +139,7 @@ public ExecutionService(
this.genericExecutor = genericExecutor;
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30));
this.currentExecutions.set(new CurrentExecutions());
this.tracer = tracer;
}

public void unPause() {
Expand Down Expand Up @@ -427,16 +432,23 @@ private void logWatchRecord(WatchExecutionContext ctx, Exception e) {
}

/*
The execution of an watch is split into two phases:
The execution of a watch is split into two phases:
1. the trigger part which just makes sure to store the associated watch record in the history
2. the actual processing of the watch
The reason for this split is that we don't want to lose the fact that the watch was triggered. This way, even if the
thread pool that executes the watches is completely busy, we don't lose the fact that the watch was
triggered (it'll have its history record)
*/
private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) {
final ThreadContext threadContext = clusterService.threadPool().getThreadContext();
this.tracer.startTrace(threadContext, SpanId.forBareString(""), "", Map.of());

try {
executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
executor.startTrace(ctx.id());
executor.execute(new WatchExecutionTask(ctx, () -> {
execute(ctx);
executor.stopTrace(ctx.id());
}));
} catch (EsRejectedExecutionException e) {
// Using the generic pool here since this can happen from a write thread and we don't want to block a write
// thread to kick off these additional write/delete requests.
Expand Down Expand Up @@ -465,6 +477,7 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
exc
);
}
executor.stopTrace(ctx.id());
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.SpanId;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.watcher.execution.Wid;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;

Expand All @@ -18,9 +22,11 @@ public class InternalWatchExecutor implements WatchExecutor {
public static final String THREAD_POOL_NAME = XPackField.WATCHER;

private final ThreadPool threadPool;
private final Tracer tracer;

public InternalWatchExecutor(ThreadPool threadPool) {
public InternalWatchExecutor(ThreadPool threadPool, Tracer tracer) {
this.threadPool = threadPool;
this.tracer = tracer;
}

@Override
Expand All @@ -43,6 +49,21 @@ public void execute(Runnable runnable) {
executor().execute(runnable);
}

/**
 * Starts a trace span for a single watch execution.
 * <p>
 * The span is registered under the bare-string id {@code "watcher-" + id}; {@link #stopTrace(Wid)}
 * builds the identical id to end it.
 * <p>
 * The span name carries the {@code "watcher-"} prefix on purpose: the APM tracer change that accompanies
 * this code decides whether a span is a Watcher span with {@code spanName.startsWith("watcher-")}. A
 * free-text name such as "Executing watch ..." never matches that check, so the span would always be
 * reported with the internal span kind.
 *
 * @param id the id of the watch execution being traced; its watch id is used for the span name and is
 *           also attached as the {@code watchId} span attribute
 */
@Override
public void startTrace(Wid id) {
    final String watchId = id.watchId();
    this.tracer.startTrace(
        threadPool.getThreadContext(),
        SpanId.forBareString("watcher-" + id),
        // Must start with "watcher-" so the tracer recognises this as a Watcher span.
        "watcher-" + watchId,
        Map.of("watchId", watchId)
    );
}

/**
 * Stops the trace span for the given watch execution.
 * <p>
 * The span is addressed by the bare-string id {@code "watcher-" + id}, which is the same id that
 * {@link #startTrace(Wid)} registers the span under.
 *
 * @param id the id of the watch execution whose span should be stopped
 */
@Override
public void stopTrace(Wid id) {
this.tracer.stopTrace(SpanId.forBareString("watcher-" + id));
}

private EsThreadPoolExecutor executor() {
return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
package org.elasticsearch.xpack.watcher.execution;

import org.elasticsearch.xpack.core.watcher.execution.Wid;

import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;

Expand All @@ -19,4 +21,7 @@ public interface WatchExecutor {

void execute(Runnable runnable);

/**
 * Starts tracing the watch execution identified by {@code id}.
 * NOTE(review): presumably each call is expected to be paired with {@link #stopTrace(Wid)} for the
 * same id - confirm against the callers.
 *
 * @param id the id of the watch execution to trace
 */
void startTrace(Wid id);

/**
 * Stops tracing the watch execution identified by {@code id}.
 * NOTE(review): presumably a no-op if no trace was started for this id - confirm per implementation.
 *
 * @param id the id of the watch execution whose trace should be stopped
 */
void stopTrace(Wid id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -169,7 +170,8 @@ public void init() throws Exception {
parser,
clusterService,
client,
EsExecutors.DIRECT_EXECUTOR_SERVICE
EsExecutors.DIRECT_EXECUTOR_SERVICE,
Tracer.NOOP
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.core.watcher.execution.Wid;
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.watcher.Watcher;
Expand Down Expand Up @@ -65,7 +67,7 @@ protected Clock getClock() {
}

@Override
protected WatchExecutor getWatchExecutor(ThreadPool threadPool) {
protected WatchExecutor getWatchExecutor(ThreadPool threadPool, Tracer tracer) {
return new SameThreadExecutor();
}

Expand Down Expand Up @@ -103,5 +105,15 @@ public long largestPoolSize() {
// Runs the task synchronously on the calling thread instead of handing it to another executor.
public void execute(Runnable runnable) {
runnable.run();
}

@Override
public void startTrace(Wid id) {
// Intentionally a no-op: this executor does not record traces.
}

@Override
public void stopTrace(Wid id) {
// Intentionally a no-op: startTrace on this executor never opens a span, so there is nothing to stop.
}
}
}

0 comments on commit d0b1367

Please sign in to comment.