Skip to content

Commit

Permalink
change the implementation to be thread-safe and use a daemon thread f…
Browse files Browse the repository at this point in the history
…or the reporter
  • Loading branch information
jkwatson committed Feb 22, 2021
1 parent c0d0831 commit d9f20ec
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.config.Config;
import java.util.EnumMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,8 +21,7 @@ public class SupportabilityMetrics {
private final boolean agentDebugEnabled;
private final Consumer<String> reporter;

private final ConcurrentMap<String, EnumMap<SpanKind, AtomicInteger>> suppressionCounters =
new ConcurrentHashMap<>();
private final ConcurrentMap<String, KindCounters> suppressionCounters = new ConcurrentHashMap<>();

public SupportabilityMetrics(Config config) {
this(config, log::debug);
Expand All @@ -39,36 +37,87 @@ public void recordSuppressedSpan(SpanKind kind, String instrumentationName) {
if (!agentDebugEnabled) {
return;
}
// note: there's definitely a race here, but since this is just debug information, I think
// we can live with the possibility that we might lose a count or two.
EnumMap<SpanKind, AtomicInteger> countersByKind =
suppressionCounters.computeIfAbsent(
instrumentationName, s -> new EnumMap<>(SpanKind.class));

countersByKind.computeIfAbsent(kind, k -> new AtomicInteger()).incrementAndGet();
suppressionCounters
.computeIfAbsent(instrumentationName, s -> new KindCounters())
.increment(kind);
}

// visible for testing
void report() {
suppressionCounters.forEach(
(instrumentationName, countsByKind) -> {
countsByKind.forEach(
(spanKind, counter) -> {
reporter.accept(
"Suppressed Spans by '"
+ instrumentationName
+ "' ("
+ spanKind
+ ") : "
+ counter.getAndUpdate(operand -> 0));
});
for (SpanKind kind : SpanKind.values()) {
long value = countsByKind.getAndReset(kind);
if (value > 0) {
reporter.accept(
"Suppressed Spans by '" + instrumentationName + "' (" + kind + ") : " + value);
}
}
});
}

public SupportabilityMetrics start() {
if (agentDebugEnabled) {
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(this::report, 5, 5, TimeUnit.SECONDS);
Executors.newScheduledThreadPool(
1,
r -> {
Thread result = new Thread("supportability_metrics_reporter");
result.setDaemon(true);
return result;
})
.scheduleAtFixedRate(this::report, 5, 5, TimeUnit.SECONDS);
}
return this;
}

// this class is threadsafe.
private static class KindCounters {
private final LongAdder server = new LongAdder();
private final LongAdder client = new LongAdder();
private final LongAdder internal = new LongAdder();
private final LongAdder consumer = new LongAdder();
private final LongAdder producer = new LongAdder();

void increment(SpanKind kind) {
switch (kind) {
case INTERNAL:
internal.increment();
break;
case SERVER:
server.increment();
break;
case CLIENT:
client.increment();
break;
case PRODUCER:
producer.increment();
break;
case CONSUMER:
consumer.increment();
break;
default:
// in case a new kind gets added, we don't want to fail.
break;
}
}

long getAndReset(SpanKind kind) {
switch (kind) {
case INTERNAL:
return internal.sumThenReset();
case SERVER:
return server.sumThenReset();
case CLIENT:
return client.sumThenReset();
case PRODUCER:
return producer.sumThenReset();
case CONSUMER:
return consumer.sumThenReset();
default:
// in case a new kind gets added, we don't want to fail.
return 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.api.tracer.utils;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.trace.SpanKind;
Expand Down Expand Up @@ -71,10 +72,8 @@ void resetsCountsEachReport() {

assertThat(reports)
.isNotEmpty()
.hasSize(2)
.hasSize(1)
.hasSameElementsAs(
Arrays.asList(
"Suppressed Spans by 'favoriteInstrumentation' (CLIENT) : 1",
"Suppressed Spans by 'favoriteInstrumentation' (CLIENT) : 0"));
singletonList("Suppressed Spans by 'favoriteInstrumentation' (CLIENT) : 1"));
}
}

0 comments on commit d9f20ec

Please sign in to comment.