Skip to content

Commit

Permalink
Fix TagsCache concurrency bug
Browse files Browse the repository at this point in the history
Fixes vert-x3#185

Use flyweight and cache only when running on standard context in event loop.
Otherwise, the cache might return wrong tags (e.g. when invoked inside executeBlocking on standard context).

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont committed Jul 19, 2023
1 parent 2b839de commit cc0abdd
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 16 deletions.
27 changes: 11 additions & 16 deletions src/main/java/io/vertx/micrometer/impl/meters/TagsCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,31 @@
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.impl.Labels;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.*;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.*;
import static java.util.stream.Collectors.toList;

/**
* Cache {@link Tags} in Vert.x contexts.
* Each Vert.x context gets its own cache to avoid concurrency complications.
* In case this cache is called on a non-Vert.x thread, a new instance of {@link Tags} is returned.
* Cache {@link Tags} in Vert.x EL contexts.
* Each Vert.x EL context gets its own cache to avoid concurrency complications.
* In case this cache is called on a non-Vert.x thread or a worker thread, a new instance of {@link Tags} is returned.
*/
public class TagsCache {

// To avoid creating cache keys on each lookup, a flyweight is stored in the Vert.x context
// It is possible to have this flyweight because the caller is either on an event loop context or a worker context
// Never two workers will request a Tags instance on the same context concurrently
// It is possible to have this flyweight because the caller is on an event loop thread
private static final Object TAGS_DATA_FLYWEIGHT = new Object();
// Used to store the per-context Tags cache
private static final Object CACHE = new Object();

public static Tags getOrCreate(Iterable<Tag> customTags, Label[] keys, String[] values) {
ContextInternal context = (ContextInternal) Vertx.currentContext();
if (context == null) {
ContextInternal context = ContextInternal.current();
if (context == null || context.isWorkerContext() || !context.inThread()) {
return createTags(customTags, keys, values);
}
Map<TagsData, Tags> cache = cache(context);
Expand All @@ -75,7 +69,8 @@ private static TagsData flyweightKey(ContextInternal context, Iterable<Tag> cust
return tagsData;
}

private static Tags createTags(Iterable<Tag> customTags, Label[] keys, String[] values) {
// Visible for testing
static Tags createTags(Iterable<Tag> customTags, Label[] keys, String[] values) {
return Labels.toTags(keys, values).and(customTags);
}

Expand Down Expand Up @@ -179,7 +174,7 @@ public int hashCode() {
@Override
public String toString() {
return "TagsData{" +
"customTags=" + StreamSupport.stream(customTags.spliterator(), false).collect(toList()) +
"customTags=" + (customTags == null ? null : StreamSupport.stream(customTags.spliterator(), false).collect(toList())) +
", keys=" + Arrays.toString(keys) +
", values=" + Arrays.toString(values) +
'}';
Expand Down
78 changes: 78 additions & 0 deletions src/test/java/io/vertx/micrometer/impl/meters/TagsCacheTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.vertx.micrometer.impl.meters;

import io.micrometer.core.instrument.Tags;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.micrometer.Label;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.ThreadLocalRandom;

@RunWith(VertxUnitRunner.class)
public class TagsCacheTest {

private Vertx vertx = Vertx.vertx();

@After
public void tearDown(TestContext tc) throws Exception {
vertx.close().onComplete(tc.asyncAssertSuccess());
}

@Test
public void testCallsFromELThreadOnELContext(TestContext tc) {
Promise<Void> promise = Promise.promise();
Label[] labels = Label.values();
vertx.runOnContext(v -> {
Label label = labels[ThreadLocalRandom.current().nextInt(0, labels.length)];
String value = String.valueOf(label.ordinal());
Tags expectedTags = TagsCache.createTags(null, new Label[]{label}, new String[]{value});
Tags first = TagsCache.getOrCreate(null, new Label[]{label}, new String[]{value});
tc.assertEquals(expectedTags, first);
Tags second = TagsCache.getOrCreate(null, new Label[]{label}, new String[]{value});
tc.assertTrue(first == second);
promise.complete();
});
promise.future().onComplete(tc.asyncAssertSuccess());
}

@Test
public void testConcurrentCallsFromWorkerThreadsOnELContext(TestContext tc) {
Async async = tc.async(10000);
vertx.deployVerticle(new MyAbstractVerticle(tc, async)).onComplete(tc.asyncAssertSuccess());
async.awaitSuccess();
}

private static class MyAbstractVerticle extends AbstractVerticle {

final TestContext tc;
final Async async;
final int count;

MyAbstractVerticle(TestContext tc, Async async) {
this.tc = tc;
this.async = async;
count = async.count();
}

@Override
public void start() throws Exception {
Label[] labels = Label.values();
for (int i = 0; i < count; i++) {
vertx.executeBlocking(() -> {
Label label = labels[ThreadLocalRandom.current().nextInt(0, labels.length)];
String value = String.valueOf(label.ordinal());
Tags expectedTags = TagsCache.createTags(null, new Label[]{label}, new String[]{value});
Tags actual = TagsCache.getOrCreate(null, new Label[]{label}, new String[]{value});
tc.assertEquals(expectedTags, actual);
return null;
}, false).onSuccess(v -> async.countDown());
}
}
}
}

0 comments on commit cc0abdd

Please sign in to comment.