Gracefully clean up resources on shutdown #334

Merged
FileWatcher.java

@@ -26,6 +26,7 @@
 import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;

@@ -40,14 +41,15 @@
 /**
  * FileWatcher is the class responsible for watching a given file and reports update.
  */
-public class FileWatcher {
+public class FileWatcher implements Closeable, AutoCloseable {

   private static final Logger logger = LoggerFactory.getLogger(FileWatcher.class);

   private final Consumer<DataPlaneContract.Contract> contractConsumer;

   private final WatchService watcher;
   private final File toWatch;
+  private volatile boolean closed;

   /**
    * All args constructor.

@@ -129,6 +131,9 @@ public void watch() throws IOException, InterruptedException {
   }

   private void update() throws IOException {
+    if (closed) {
+      return;
+    }
     try (
         final var fileReader = new FileReader(toWatch);
         final var bufferedReader = new BufferedReader(fileReader)) {

@@ -148,4 +153,9 @@ private void parseFromJson(final Reader content) throws IOException {
       logger.warn("failed to parse from JSON", ex);
     }
   }
+
+  @Override
+  public void close() {
+    closed = true;
+  }
 }
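
Since FileWatcher now implements AutoCloseable (Closeable already extends AutoCloseable, so declaring both is redundant but harmless), callers can also scope it with try-with-resources. A minimal sketch, assuming the same constructor arguments Main uses in the diffs below; the config path is hypothetical. Note that close() only flips the volatile flag so update() stops publishing contracts; it does not unblock a thread sitting in watch().

    // Sketch only: publisher wiring mirrors Main below; the path is made up.
    final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
    try (final var fw = new FileWatcher(
        FileSystems.getDefault().newWatchService(),
        publisher,
        new File("/etc/config/data-plane-config.json"))) { // hypothetical path
      fw.watch(); // blocks until interrupted; close() only stops further updates
    }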
Shutdown.java (new file)

@@ -0,0 +1,40 @@
package dev.knative.eventing.kafka.broker.core.utils;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import io.vertx.core.Vertx;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Shutdown {

  private static final Logger logger = LoggerFactory.getLogger(Shutdown.class);

  public static Runnable run(final Vertx vertx, final Closeable fw, final Consumer<Contract> publisher) {
    return () -> {
      try {
        fw.close();
      } catch (final IOException e) {
        logger.error("Failed to close file watcher", e);
      }
      publisher.accept(Contract.newBuilder().build());
      closeSync(vertx).run();
    };
  }

  public static Runnable closeSync(final Vertx vertx) {
    return () -> {
      final var wait = new CountDownLatch(1);
      vertx.close(ignore -> wait.countDown());
      try {
        wait.await(2, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        logger.error("Timeout waiting for vertx close", e);
      }
    };
  }
}
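
Shutdown.run composes the teardown in the order the rest of this PR relies on: stop the FileWatcher so no new contracts are published, publish an empty Contract so consumer verticles are undeployed and producers flush, then close Vert.x and block until it finishes. One subtlety in closeSync: CountDownLatch.await returns false on timeout without throwing, so the InterruptedException branch actually fires on interruption, not on timeout. A hedged variant (an assumption on my part, not part of the PR) that distinguishes the two cases:

      // Variant sketch, not in the PR: report timeout and interruption separately.
      public static Runnable closeSyncStrict(final Vertx vertx) {
        return () -> {
          final var wait = new CountDownLatch(1);
          vertx.close(ignore -> wait.countDown());
          try {
            if (!wait.await(2, TimeUnit.MINUTES)) {
              logger.error("Timed out waiting for vertx close");
            }
          } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            logger.error("Interrupted while waiting for vertx close", e);
          }
        };
      }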
ShutdownTest.java (new file)

@@ -0,0 +1,55 @@
package dev.knative.eventing.kafka.broker.core.utils;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;

public class ShutdownTest {

  @Test
  public void run() throws IOException {
    final var vertx = mockVertxClose();
    final var closeable = mock(Closeable.class);
    final Consumer<Contract> consumer = mock(Consumer.class);

    Shutdown.run(vertx, closeable, consumer).run();

    verify(vertx, times(1)).close(any());
    verify(closeable).close();
    verify(consumer).accept(Contract.newBuilder().build());
  }

  @Test
  public void closeSync() {
    final var vertx = mockVertxClose();

    Shutdown.closeSync(vertx).run();

    verify(vertx).close(any());
  }

  private Vertx mockVertxClose() {
    final var vertx = mock(Vertx.class);

    doAnswer(invocation -> {
      final Handler<AsyncResult<Void>> callback = invocation.getArgument(0);
      callback.handle(Future.succeededFuture());
      return null;
    })
        .when(vertx).close(any());

    return vertx;
  }
}
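
The test verifies that each interaction happens, but not their relative order, which is what the graceful-shutdown sequence actually depends on. A possible tightening (my suggestion, not in the PR) using Mockito's InOrder, which additionally needs a static import of org.mockito.Mockito.inOrder:

      // Sketch: assert the watcher closes before the empty contract is published,
      // and the contract is published before Vert.x is closed.
      @Test
      public void runInOrder() throws IOException {
        final var vertx = mockVertxClose();
        final var closeable = mock(Closeable.class);
        final Consumer<Contract> consumer = mock(Consumer.class);

        Shutdown.run(vertx, closeable, consumer).run();

        final var inOrder = inOrder(closeable, consumer, vertx);
        inOrder.verify(closeable).close();
        inOrder.verify(consumer).accept(Contract.newBuilder().build());
        inOrder.verify(vertx).close(any());
      }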
Main.java (dispatcher)

@@ -24,6 +24,7 @@
 import dev.knative.eventing.kafka.broker.core.metrics.MetricsOptionsProvider;
 import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
 import dev.knative.eventing.kafka.broker.core.utils.Configurations;
+import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
 import dev.knative.eventing.kafka.broker.dispatcher.http.HttpConsumerVerticleFactory;
 import io.cloudevents.CloudEvent;
 import io.vertx.core.Vertx;

@@ -32,8 +33,9 @@
 import io.vertx.ext.web.client.WebClientOptions;
 import io.vertx.micrometer.backends.BackendRegistries;
 import java.io.File;
-import java.io.IOException;
 import java.nio.file.FileSystems;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import net.logstash.logback.encoder.LogstashEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -72,52 +74,60 @@ public static void main(final String[] args) {
     final var vertx = Vertx.vertx(
       new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
     );
-    ContractMessageCodec.register(vertx.eventBus());
-
-    final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
-    final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);
-
-    Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));
-
-    final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
-    final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
-    final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());
-
-    final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
-      consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);
-
-    final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
-      consumerRecordOffsetStrategyFactory,
-      consumerConfig,
-      WebClient.create(vertx, new WebClientOptions(webClientConfig)),
-      vertx,
-      producerConfig
-    );
-
-    final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
-      consumerVerticleFactory,
-      env.getEgressesInitialCapacity()
-    );
-
-    vertx.deployVerticle(consumerDeployerVerticle)
-      .onSuccess(v -> logger.info("consumer deployer started"))
-      .onFailure(t -> {
-        // This is a catastrophic failure, close the application
-        logger.error("consumer deployer not started", t);
-        vertx.close(v -> System.exit(1));
-      });
-
-    try {
-      final var fw = new FileWatcher(
-        FileSystems.getDefault().newWatchService(),
-        new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS),
-        new File(env.getDataPlaneConfigFilePath())
-      );
-
-      fw.watch(); // block forever
-
-    } catch (InterruptedException | IOException ex) {
-      logger.error("failed during filesystem watch", ex);
-    }
+    try {
+
+      ContractMessageCodec.register(vertx.eventBus());
+
+      final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
+      final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);
+
+      final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
+      final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
+      final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());
+
+      final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
+        consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);
+
+      final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
+        consumerRecordOffsetStrategyFactory,
+        consumerConfig,
+        WebClient.create(vertx, new WebClientOptions(webClientConfig)),
+        vertx,
+        producerConfig
+      );
+
+      final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
+        consumerVerticleFactory,
+        env.getEgressesInitialCapacity()
+      );
+
+      final var waitConsumerDeployer = new CountDownLatch(1);
+      vertx.deployVerticle(consumerDeployerVerticle)
+        .onSuccess(v -> {
+          logger.info("Consumer deployer started");
+          waitConsumerDeployer.countDown();
+        })
+        .onFailure(t -> {
+          // This is a catastrophic failure, close the application
+          logger.error("Consumer deployer not started", t);
+          vertx.close(v -> System.exit(1));
+        });
+      waitConsumerDeployer.await(5, TimeUnit.SECONDS);
+
+      final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
+      final var fs = FileSystems.getDefault().newWatchService();
+      var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));
+
+      // Gracefully clean up resources.
+      Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));
+
+      fw.watch(); // block forever
+
+    } catch (final Exception ex) {
+      logger.error("Failed during filesystem watch", ex);
+
+      Shutdown.closeSync(vertx).run();
+    }
   }
 }
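
One caveat in the block above: CountDownLatch.await returns false when the five-second window elapses, and that result is discarded, so a slow deploy lets startup proceed anyway while an outright failure exits via the onFailure handler. A stricter sketch (an assumption, not part of the PR) would check the return value, here as a hypothetical helper mirroring the latch pattern, called in place of the bare await:

      // Hypothetical helper, not in the PR: treat a slow deploy as fatal.
      private static void awaitOrExit(final CountDownLatch latch, final Vertx vertx, final String what)
          throws InterruptedException {
        if (!latch.await(5, TimeUnit.SECONDS)) {
          logger.error("{} not started within 5 seconds", what);
          vertx.close(v -> System.exit(1));
        }
      }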
Main.java (receiver)

@@ -24,15 +24,17 @@
 import dev.knative.eventing.kafka.broker.core.metrics.MetricsOptionsProvider;
 import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
 import dev.knative.eventing.kafka.broker.core.utils.Configurations;
+import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
 import io.cloudevents.kafka.CloudEventSerializer;
 import io.vertx.core.Vertx;
 import io.vertx.core.VertxOptions;
 import io.vertx.core.http.HttpServerOptions;
 import io.vertx.kafka.client.producer.KafkaProducer;
 import io.vertx.micrometer.backends.BackendRegistries;
 import java.io.File;
-import java.io.IOException;
 import java.nio.file.FileSystems;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import net.logstash.logback.encoder.LogstashEncoder;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;

@@ -76,58 +78,67 @@ public static void main(final String[] args) {
     final var vertx = Vertx.vertx(
       new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
     );
-    ContractMessageCodec.register(vertx.eventBus());
-
-    final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
-
-    final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
-    final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);
-
-    producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
-    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    final var handler = new RequestMapper<>(
-      producerConfigs,
-      new CloudEventRequestToRecordMapper(),
-      properties -> KafkaProducer.create(vertx, properties),
-      badRequestCounter,
-      produceEventsCounter
-    );
-
-    final var httpServerOptions = new HttpServerOptions(
-      Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
-    );
-    httpServerOptions.setPort(env.getIngressPort());
-
-    final var verticle = new ReceiverVerticle(
-      httpServerOptions,
-      handler,
-      h -> new SimpleProbeHandlerDecorator(
-        env.getLivenessProbePath(),
-        env.getReadinessProbePath(),
-        h
-      )
-    );
-
-    vertx.deployVerticle(verticle)
-      .onSuccess(v -> logger.info("receiver started"))
-      .onFailure(t -> logger.error("receiver not started", t));
-
-    try {
-      // TODO add a shutdown hook that calls objectsCreator.reconcile(Brokers.newBuilder().build()),
-      //  so that producers flush their buffers.
-      //  Note: reconcile(Brokers) isn't thread safe so we need to make sure to stop the watcher
-      //  from calling reconcile first
-
-      final var fw = new FileWatcher(
-        FileSystems.getDefault().newWatchService(),
-        new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS),
-        new File(env.getDataPlaneConfigFilePath())
-      );
-
-      fw.watch(); // block forever
-
-    } catch (InterruptedException | IOException ex) {
-      logger.error("failed during filesystem watch", ex);
-    }
+    try {
+
+      ContractMessageCodec.register(vertx.eventBus());
+
+      final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
+
+      final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
+      final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);
+
+      producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
+      producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+      final var handler = new RequestMapper<>(
+        producerConfigs,
+        new CloudEventRequestToRecordMapper(),
+        properties -> KafkaProducer.create(vertx, properties),
+        badRequestCounter,
+        produceEventsCounter
+      );
+
+      final var httpServerOptions = new HttpServerOptions(
+        Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
+      );
+      httpServerOptions.setPort(env.getIngressPort());
+
+      final var verticle = new ReceiverVerticle(
+        httpServerOptions,
+        handler,
+        h -> new SimpleProbeHandlerDecorator(
+          env.getLivenessProbePath(),
+          env.getReadinessProbePath(),
+          h
+        )
+      );
+
+      final var waitVerticle = new CountDownLatch(1);
+      vertx.deployVerticle(verticle)
+        .onSuccess(v -> {
+          logger.info("Receiver started");
+          waitVerticle.countDown();
+        })
+        .onFailure(t -> {
+          // This is a catastrophic failure, close the application
+          logger.error("Receiver not started", t);
+          vertx.close(v -> System.exit(1));
+        });
+      waitVerticle.await(5, TimeUnit.SECONDS);
+
+      final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
+      final var fs = FileSystems.getDefault().newWatchService();
+      var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));
+
+      // Gracefully clean up resources.
+      Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));
+
+      fw.watch(); // block forever
+
+    } catch (final Exception ex) {
+      logger.error("Failed during filesystem watch", ex);
+
+      Shutdown.closeSync(vertx).run();
+    }
   }
 }
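
The removed TODO asked for exactly what this PR now provides: a shutdown hook that reconciles an empty contract so producers flush their buffers, with the watcher stopped first because reconcile isn't thread safe. Condensed, with the error handling elided, the sequence Shutdown.run encodes is:

    fw.close();                                      // stop the watcher first: no more concurrent reconciles
    publisher.accept(Contract.newBuilder().build()); // empty contract: undeploy consumers, flush producers
    closeSync(vertx).run();                          // then close Vert.x, waiting up to two minutes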