Skip to content

Commit

Permalink
Add scan hook and processing
Browse files Browse the repository at this point in the history
Fixes #51 and #42

Signed-off-by: Clement Escoffier <clement.escoffier@gmail.com>
  • Loading branch information
cescoffier committed Aug 6, 2018
1 parent 7c47b42 commit a6dcc91
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 1 deletion.
9 changes: 9 additions & 0 deletions vertx-config/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,15 @@ set of handlers you are notified:
{@link examples.ConfigExamples#stream}
----
== Processing the configuration
You can configure a _processor_ that can validate and update the configuration. This is done using the
{@link io.vertx.config.ConfigRetriever#setConfigurationProcessor(java.util.function.Function)} method.
The prcessing must not return `null`. It takes the retrieved configuration and returns the processed one. If the processor
does not update the configuration, it must return the input configuration. If the processor can throw an exception (for
example for validation purpose).
== Retrieving the configuration as a Future
The {@link io.vertx.config.ConfigRetriever} provide a way to retrieve the configuration as a
Expand Down
23 changes: 23 additions & 0 deletions vertx-config/src/main/java/io/vertx/config/ConfigRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.vertx.config;

import io.vertx.codegen.annotations.CacheReturn;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.config.impl.ConfigRetrieverImpl;
import io.vertx.config.spi.ConfigStore;
Expand All @@ -28,6 +29,8 @@
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;

import java.util.function.Function;

/**
* Defines a configuration retriever that read configuration from
* {@link ConfigStore}
Expand Down Expand Up @@ -105,6 +108,26 @@ static Future<JsonObject> getConfigAsFuture(ConfigRetriever retriever) {
*/
void listen(Handler<ConfigChange> listener);

/**
* Registers a handler called before every scan. This method is mostly used for logging purpose.
* @param function the function, must not be {@code null}
* @return the current config retriever
*/
@Fluent
ConfigRetriever setBeforeScanHandler(Handler<Void> function);

/**
* Registers a handler that process the configuration before being injected into {@link #getConfig(Handler)} or {@link #listen(Handler)}. This allows
* the code to customize the configuration.
*
* @param processor the processor, must not be {@code null}. The method must not return {@code null}. The returned configuration is used. If the processor
* does not update the configuration, it must return the input configuration. If the processor throws an exception, the failure is passed
* to the {@link #getConfig(Handler)} handler.
* @return the current config retriever
*/
@Fluent
ConfigRetriever setConfigurationProcessor(Function<JsonObject, JsonObject> processor);

/**
* @return the stream of configurations. It's single stream (unicast) and that delivers the last known config
* and the successors periodically.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.File;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -55,6 +56,9 @@ public class ConfigRetrieverImpl implements ConfigRetriever {

private JsonObject current = new JsonObject();

private Handler<Void> beforeScan;
private Function<JsonObject, JsonObject> processor;

public ConfigRetrieverImpl(Vertx vertx, ConfigRetrieverOptions options) {
this.vertx = vertx;
this.options = options;
Expand Down Expand Up @@ -202,12 +206,27 @@ public void listen(Handler<ConfigChange> listener) {
listeners.add(listener);
}

@Override
public ConfigRetriever setBeforeScanHandler(Handler<Void> function) {
this.beforeScan = Objects.requireNonNull(function, "The function must not be `null`");
return this;
}

@Override
public ConfigRetriever setConfigurationProcessor(Function<JsonObject, JsonObject> processor) {
this.processor = Objects.requireNonNull(processor, "The processor must not be `null`");
return this;
}

@Override
public ReadStream<JsonObject> configStream() {
return streamOfConfiguration;
}

private void scan() {
if (beforeScan != null) {
beforeScan.handle(null);
}
compute(ar -> {
if (ar.failed()) {
streamOfConfiguration.fail(ar.cause());
Expand All @@ -218,6 +237,7 @@ private void scan() {
if (!current.equals(ar.result())) {
JsonObject prev = current;
current = ar.result();

listeners.forEach(l -> l.handle(new ConfigChange(prev, current)));
try {
streamOfConfiguration.handle(current);
Expand Down Expand Up @@ -267,7 +287,12 @@ private void compute(Handler<AsyncResult<JsonObject>> completionHandler) {
JsonObject json = new JsonObject();
futures.forEach(future -> json.mergeIn((JsonObject) future.result(), true));
try {
completionHandler.handle(Future.succeededFuture(json));
JsonObject computed = json;
if (processor != null) {
processConfigurationAndReport(completionHandler, json);
} else {
completionHandler.handle(Future.succeededFuture(computed));
}
} catch (Throwable e) {
// Report the error on the context exception handler.
if (vertx.exceptionHandler() != null) {
Expand All @@ -280,6 +305,16 @@ private void compute(Handler<AsyncResult<JsonObject>> completionHandler) {
});
}

private void processConfigurationAndReport(Handler<AsyncResult<JsonObject>> completionHandler, JsonObject json) {
JsonObject computed;
try {
computed = processor.apply(json);
completionHandler.handle(Future.succeededFuture(computed));
} catch (Throwable e) {
completionHandler.handle(Future.failedFuture(e));
}
}

/**
* @return the list of providers. For introspection purpose.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,41 @@ public void testLoading(TestContext tc) {
});
}

@Test
public void testLoadingWithProcessor(TestContext tc) {
retriever = ConfigRetriever.create(vertx,
addStores(new ConfigRetrieverOptions()))
.setConfigurationProcessor(json -> {
if (json.containsKey("foo")) {
json.put("foo", json.getString("foo").toUpperCase());
}
return json;
});
Async async = tc.async();

retriever.getConfig(ar -> {
ConfigChecker.check(ar);
assertThat(ar.result().getString("foo")).isEqualToIgnoringCase("BAR");
ConfigChecker.check(retriever.getCachedConfig());
async.complete();
});
}

@Test
public void testLoadingWithProcessorFailure(TestContext tc) {
retriever = ConfigRetriever.create(vertx,
addStores(new ConfigRetrieverOptions()))
.setConfigurationProcessor(json -> {
throw new RuntimeException("failed");
});
Async async = tc.async();

retriever.getConfig(ar -> {
tc.assertTrue(ar.failed());
async.complete();
});
}

@Test
public void testLoadingWithFuturePolyglotVersion(TestContext tc) {
retriever = ConfigRetriever.create(vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,45 @@ public void testScanning() {
await().untilAtomic(done, is(true));
}

@Test
public void testScanningWithBeforeAndAfterFunctions() {
AtomicBoolean done = new AtomicBoolean();
AtomicInteger before = new AtomicInteger();
vertx.runOnContext(v -> {
retriever = ConfigRetriever.create(vertx,
new ConfigRetrieverOptions().setScanPeriod(1000).setStores(stores()))
.setBeforeScanHandler(x -> before.incrementAndGet())
.setConfigurationProcessor(json -> {
if (json.containsKey("some-key")) {
json.put("some-key", json.getString("some-key").toUpperCase());
}
return json;
});

AtomicReference<JsonObject> current = new AtomicReference<>();
retriever.getConfig(json -> {
retriever.listen(change -> {
if (current.get() != null && !current.get().equals(change.getPreviousConfiguration())) {
throw new IllegalStateException("Previous configuration not correct");
}
current.set(change.getNewConfiguration());
});
current.set(json.result());
});

assertWaitUntil(() -> current.get() != null, x -> {
current.set(null);
http.put("some-key", "some-value");
assertWaitUntil(() -> current.get() != null, x2 -> {
assertThat(current.get().getString("some-key")).isEqualTo("SOME-VALUE");
done.set(true);
});
});
});
await().untilAtomic(done, is(true));
assertThat(before.get()).isGreaterThanOrEqualTo(1);
}

private void assertWaitUntil(Callable<Boolean> condition, Handler<AsyncResult<Void>> next) {
assertWaitUntil(new AtomicInteger(), condition, next);
}
Expand Down

0 comments on commit a6dcc91

Please sign in to comment.