Gracefully clean up resources on shutdown (#334)
* Gracefully clean up resources on shutdown

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Extract common logic into static methods

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Apply suggestion

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Increase close timeout

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Close vertx when there is an exception

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
pierDipi committed Nov 2, 2020
1 parent e1b46b9 commit 9a4f34d
Showing 5 changed files with 206 additions and 80 deletions.
@@ -26,6 +26,7 @@
import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -40,14 +41,15 @@
/**
* FileWatcher is the class responsible for watching a given file and reporting updates.
*/
public class FileWatcher {
public class FileWatcher implements Closeable, AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(FileWatcher.class);

private final Consumer<DataPlaneContract.Contract> contractConsumer;

private final WatchService watcher;
private final File toWatch;
private volatile boolean closed;

/**
* All args constructor.
@@ -129,6 +131,9 @@ public void watch() throws IOException, InterruptedException {
}

private void update() throws IOException {
if (closed) {
return;
}
try (
final var fileReader = new FileReader(toWatch);
final var bufferedReader = new BufferedReader(fileReader)) {
@@ -148,4 +153,9 @@ private void parseFromJson(final Reader content) throws IOException {
logger.warn("failed to parse from JSON", ex);
}
}

@Override
public void close() {
closed = true;
}
}
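For context: because FileWatcher now implements Closeable, the watch loop can be stopped from another thread, typically a JVM shutdown hook, and once the volatile closed flag is set any in-flight or later update() becomes a no-op. The snippet below is an illustrative sketch only, not part of this commit; it assumes the three-argument constructor used elsewhere in this diff (WatchService, Consumer<DataPlaneContract.Contract>, File), uses a hypothetical config path, and omits the FileWatcher import since its package is not shown in this excerpt.

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;

import java.io.File;
import java.nio.file.FileSystems;

class FileWatcherUsageSketch { // hypothetical class, for illustration only

  public static void main(final String[] args) throws Exception {
    final var fw = new FileWatcher(
        FileSystems.getDefault().newWatchService(),
        (DataPlaneContract.Contract contract) -> System.out.println("contract updated"),
        new File("/etc/config/data-plane-config.json") // hypothetical path
    );

    // close() only flips the closed flag; the watcher keeps running but ignores further file changes.
    Runtime.getRuntime().addShutdownHook(new Thread(fw::close));

    fw.watch(); // blocks the calling thread until interrupted
  }
}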
@@ -0,0 +1,40 @@
package dev.knative.eventing.kafka.broker.core.utils;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import io.vertx.core.Vertx;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Shutdown {

private static final Logger logger = LoggerFactory.getLogger(Shutdown.class);

public static Runnable run(final Vertx vertx, final Closeable fw, final Consumer<Contract> publisher) {
return () -> {
try {
fw.close();
} catch (final IOException e) {
logger.error("Failed to close file watcher", e);
}
publisher.accept(Contract.newBuilder().build());
closeSync(vertx).run();
};
}

public static Runnable closeSync(final Vertx vertx) {
return () -> {
final var wait = new CountDownLatch(1);
vertx.close(ignore -> wait.countDown());
try {
wait.await(2, TimeUnit.MINUTES);
} catch (InterruptedException e) {
logger.error("Timeout waiting for vertx close", e);
}
};
}
}
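The main() methods changed below wire these helpers into a JVM shutdown hook. Stripped of the surrounding setup, the intended ordering is: close the FileWatcher so it stops publishing updates, publish an empty Contract so the reconciler can tear down consumers and producers, then close Vert.x synchronously, waiting up to the two-minute timeout. A minimal wiring sketch, assuming vertx, fileWatcher and contractPublisher are already in scope:

// Sketch only; mirrors the shutdown-hook registration added further down in this diff.
Runtime.getRuntime().addShutdownHook(
    new Thread(Shutdown.run(vertx, fileWatcher, contractPublisher)));

// On fatal startup errors the main classes fall back to closing Vert.x alone:
Shutdown.closeSync(vertx).run();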
@@ -0,0 +1,55 @@
package dev.knative.eventing.kafka.broker.core.utils;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;

public class ShutdownTest {

@Test
public void run() throws IOException {
final var vertx = mockVertxClose();
final var closeable = mock(Closeable.class);
final Consumer<Contract> consumer = mock(Consumer.class);

Shutdown.run(vertx, closeable, consumer).run();

verify(vertx, times(1)).close(any());
verify(closeable).close();
verify(consumer).accept(Contract.newBuilder().build());
}

@Test
public void closeSync() {
final var vertx = mockVertxClose();

Shutdown.closeSync(vertx).run();

verify(vertx).close(any());
}

private Vertx mockVertxClose() {
final var vertx = mock(Vertx.class);

doAnswer(invocation -> {
final Handler<AsyncResult<Void>> callback = invocation.getArgument(0);
callback.handle(Future.succeededFuture());
return null;
})
.when(vertx).close(any());

return vertx;
}
}
@@ -24,6 +24,7 @@
import dev.knative.eventing.kafka.broker.core.metrics.MetricsOptionsProvider;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import dev.knative.eventing.kafka.broker.dispatcher.http.HttpConsumerVerticleFactory;
import io.cloudevents.CloudEvent;
import io.vertx.core.Vertx;
@@ -32,8 +33,9 @@
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.micrometer.backends.BackendRegistries;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.logstash.logback.encoder.LogstashEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,52 +74,60 @@ public static void main(final String[] args) {
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));
try {

final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());
ContractMessageCodec.register(vertx.eventBus());

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);
final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfig
);
final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());

final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
consumerVerticleFactory,
env.getEgressesInitialCapacity()
);
final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);

vertx.deployVerticle(consumerDeployerVerticle)
.onSuccess(v -> logger.info("consumer deployer started"))
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfig
);

try {
final var fw = new FileWatcher(
FileSystems.getDefault().newWatchService(),
new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS),
new File(env.getDataPlaneConfigFilePath())
final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
consumerVerticleFactory,
env.getEgressesInitialCapacity()
);

final var waitConsumerDeployer = new CountDownLatch(1);
vertx.deployVerticle(consumerDeployerVerticle)
.onSuccess(v -> {
logger.info("Consumer deployer started");
waitConsumerDeployer.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitConsumerDeployer.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

fw.watch(); // block forever

} catch (InterruptedException | IOException ex) {
logger.error("failed during filesystem watch", ex);
} catch (final Exception ex) {
logger.error("Failed during filesystem watch", ex);

Shutdown.closeSync(vertx).run();
}
}
}
@@ -24,15 +24,17 @@
import dev.knative.eventing.kafka.broker.core.metrics.MetricsOptionsProvider;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import io.cloudevents.kafka.CloudEventSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.micrometer.backends.BackendRegistries;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.logstash.logback.encoder.LogstashEncoder;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -76,58 +78,67 @@ public static void main(final String[] args) {
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
try {

final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);
ContractMessageCodec.register(vertx.eventBus());

producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestMapper<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);
final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);

final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());

final var verticle = new ReceiverVerticle(
httpServerOptions,
handler,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);
final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);

vertx.deployVerticle(verticle)
.onSuccess(v -> logger.info("receiver started"))
.onFailure(t -> logger.error("receiver not started", t));
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestMapper<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);

try {
// TODO add a shutdown hook that calls objectsCreator.reconcile(Brokers.newBuilder().build()),
// so that producers flush their buffers.
// Note: reconcile(Brokers) isn't thread safe so we need to make sure to stop the watcher
// from calling reconcile first

final var fw = new FileWatcher(
FileSystems.getDefault().newWatchService(),
new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS),
new File(env.getDataPlaneConfigFilePath())
final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());

final var verticle = new ReceiverVerticle(
httpServerOptions,
handler,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);

final var waitVerticle = new CountDownLatch(1);
vertx.deployVerticle(verticle)
.onSuccess(v -> {
logger.info("Receiver started");
waitVerticle.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitVerticle.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

fw.watch(); // block forever

} catch (InterruptedException | IOException ex) {
logger.error("failed during filesystem watch", ex);
} catch (final Exception ex) {
logger.error("Failed during filesystem watch", ex);

Shutdown.closeSync(vertx).run();
}
}
}
