Skip to content

Commit

Permalink
Apply suggestion
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Nov 2, 2020
1 parent 95ca75c commit 82e56f6
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,55 +75,54 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.closeSync(vertx)));

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfig
);

final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
consumerVerticleFactory,
env.getEgressesInitialCapacity()
);

final var waitConsumerDeployer = new CountDownLatch(1);
vertx.deployVerticle(consumerDeployerVerticle)
.onSuccess(v -> {
logger.info("Consumer deployer started");
waitConsumerDeployer.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitConsumerDeployer.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

try {

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfig
);

final var consumerDeployerVerticle = new ConsumerDeployerVerticle(
consumerVerticleFactory,
env.getEgressesInitialCapacity()
);

final var waitConsumerDeployer = new CountDownLatch(1);
vertx.deployVerticle(consumerDeployerVerticle)
.onSuccess(v -> {
logger.info("Consumer deployer started");
waitConsumerDeployer.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitConsumerDeployer.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

fw.watch(); // block forever

} catch (final Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,62 +79,61 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.closeSync(vertx)));

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);

final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);

producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestMapper<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);

final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());

final var verticle = new ReceiverVerticle(
httpServerOptions,
handler,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);

final var waitVerticle = new CountDownLatch(1);
vertx.deployVerticle(verticle)
.onSuccess(v -> {
logger.info("Receiver started");
waitVerticle.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitVerticle.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

try {

ContractMessageCodec.register(vertx.eventBus());

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);

final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);

producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestMapper<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);

final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());

final var verticle = new ReceiverVerticle(
httpServerOptions,
handler,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);

final var waitVerticle = new CountDownLatch(1);
vertx.deployVerticle(verticle)
.onSuccess(v -> {
logger.info("Receiver started");
waitVerticle.countDown();
})
.onFailure(t -> {
// This is a catastrophic failure, close the application
logger.error("Consumer deployer not started", t);
vertx.close(v -> System.exit(1));
});
waitVerticle.await(5, TimeUnit.SECONDS);

final var publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
final var fs = FileSystems.getDefault().newWatchService();
var fw = new FileWatcher(fs, publisher, new File(env.getDataPlaneConfigFilePath()));

// Gracefully clean up resources.
Runtime.getRuntime().addShutdownHook(new Thread(Shutdown.run(vertx, fw, publisher)));

fw.watch(); // block forever

} catch (final Exception ex) {
Expand Down

0 comments on commit 82e56f6

Please sign in to comment.