Skip to content

Commit

Permalink
Extract commond logic into static methods
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Oct 30, 2020
1 parent 445f82c commit af5e50b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package dev.knative.eventing.kafka.broker.core.utils;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import io.vertx.core.Vertx;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Shutdown {

private static final Logger logger = LoggerFactory.getLogger(Shutdown.class);

public static Runnable run(final Vertx vertx, final Closeable fw, final Consumer<Contract> publisher)
throws IOException {
return () -> {
try {
fw.close();
} catch (final IOException e) {
logger.error("Failed to close file watcher", e);
}
publisher.accept(Contract.newBuilder().build());
closeSync(vertx).run();
};
}

public static Runnable closeSync(final Vertx vertx) {
return () -> {
final var wait = new CountDownLatch(1);
vertx.close(ignore -> wait.countDown());
try {
wait.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Timeout waiting for vertx close", e);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package dev.knative.eventing.kafka.broker.core.utils;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;

public class ShutdownTest {

@Test
public void run() throws IOException {
final var vertx = mockVertxClose();
final var closeable = mock(Closeable.class);
final Consumer<Contract> consumer = mock(Consumer.class);

Shutdown.run(vertx, closeable, consumer).run();

verify(vertx, times(1)).close(any());
verify(closeable).close();
verify(consumer).accept(Contract.newBuilder().build());
}

@Test
public void closeSync() {
final var vertx = mockVertxClose();

Shutdown.closeSync(vertx).run();

verify(vertx).close(any());
}

private Vertx mockVertxClose() {
final var vertx = mock(Vertx.class);

doAnswer(invocation -> {
final Handler<AsyncResult<Void>> callback = invocation.getArgument(0);
callback.handle(Future.succeededFuture());
return null;
})
.when(vertx).close(any());

return vertx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import static net.logstash.logback.argument.StructuredArguments.keyValue;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.metrics.MetricsOptionsProvider;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import dev.knative.eventing.kafka.broker.dispatcher.http.HttpConsumerVerticleFactory;
import io.cloudevents.CloudEvent;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -75,7 +75,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.closeSync(vertx)));

ContractMessageCodec.register(vertx.eventBus());

Expand Down Expand Up @@ -120,27 +120,15 @@ public static void main(final String[] args) throws IOException, InterruptedExce
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
fw.close();
publisher.accept(Contract.newBuilder().build());
final var waitClose = new CountDownLatch(1);
vertx.close(ignore -> waitClose.countDown());
try {
waitClose.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

try {

fw.watch(); // block forever

} catch (final Exception ex) {
logger.error("Failed during filesystem watch", ex);
final var wait = new CountDownLatch(1);
vertx.close(ignore -> wait.countDown());
wait.await(5, TimeUnit.SECONDS);
Shutdown.closeSync(vertx).run();
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import static net.logstash.logback.argument.StructuredArguments.keyValue;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.metrics.MetricsOptionsProvider;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import io.cloudevents.kafka.CloudEventSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
Expand Down Expand Up @@ -79,7 +79,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.closeSync(vertx)));

ContractMessageCodec.register(vertx.eventBus());

Expand Down Expand Up @@ -131,27 +131,15 @@ public static void main(final String[] args) throws IOException, InterruptedExce
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
fw.close();
publisher.accept(Contract.newBuilder().build());
final var waitClose = new CountDownLatch(1);
vertx.close(ignore -> waitClose.countDown());
try {
waitClose.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

try {

fw.watch(); // block forever

} catch (final Exception ex) {
logger.error("Failed during filesystem watch", ex);
final var wait = new CountDownLatch(1);
vertx.close(ignore -> wait.countDown());
wait.await(5, TimeUnit.SECONDS);
Shutdown.closeSync(vertx).run();
throw ex;
}
}
Expand Down

0 comments on commit af5e50b

Please sign in to comment.