From cc0abdd4fb941004f8562f8eea9997d9ba427487 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Wed, 19 Jul 2023 18:41:45 +0200 Subject: [PATCH] Fix TagsCache concurrency bug Fixes #185 Use flyweight and cache only when running on standard context in event loop. Otherwise, the cache might return wrong tags (e.g. when invoked inside executeBlocking on standard context). Signed-off-by: Thomas Segismont --- .../micrometer/impl/meters/TagsCache.java | 27 +++---- .../micrometer/impl/meters/TagsCacheTest.java | 78 +++++++++++++++++++ 2 files changed, 89 insertions(+), 16 deletions(-) create mode 100644 src/test/java/io/vertx/micrometer/impl/meters/TagsCacheTest.java diff --git a/src/main/java/io/vertx/micrometer/impl/meters/TagsCache.java b/src/main/java/io/vertx/micrometer/impl/meters/TagsCache.java index 5af04462..6cc0708d 100644 --- a/src/main/java/io/vertx/micrometer/impl/meters/TagsCache.java +++ b/src/main/java/io/vertx/micrometer/impl/meters/TagsCache.java @@ -20,37 +20,31 @@ import io.micrometer.core.instrument.ImmutableTag; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; -import io.vertx.core.Vertx; import io.vertx.core.impl.ContextInternal; import io.vertx.micrometer.Label; import io.vertx.micrometer.impl.Labels; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; import java.util.stream.StreamSupport; -import static java.util.stream.Collectors.*; +import static java.util.stream.Collectors.toList; /** - * Cache {@link Tags} in Vert.x contexts. - * Each Vert.x context gets its own cache to avoid concurrency complications. - * In case this cache is called on a non-Vert.x thread, a new instance of {@link Tags} is returned. + * Cache {@link Tags} in Vert.x EL contexts. + * Each Vert.x EL context gets its own cache to avoid concurrency complications. + * In case this cache is called on a non-Vert.x thread or a worker thread, a new instance of {@link Tags} is returned. */ public class TagsCache { // To avoid creating cache keys on each lookup, a flyweight is stored in the Vert.x context - // It is possible to have this flyweight because the caller is either on an event loop context or a worker context - // Never two workers will request a Tags instance on the same context concurrently + // It is possible to have this flyweight because the caller is on an event loop thread private static final Object TAGS_DATA_FLYWEIGHT = new Object(); // Used to store the per-context Tags cache private static final Object CACHE = new Object(); public static Tags getOrCreate(Iterable customTags, Label[] keys, String[] values) { - ContextInternal context = (ContextInternal) Vertx.currentContext(); - if (context == null) { + ContextInternal context = ContextInternal.current(); + if (context == null || context.isWorkerContext() || !context.inThread()) { return createTags(customTags, keys, values); } Map cache = cache(context); @@ -75,7 +69,8 @@ private static TagsData flyweightKey(ContextInternal context, Iterable cust return tagsData; } - private static Tags createTags(Iterable customTags, Label[] keys, String[] values) { + // Visible for testing + static Tags createTags(Iterable customTags, Label[] keys, String[] values) { return Labels.toTags(keys, values).and(customTags); } @@ -179,7 +174,7 @@ public int hashCode() { @Override public String toString() { return "TagsData{" + - "customTags=" + StreamSupport.stream(customTags.spliterator(), false).collect(toList()) + + "customTags=" + (customTags == null ? null : StreamSupport.stream(customTags.spliterator(), false).collect(toList())) + ", keys=" + Arrays.toString(keys) + ", values=" + Arrays.toString(values) + '}'; diff --git a/src/test/java/io/vertx/micrometer/impl/meters/TagsCacheTest.java b/src/test/java/io/vertx/micrometer/impl/meters/TagsCacheTest.java new file mode 100644 index 00000000..a4c727cf --- /dev/null +++ b/src/test/java/io/vertx/micrometer/impl/meters/TagsCacheTest.java @@ -0,0 +1,78 @@ +package io.vertx.micrometer.impl.meters; + +import io.micrometer.core.instrument.Tags; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.micrometer.Label; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.ThreadLocalRandom; + +@RunWith(VertxUnitRunner.class) +public class TagsCacheTest { + + private Vertx vertx = Vertx.vertx(); + + @After + public void tearDown(TestContext tc) throws Exception { + vertx.close().onComplete(tc.asyncAssertSuccess()); + } + + @Test + public void testCallsFromELThreadOnELContext(TestContext tc) { + Promise promise = Promise.promise(); + Label[] labels = Label.values(); + vertx.runOnContext(v -> { + Label label = labels[ThreadLocalRandom.current().nextInt(0, labels.length)]; + String value = String.valueOf(label.ordinal()); + Tags expectedTags = TagsCache.createTags(null, new Label[]{label}, new String[]{value}); + Tags first = TagsCache.getOrCreate(null, new Label[]{label}, new String[]{value}); + tc.assertEquals(expectedTags, first); + Tags second = TagsCache.getOrCreate(null, new Label[]{label}, new String[]{value}); + tc.assertTrue(first == second); + promise.complete(); + }); + promise.future().onComplete(tc.asyncAssertSuccess()); + } + + @Test + public void testConcurrentCallsFromWorkerThreadsOnELContext(TestContext tc) { + Async async = tc.async(10000); + vertx.deployVerticle(new MyAbstractVerticle(tc, async)).onComplete(tc.asyncAssertSuccess()); + async.awaitSuccess(); + } + + private static class MyAbstractVerticle extends AbstractVerticle { + + final TestContext tc; + final Async async; + final int count; + + MyAbstractVerticle(TestContext tc, Async async) { + this.tc = tc; + this.async = async; + count = async.count(); + } + + @Override + public void start() throws Exception { + Label[] labels = Label.values(); + for (int i = 0; i < count; i++) { + vertx.executeBlocking(() -> { + Label label = labels[ThreadLocalRandom.current().nextInt(0, labels.length)]; + String value = String.valueOf(label.ordinal()); + Tags expectedTags = TagsCache.createTags(null, new Label[]{label}, new String[]{value}); + Tags actual = TagsCache.getOrCreate(null, new Label[]{label}, new String[]{value}); + tc.assertEquals(expectedTags, actual); + return null; + }, false).onSuccess(v -> async.countDown()); + } + } + } +}