Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure vertx-cache to allow multiple applications with launched by multiple users #15541

Merged
merged 3 commits into from Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -13,6 +13,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -51,6 +52,7 @@
public class VertxCoreRecorder {

private static final Logger LOGGER = Logger.getLogger(VertxCoreRecorder.class.getName());
public static final String VERTX_CACHE = "vertx-cache";

static volatile VertxSupplier vertx;

Expand All @@ -65,7 +67,7 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers) {
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(config, customizers);
vertx = new VertxSupplier(config, customizers, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
// could cause problem to beans that rely on Vert.x and contain shutdown tasks
shutdown.addLastShutdownTask(new Runnable() {
Expand All @@ -76,7 +78,7 @@ public void run() {
});
} else {
if (vertx == null) {
vertx = new VertxSupplier(config, customizers);
vertx = new VertxSupplier(config, customizers, shutdown);
} else if (vertx.v != null) {
tryCleanTccl(vertx.v);
}
Expand Down Expand Up @@ -172,12 +174,12 @@ public static Supplier<Vertx> getVertx() {
return vertx;
}

public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer customizer) {
public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer customizer, ShutdownContext shutdown) {

VertxOptions options = new VertxOptions();

if (conf != null) {
convertToVertxOptions(conf, options, true);
convertToVertxOptions(conf, options, true, shutdown);
}

// Allow extension customizers to do their thing
Expand All @@ -188,11 +190,14 @@ public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer c
Vertx vertx;
if (options.getEventBusOptions().isClustered()) {
CompletableFuture<Vertx> latch = new CompletableFuture<>();
Vertx.clusteredVertx(options, ar -> {
if (ar.failed()) {
latch.completeExceptionally(ar.cause());
} else {
latch.complete(ar.result());
Vertx.clusteredVertx(options, new Handler<AsyncResult<Vertx>>() {
@Override
public void handle(AsyncResult<Vertx> ar) {
if (ar.failed()) {
latch.completeExceptionally(ar.cause());
} else {
latch.complete(ar.result());
}
}
});
vertx = latch.join();
Expand All @@ -213,7 +218,8 @@ private static Vertx logVertxInitialization(Vertx vertx) {
return vertx;
}

private static VertxOptions convertToVertxOptions(VertxConfiguration conf, VertxOptions options, boolean allowClustering) {
private static VertxOptions convertToVertxOptions(VertxConfiguration conf, VertxOptions options, boolean allowClustering,
ShutdownContext shutdown) {

if (!conf.useAsyncDNS) {
System.setProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME, "true");
Expand All @@ -227,8 +233,34 @@ private static VertxOptions convertToVertxOptions(VertxConfiguration conf, Vertx
initializeClusterOptions(conf, options);
}

String fileCacheDir = System.getProperty(CACHE_DIR_BASE_PROP_NAME,
System.getProperty("java.io.tmpdir", ".") + File.separator + "vertx-cache");
String fileCacheDir = System.getProperty(CACHE_DIR_BASE_PROP_NAME);
if (fileCacheDir == null) {
File tmp = new File(System.getProperty("java.io.tmpdir", ".") + File.separator + VERTX_CACHE);
if (!tmp.isDirectory()) {
if (!tmp.mkdirs()) {
LOGGER.warnf("Unable to create Vert.x cache directory : %s", tmp.getAbsolutePath());
}
if (!(tmp.setReadable(true, false) && tmp.setWritable(true, false))) {
LOGGER.warnf("Unable to make the Vert.x cache directory (%s) world readable and writable",
tmp.getAbsolutePath());
}
}

File cache = getRandomDirectory(tmp);
LOGGER.debugf("Vert.x Cache configured to: %s", cache.getAbsolutePath());
fileCacheDir = cache.getAbsolutePath();
if (shutdown != null) {
shutdown.addLastShutdownTask(new Runnable() {
@Override
public void run() {
// Recursively delete the created directory and all the files
deleteDirectory(cache);
// We do not delete the vertx-cache directory on purpose, as it could be used concurrently by
// another application. In the worse case, it's just an empty directory.
}
});
}
}

options.setFileSystemOptions(new FileSystemOptions()
.setFileCachingEnabled(conf.caching)
Expand Down Expand Up @@ -258,6 +290,16 @@ private static VertxOptions convertToVertxOptions(VertxConfiguration conf, Vertx
return options;
}

private static File getRandomDirectory(File tmp) {
long random = Math.abs(UUID.randomUUID().getMostSignificantBits());
File cache = new File(tmp, Long.toString(random));
if (cache.isDirectory()) {
// Do not reuse an existing directory.
return getRandomDirectory(tmp);
}
return cache;
}

private static int calculateDefaultIOThreads() {
//we only allow one event loop per 10mb of ram at the most
//its hard to say what this number should be, but it is also obvious
Expand All @@ -277,11 +319,14 @@ void destroy() {
FastThreadLocal.destroy();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> problem = new AtomicReference<>();
vertx.v.close(ar -> {
if (ar.failed()) {
problem.set(ar.cause());
vertx.v.close(new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> ar) {
if (ar.failed()) {
problem.set(ar.cause());
}
latch.countDown();
}
latch.countDown();
});
try {
latch.await();
Expand Down Expand Up @@ -393,24 +438,27 @@ public Integer get() {
}

public static Supplier<Vertx> recoverFailedStart(VertxConfiguration config) {
return vertx = new VertxSupplier(config, Collections.emptyList());
return vertx = new VertxSupplier(config, Collections.emptyList(), null);

}

static class VertxSupplier implements Supplier<Vertx> {
final VertxConfiguration config;
final VertxOptionsCustomizer customizer;
final ShutdownContext shutdown;
Vertx v;

VertxSupplier(VertxConfiguration config, List<Consumer<VertxOptions>> customizers) {
VertxSupplier(VertxConfiguration config, List<Consumer<VertxOptions>> customizers,
ShutdownContext shutdown) {
this.config = config;
this.customizer = new VertxOptionsCustomizer(customizers);
this.shutdown = shutdown;
}

@Override
public synchronized Vertx get() {
if (v == null) {
v = initialize(config, customizer);
v = initialize(config, customizer, shutdown);
}
return v;
}
Expand All @@ -434,4 +482,14 @@ VertxOptions customize(VertxOptions options) {
public static void setWebDeploymentId(String webDeploymentId) {
VertxCoreRecorder.webDeploymentId = webDeploymentId;
}

private static void deleteDirectory(File directory) {
File[] children = directory.listFiles();
if (children != null) {
for (File child : children) {
deleteDirectory(child);
}
}
directory.delete();
}
}
Expand Up @@ -55,7 +55,7 @@ public void shouldEnableClustering() {
configuration.cluster = cc;

try {
VertxCoreRecorder.initialize(configuration, null);
VertxCoreRecorder.initialize(configuration, null, null);
Assertions.fail("It should not have a cluster manager on the classpath, and so fail the creation");
} catch (IllegalStateException e) {
Assertions.assertTrue(e.getMessage().contains("No ClusterManagerFactory"),
Expand All @@ -82,7 +82,7 @@ public void accept(VertxOptions vertxOptions) {
}
}));

VertxCoreRecorder.initialize(configuration, customizers);
VertxCoreRecorder.initialize(configuration, customizers, null);
}

@Test
Expand All @@ -95,7 +95,7 @@ public void accept(VertxOptions vertxOptions) {
called.set(true);
}
}));
Vertx v = VertxCoreRecorder.initialize(createDefaultConfiguration(), customizers);
Vertx v = VertxCoreRecorder.initialize(createDefaultConfiguration(), customizers, null);
Assertions.assertTrue(called.get(), "Customizer should get called during initialization");
}

Expand Down
Expand Up @@ -28,7 +28,7 @@ public void tearDown() {

@Test
public void shouldNotFailWithoutConfig() {
verifyProducer(VertxCoreRecorder.initialize(null, null));
verifyProducer(VertxCoreRecorder.initialize(null, null, null));
}

private void verifyProducer(Vertx v) {
Expand Down