Skip to content

Commit

Permalink
Apply suggestion
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Nov 2, 2020
1 parent 95ca75c commit bb6373d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.micrometer.backends.BackendRegistries;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -60,7 +59,7 @@ public class Main {
*
* @param args command line arguments.
*/
public static void main(final String[] args) throws IOException, InterruptedException {
public static void main(final String[] args) {
// HACK HACK HACK
// maven-shade-plugin doesn't include the LogstashEncoder class, neither by specifying the
// dependency with scope `provided` nor `runtime`, and adding include rules to
Expand All @@ -75,61 +74,60 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.closeSync(vertx)));

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfig
);

final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
consumerVerticleFactory,
env.getEgressesInitialCapacity()
);

final var waitConsumerDeployer = new CountDownLatch(1);
vertx.deployVerticle(consumerDeployerVerticle)
.onSuccess(v -> {
logger.info("Consumer deployer started");
waitConsumerDeployer.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitConsumerDeployer.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

try {

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfig
);

final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
consumerVerticleFactory,
env.getEgressesInitialCapacity()
);

final var waitConsumerDeployer = new CountDownLatch(1);
vertx.deployVerticle(consumerDeployerVerticle)
.onSuccess(v -> {
logger.info("Consumer deployer started");
waitConsumerDeployer.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitConsumerDeployer.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

fw.watch(); // block forever

} catch (final Exception ex) {
logger.error("Failed during filesystem watch", ex);
Shutdown.closeSync(vertx).run();
throw ex;
}

Shutdown.closeSync(vertx).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.micrometer.backends.BackendRegistries;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -62,7 +61,7 @@ public class Main {
*
* @param args command line arguments.
*/
public static void main(final String[] args) throws IOException, InterruptedException {
public static void main(final String[] args) {
final var env = new ReceiverEnv(System::getenv);

// HACK HACK HACK
Expand All @@ -79,68 +78,67 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.closeSync(vertx)));

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);

final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);

producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestMapper<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);

final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());

final var verticle = new ReceiverVerticle(
httpServerOptions,
handler,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);

final var waitVerticle = new CountDownLatch(1);
vertx.deployVerticle(verticle)
.onSuccess(v -> {
logger.info("Receiver started");
waitVerticle.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitVerticle.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

try {

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);

final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);

producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestMapper<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);

final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());

final var verticle = new ReceiverVerticle(
httpServerOptions,
handler,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);

final var waitVerticle = new CountDownLatch(1);
vertx.deployVerticle(verticle)
.onSuccess(v -> {
logger.info("Receiver started");
waitVerticle.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitVerticle.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

fw.watch(); // block forever

} catch (final Exception ex) {
logger.error("Failed during filesystem watch", ex);
Shutdown.closeSync(vertx).run();
throw ex;
}

Shutdown.closeSync(vertx).run();
}
}

0 comments on commit bb6373d

Please sign in to comment.