Skip to content

Commit

Permalink
knative: split producer and consumer apache#521
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Nov 11, 2020
1 parent c9ada01 commit 61538b3
Show file tree
Hide file tree
Showing 19 changed files with 381 additions and 191 deletions.
Expand Up @@ -23,12 +23,14 @@
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.http.deployment.BodyHandlerBuildItem;
import io.quarkus.vertx.http.deployment.VertxWebRouterBuildItem;
import org.apache.camel.component.knative.KnativeComponent;
import org.apache.camel.component.knative.KnativeConstants;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeProducerFactory;
import org.apache.camel.k.quarkus.knative.KnativeRecorder;
import org.apache.camel.quarkus.core.deployment.spi.CamelRuntimeBeanBuildItem;
import org.apache.camel.quarkus.core.deployment.spi.CamelServiceFilter;
Expand Down Expand Up @@ -63,16 +65,20 @@ List<CamelServiceFilterBuildItem> servicesFilters() {
CamelRuntimeBeanBuildItem knativeComponent(
KnativeRecorder recorder,
CoreVertxBuildItem vertx,
VertxWebRouterBuildItem router,
BodyHandlerBuildItem bodyHandlerBuildItem) {
VertxWebRouterBuildItem router) {

RuntimeValue<KnativeProducerFactory> producerFactory = vertx != null
? recorder.createKnativeHttpProducerFactory(vertx.getVertx())
: null;

RuntimeValue<KnativeConsumerFactory> consumerFactory = router != null
? recorder.createKnativeHttpConsumerFactory(router.getRouter())
: null;

return new CamelRuntimeBeanBuildItem(
KnativeConstants.SCHEME,
KnativeComponent.class.getName(),
recorder.createKnativeComponent(
vertx.getVertx(),
router.getRouter(),
bodyHandlerBuildItem.getHandler())
recorder.createKnativeComponent(producerFactory, consumerFactory)
);
}
}
Expand Up @@ -20,31 +20,47 @@

import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.component.knative.KnativeComponent;
import org.apache.camel.component.knative.http.KnativeHttpTransport;
import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpRouter;
import org.apache.camel.component.knative.http.KnativeHttpConsumerFactory;
import org.apache.camel.component.knative.http.KnativeHttpProducerFactory;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeProducerFactory;

@Recorder
public class KnativeRecorder {

public RuntimeValue<KnativeProducerFactory> createKnativeHttpProducerFactory(
Supplier<Vertx> vertx) {

KnativeHttpProducerFactory producerFactory = new KnativeHttpProducerFactory();
producerFactory.setVertx(vertx.get());

return new RuntimeValue<>(producerFactory);
}

public RuntimeValue<KnativeConsumerFactory> createKnativeHttpConsumerFactory(
RuntimeValue<Router> router) {

KnativeHttpConsumerFactory consumerFactory = new KnativeHttpConsumerFactory();
consumerFactory.setRouter(router.getValue());

return new RuntimeValue<>(consumerFactory);
}

public RuntimeValue<KnativeComponent> createKnativeComponent(
Supplier<Vertx> vertx,
RuntimeValue<Router> router,
Handler<RoutingContext> bodyHandler) {

KnativeHttpTransport transport = new KnativeHttpTransport();
transport.setRouter(new VertxPlatformHttpRouter(vertx.get(), router.getValue()) {
@Override
public Handler<RoutingContext> bodyHandler() {
return bodyHandler;
}
});
RuntimeValue<KnativeProducerFactory> producerFactory,
RuntimeValue<KnativeConsumerFactory> consumerFactory) {

KnativeComponent component = new KnativeComponent();
component.setTransport(transport);

if (producerFactory != null) {
component.setProducerFactory(producerFactory.getValue());
}
if (consumerFactory != null) {
component.setConsumerFactory(consumerFactory.getValue());
}

return new RuntimeValue<>(component);
}
Expand Down
Expand Up @@ -19,23 +19,9 @@
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;

public interface KnativeTransport extends Service {
/**
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
* @param configuration the general transport configuration
* @param service the service definition containing information about how make reach the target service.
* @return
*/
Producer createProducer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeResource service);

public interface KnativeConsumerFactory extends Service {
/**
* Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
*
Expand Down
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.spi;

import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
import org.apache.camel.Service;

public interface KnativeProducerFactory extends Service {
/**
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
* @param configuration the general transport configuration
* @param service the service definition containing information about how make reach the target service.
* @return
*/
Producer createProducer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeResource service);
}
16 changes: 11 additions & 5 deletions components/camel-knative/camel-knative-http/pom.xml
Expand Up @@ -46,13 +46,14 @@
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-platform-http-vertx</artifactId>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<version>${vertx-version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<artifactId>vertx-web</artifactId>
<version>${vertx-version}</version>
</dependency>

Expand Down Expand Up @@ -95,12 +96,17 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http</artifactId>
<artifactId>camel-core-languages</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-languages</artifactId>
<artifactId>camel-platform-http-vertx</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Expand Up @@ -18,17 +18,21 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
Expand All @@ -37,7 +41,6 @@
import org.apache.camel.TypeConverter;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpRouter;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.ExchangeHelper;
Expand All @@ -52,17 +55,19 @@ public class KnativeHttpConsumer extends DefaultConsumer {
private final KnativeTransportConfiguration configuration;
private final Predicate<HttpServerRequest> filter;
private final KnativeEnvironment.KnativeResource serviceDefinition;
private final VertxPlatformHttpRouter router;
private final Router router;
private final HeaderFilterStrategy headerFilterStrategy;

private String basePath;
private Route route;
private BigInteger maxBodySize;
private boolean preallocateBodyBuffer;

public KnativeHttpConsumer(
KnativeTransportConfiguration configuration,
Endpoint endpoint,
KnativeEnvironment.KnativeResource serviceDefinition,
VertxPlatformHttpRouter router,
Router router,
Processor processor) {

super(endpoint, processor);
Expand All @@ -72,6 +77,7 @@ public KnativeHttpConsumer(
this.router = router;
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
this.filter = KnativeHttpSupport.createFilter(serviceDefinition);
this.preallocateBodyBuffer = true;
}

public String getBasePath() {
Expand All @@ -82,10 +88,29 @@ public void setBasePath(String basePath) {
this.basePath = basePath;
}

public BigInteger getMaxBodySize() {
return maxBodySize;
}

public void setMaxBodySize(BigInteger maxBodySize) {
this.maxBodySize = maxBodySize;
}

public boolean isPreallocateBodyBuffer() {
return preallocateBodyBuffer;
}

public void setPreallocateBodyBuffer(boolean preallocateBodyBuffer) {
this.preallocateBodyBuffer = preallocateBodyBuffer;
}

@Override
protected void doStart() throws Exception {
if (route == null) {
String path = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttpTransport.DEFAULT_PATH);
String path = serviceDefinition.getPath();
if (ObjectHelper.isEmpty(path)) {
path = "/";
}
if (ObjectHelper.isNotEmpty(basePath)) {
path = basePath + path;
}
Expand All @@ -97,8 +122,20 @@ protected void doStart() throws Exception {
path
);

BodyHandler bodyHandler = BodyHandler.create();
bodyHandler.setPreallocateBodyBuffer(this.preallocateBodyBuffer);
if (this.maxBodySize != null) {
bodyHandler.setBodyLimit(this.maxBodySize.longValueExact());
}

// add body handler
route.handler(router.bodyHandler());
route.handler(new Handler<RoutingContext>() {
@Override
public void handle(RoutingContext event) {
event.request().resume();
bodyHandler.handle(event);
}
});

// add knative handler
route.handler(routingContext -> {
Expand Down Expand Up @@ -161,7 +198,7 @@ private void handleRequest(RoutingContext routingContext) {
// from("knative:event/my.event")
// .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
//
router.vertx().executeBlocking(
routingContext.vertx().executeBlocking(
promise -> {
try {
createUoW(exchange);
Expand Down
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;

import java.util.Objects;

import io.vertx.core.Handler;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.support.service.ServiceSupport;

public class KnativeHttpConsumerFactory extends ServiceSupport implements CamelContextAware, KnativeConsumerFactory {
private Router router;
private Handler<RoutingContext> bodyHandler;
private CamelContext camelContext;

public Router getRouter() {
return router;
}

public KnativeHttpConsumerFactory setRouter(Router router) {
this.router = router;
return this;
}

@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}

@Override
public CamelContext getCamelContext() {
return camelContext;
}

@Override
public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeResource service, Processor processor) {
Objects.requireNonNull(this.router, "router");

return new KnativeHttpConsumer(
config,
endpoint,
service,
this.router,
processor);
}

}

0 comments on commit 61538b3

Please sign in to comment.