Skip to content

Commit

Permalink
Support Quarkus command mode apache#1037
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jun 18, 2020
1 parent 1bdb772 commit c7e6653
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Overridable;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.RawCommandLineArgumentsBuildItem;
import io.quarkus.deployment.builditem.ServiceStartBuildItem;
import io.quarkus.runtime.RuntimeValue;
import org.apache.camel.CamelContext;
Expand Down Expand Up @@ -131,16 +132,17 @@ CamelMainBuildItem main(
* after having processed a certain number of messagees.
* </ul>
*
* @param beanContainer a reference to a fully initialized CDI bean container
* @param recorder the recorder.
* @param main a reference to a {@link CamelMain}.
* @param customizers a list of {@link org.apache.camel.quarkus.core.CamelContextCustomizer} that will be executed
* before starting the {@link CamelContext} at {@link ExecutionTime#RUNTIME_INIT}.
* @param startList a placeholder to ensure camel-main start after the ArC container is fully initialized. This
* is required as under the hoods the camel registry may look-up beans form the
* container thus we need it to be fully initialized to avoid unexpected behaviors.
* @param runtimeTasks a placeholder to ensure all the runtime task are properly are done.
* @return a build item holding a {@link CamelRuntime} instance.
* @param beanContainer a reference to a fully initialized CDI bean container
* @param recorder the recorder.
* @param main a reference to a {@link CamelMain}.
* @param customizers a list of {@link org.apache.camel.quarkus.core.CamelContextCustomizer} that will be
* executed before starting the {@link CamelContext} at {@link ExecutionTime#RUNTIME_INIT}.
* @param commandLineArguments a reference to the raw command line arguments as they were passed to the application.
* @param startList a placeholder to ensure camel-main start after the ArC container is fully initialized.
* This is required as under the hoods the camel registry may look-up beans form the
* container thus we need it to be fully initialized to avoid unexpected behaviors.
* @param runtimeTasks a placeholder to ensure all the runtime task are properly are done.
* @return a build item holding a {@link CamelRuntime} instance.
*/
@BuildStep
@Record(value = ExecutionTime.RUNTIME_INIT, optional = true)
Expand All @@ -149,6 +151,7 @@ CamelRuntimeBuildItem runtime(
CamelMainRecorder recorder,
CamelMainBuildItem main,
List<RuntimeCamelContextCustomizerBuildItem> customizers,
RawCommandLineArgumentsBuildItem commandLineArguments,
List<ServiceStartBuildItem> startList,
List<CamelRuntimeTaskBuildItem> runtimeTasks) {

Expand All @@ -163,6 +166,6 @@ CamelRuntimeBuildItem runtime(
customizers.stream().map(RuntimeCamelContextCustomizerBuildItem::get).collect(Collectors.toList()));

return new CamelRuntimeBuildItem(
recorder.createRuntime(beanContainer.getValue(), main.getInstance()));
recorder.createRuntime(beanContainer.getValue(), main.getInstance(), commandLineArguments));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,30 @@

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.quarkus.runtime.Quarkus;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.MainCommandLineSupport;
import org.apache.camel.main.MainConfigurationProperties;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainShutdownStrategy;
import org.apache.camel.spi.CamelBeanPostProcessor;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.camel.spi.HasCamelContext;

public class CamelMain extends BaseMainSupport implements CamelContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(CamelMain.class);

@Override
public void setCamelContext(CamelContext camelContext) {
public final class CamelMain extends MainCommandLineSupport implements HasCamelContext {
public CamelMain(CamelContext camelContext) {
this.camelContext = camelContext;
}

@Override
protected void doInit() throws Exception {
super.doInit();
postProcessCamelContext(getCamelContext());
getCamelContext().init();
}

@Override
protected void doStart() throws Exception {
for (MainListener listener : listeners) {
listener.beforeStart(this);
}

getCamelContext().start();

for (MainListener listener : listeners) {
listener.afterStart(this);
}
}

@Override
protected void postProcessCamelContext(CamelContext camelContext) throws Exception {
super.postProcessCamelContext(camelContext);

// post process classes with camel's post processor so classes have support
// for camel's simple di
protected void loadRouteBuilders(CamelContext camelContext) throws Exception {
// routes are discovered and pre-instantiated which allow to post process them to support Camel's DI
CamelBeanPostProcessor postProcessor = camelContext.adapt(ExtendedCamelContext.class).getBeanPostProcessor();
for (RoutesBuilder builder : mainConfigurationProperties.getRoutesBuilders()) {
postProcessor.postProcessBeforeInitialization(builder, builder.getClass().getName());
Expand All @@ -74,35 +50,40 @@ protected void postProcessCamelContext(CamelContext camelContext) throws Excepti
}

@Override
protected void loadRouteBuilders(CamelContext camelContext) throws Exception {
// classes are automatically discovered by build processors
protected void doInit() throws Exception {
setShutdownStrategy(new ShutdownStrategy());

super.doInit();
initCamelContext();
}

@Override
protected void doStop() throws Exception {
protected void doStart() throws Exception {
super.doStart();
try {
if (camelTemplate != null) {
ServiceHelper.stopService(camelTemplate);
camelTemplate = null;
// if we were veto started then mark as completed
this.camelContext.start();
} finally {
if (getCamelContext().isVetoStarted()) {
completed();
}
} catch (Exception e) {
LOGGER.debug("Error stopping camelTemplate due " + e.getMessage() + ". This exception is ignored.", e);
}

for (MainListener listener : listeners) {
listener.beforeStop(this);
}
}

getCamelContext().stop();

for (MainListener listener : listeners) {
listener.afterStop(this);
}
@Override
protected void doStop() throws Exception {
super.doStop();
this.camelContext.stop();
}

@Override
protected ProducerTemplate findOrCreateCamelTemplate() {
return getCamelContext().createProducerTemplate();
return this.camelContext.createProducerTemplate();
}

@Override
protected void initCamelContext() throws Exception {
postProcessCamelContext(camelContext);
}

@Override
Expand All @@ -117,4 +98,50 @@ Collection<MainListener> getMainListeners() {
MainConfigurationProperties getMainConfigurationProperties() {
return mainConfigurationProperties;
}

/**
* Implementation of a {@link MainShutdownStrategy} based on Quarkus Command Mode.
*
* @see <a href="https://quarkus.io/guides/command-mode-reference">Quarkus Command Mode Applications</a>
*/
private class ShutdownStrategy implements MainShutdownStrategy {
private final AtomicBoolean completed;

public ShutdownStrategy() {
this.completed = new AtomicBoolean();
}

@Override
public boolean isRunAllowed() {
return !completed.get();
}

@Override
public boolean shutdown() {
if (completed.compareAndSet(false, true)) {
LOG.info("Shutdown process started");
Quarkus.asyncExit(getExitCode());

return true;
}

return false;
}

@Override
public void await() throws InterruptedException {
LOG.info("Wait for exit");
Quarkus.waitForExit();
LOG.info("Mark as completed");
completed.set(true);
}

@Override
public void await(long timeout, TimeUnit unit) throws InterruptedException {
if (!completed.get()) {
// TODO: this should be revisited
new CountDownLatch(1).await(timeout, unit);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.quarkus.main;

import java.util.List;
import java.util.function.Supplier;

import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.RuntimeValue;
Expand All @@ -39,9 +40,8 @@ public RuntimeValue<CamelMain> createCamelMain(
RuntimeValue<CamelContext> runtime,
RuntimeValue<RoutesCollector> routesCollector,
BeanContainer container) {
CamelMain main = new CamelMain();
CamelMain main = new CamelMain(runtime.getValue());
main.setRoutesCollector(routesCollector.getValue());
main.setCamelContext(runtime.getValue());
main.addMainListener(new CamelMainEventDispatcher());

// autowire only non null values as components may have configured
Expand Down Expand Up @@ -99,8 +99,12 @@ public void afterConfigure(BaseMainSupport main) {
});
}

public RuntimeValue<CamelRuntime> createRuntime(BeanContainer beanContainer, RuntimeValue<CamelMain> main) {
final CamelRuntime runtime = new CamelMainRuntime(main.getValue());
public RuntimeValue<CamelRuntime> createRuntime(
BeanContainer beanContainer,
RuntimeValue<CamelMain> main,
Supplier<String[]> args) {

final CamelRuntime runtime = new CamelMainRuntime(main.getValue(), args);

// register to the container
beanContainer.instance(CamelProducers.class).setRuntime(runtime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,40 @@
*/
package org.apache.camel.quarkus.main;

import java.util.Arrays;
import java.util.function.Supplier;

import org.apache.camel.CamelContext;
import org.apache.camel.quarkus.core.CamelRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of the {@link CamelRuntime} based on camel-main.
*/
public class CamelMainRuntime implements CamelRuntime {
private static final Logger LOGGER = LoggerFactory.getLogger(CamelMainRuntime.class);
private final CamelMain main;
private final Supplier<String[]> args;

public CamelMainRuntime(CamelMain main) {
public CamelMainRuntime(CamelMain main, Supplier<String[]> args) {
this.main = main;
this.args = args;
}

@Override
public void start() {
main.start();
try {
final String[] arguments = args.get();
if (arguments.length > 0) {
LOGGER.info("Starting camel-quarkus with args: {}", Arrays.toString(arguments));
}

main.run(arguments);
} catch (Exception e) {
LOGGER.error("Failed to start application", e);
stop();
}
}

@Override
Expand Down

0 comments on commit c7e6653

Please sign in to comment.