diff --git a/data-plane/README.md b/data-plane/README.md index 7d1e6df6e1..55182afc6f 100644 --- a/data-plane/README.md +++ b/data-plane/README.md @@ -32,10 +32,8 @@ update another one will be created. This allows to not block or use locks. ├── core ├── dispatcher ├── dispatcher-loom -├── dispatcher-vertx ├── receiver ├── receiver-loom -├── receiver-vertx ├── contract └── tests ``` @@ -44,9 +42,7 @@ update another one will be created. This allows to not block or use locks. - `core` directory contains the core module, in particular, it contains classes for representing Eventing objects - `dispatcher` directory contains the base [_Dispatcher_](#dispatcher) application. - `dispatcher-loom` directory contains the [_Dispatcher_](#dispatcher) application's entrypoit that use Loom virtual thread implementation for kafka communication. -- `dispatcher-vertx` directory contains application's entrypoit that use Vert.x kafka client implementation for kafka communication. - `receiver` directory contains the [_Receiver_](#receiver) application. - `receiver-loom` directory contains the [_Receiver_](#receiver) application's entrypoit that use Loom virtual thread implementation for kafka communication. -- `receiver-vertx` directory contains application's entrypoit that use Vert.x kafka client implementation for kafka communication. - `contract` directory contains a module in which the protobuf compiler (`protoc`) generates code. Git ignores the - `tests` directory contains tests for the whole data-plane. diff --git a/data-plane/THIRD-PARTY.txt b/data-plane/THIRD-PARTY.txt index 10efcbbc85..a11377b1d4 100644 --- a/data-plane/THIRD-PARTY.txt +++ b/data-plane/THIRD-PARTY.txt @@ -1,5 +1,5 @@ -Lists of 236 third-party dependencies. +Lists of 234 third-party dependencies. (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.4.14 - http://logback.qos.ch/logback-classic) (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.4.14 - http://logback.qos.ch/logback-core) (Apache License 2.0) brotli4j (com.aayushatharva.brotli4j:brotli4j:1.16.0 - https://github.com/hyperxpro/Brotli4j/brotli4j) @@ -36,10 +36,8 @@ Lists of 236 third-party dependencies. (Unknown license) core (dev.knative.eventing.kafka.broker:core:1.0-SNAPSHOT - no url defined) (Unknown license) dispatcher (dev.knative.eventing.kafka.broker:dispatcher:1.0-SNAPSHOT - no url defined) (Unknown license) dispatcher-loom (dev.knative.eventing.kafka.broker:dispatcher-loom:1.0-SNAPSHOT - no url defined) - (Unknown license) dispatcher-vertx (dev.knative.eventing.kafka.broker:dispatcher-vertx:1.0-SNAPSHOT - no url defined) (Unknown license) receiver (dev.knative.eventing.kafka.broker:receiver:1.0-SNAPSHOT - no url defined) (Unknown license) receiver-loom (dev.knative.eventing.kafka.broker:receiver-loom:1.0-SNAPSHOT - no url defined) - (Unknown license) receiver-vertx (dev.knative.eventing.kafka.broker:receiver-vertx:1.0-SNAPSHOT - no url defined) (The Apache Software License, Version 2.0) CloudEvents - API (io.cloudevents:cloudevents-api:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-api/) (The Apache Software License, Version 2.0) CloudEvents - Core (io.cloudevents:cloudevents-core:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-core/) (The Apache Software License, Version 2.0) CloudEvents - Vert.x Http Binding (io.cloudevents:cloudevents-http-vertx:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-http-vertx/) @@ -164,7 +162,7 @@ Lists of 236 third-party dependencies. (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) vertx-codegen (io.vertx:vertx-codegen:4.5.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-codegen) (Eclipse Public License - v 2.0) (The Apache Software License, Version 2.0) Vert.x Core (io.vertx:vertx-core:4.5.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-core) (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) Vert.x JUnit 5 support :: Core (io.vertx:vertx-junit5:4.4.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-junit5) - (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) Vert.x Kafka Client (io.vertx:vertx-kafka-client:4.4.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-kafka-client) + (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) Vert.x Kafka Client (io.vertx:vertx-kafka-client:4.5.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-kafka-client) (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) Vert.x metrics implementation for Micrometer.io (io.vertx:vertx-micrometer-metrics:4.5.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-micrometer-metrics) (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) Vert.x OpenTelemetry (io.vertx:vertx-opentelemetry:4.4.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-tracing-parent/vertx-opentelemetry) (Eclipse Public License - v 1.0) (The Apache Software License, Version 2.0) Vert.x URI Template (io.vertx:vertx-uri-template:4.5.7 - http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-uri-template) diff --git a/data-plane/benchmarks/pom.xml b/data-plane/benchmarks/pom.xml index 2b3c577f9b..2cbc78f479 100644 --- a/data-plane/benchmarks/pom.xml +++ b/data-plane/benchmarks/pom.xml @@ -49,10 +49,9 @@ ${jmh.version} provided - dev.knative.eventing.kafka.broker - dispatcher-vertx + dispatcher ${project.version} diff --git a/data-plane/dispatcher-vertx/pom.xml b/data-plane/dispatcher-vertx/pom.xml deleted file mode 100644 index 1b81090b82..0000000000 --- a/data-plane/dispatcher-vertx/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ - - - - 4.0.0 - - data-plane - dev.knative.eventing.kafka.broker - 1.0-SNAPSHOT - - - dispatcher-vertx - - dispatcher-vertx - - - - dev.knative.eventing.kafka.broker - dispatcher - ${project.version} - - - dev.knative.eventing.kafka.broker - receiver-vertx - ${project.version} - - - - - - - com.google.cloud.tools - jib-maven-plugin - ${jib.version} - - - ${env.KO_DOCKER_REPO}/knative-kafka-broker-dispatcher:${env.TAG} - - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven.shade.plugin.version} - - true - - - - package - - shade - - - - - - - dev.knative.eventing.kafka.broker.dispatchervertx.Main - - - - - *:* - false - - - net.logstash.logback:logstash-logback-encoder - - ** - - - - org.apache.kafka:kafka-clients - - ** - - - - io.fabric8:kubernetes-client - - ** - - - - - - - - - - - diff --git a/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/Main.java b/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/Main.java deleted file mode 100644 index 9558c36fd1..0000000000 --- a/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/Main.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatchervertx; - -import dev.knative.eventing.kafka.broker.receiververtx.VertxProducerFactory; -import java.io.IOException; - -public class Main { - public static void main(String[] args) throws IOException { - dev.knative.eventing.kafka.broker.dispatcher.main.Main.start( - args, new VertxConsumerFactory<>(), new VertxProducerFactory<>()); - } -} diff --git a/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/VertxConsumerFactory.java b/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/VertxConsumerFactory.java deleted file mode 100644 index 15cf4e198c..0000000000 --- a/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/VertxConsumerFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatchervertx; - -import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory; -import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; -import io.vertx.core.Vertx; -import io.vertx.core.tracing.TracingPolicy; -import io.vertx.kafka.client.common.KafkaClientOptions; -import java.util.Map; - -public class VertxConsumerFactory implements ReactiveConsumerFactory { - - @Override - public ReactiveKafkaConsumer create(Vertx vertx, Map configs) { - return new VertxKafkaConsumer<>( - vertx, new KafkaClientOptions().setConfig(configs).setTracingPolicy(TracingPolicy.IGNORE)); - } -} diff --git a/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/VertxKafkaConsumer.java b/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/VertxKafkaConsumer.java deleted file mode 100644 index 1e7b06dd3b..0000000000 --- a/data-plane/dispatcher-vertx/src/main/java/dev/knative/eventing/kafka/broker/dispatchervertx/VertxKafkaConsumer.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatchervertx; - -import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.kafka.client.common.KafkaClientOptions; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import java.time.Duration; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; - -public class VertxKafkaConsumer implements ReactiveKafkaConsumer { - - private KafkaConsumer consumer; - - public VertxKafkaConsumer(Vertx v, KafkaClientOptions configs) { - consumer = KafkaConsumer.create(v, configs); - } - - @Override - public Future> commit(Map offset) { - - final var vertxOffset = offset.entrySet().stream() - .collect(Collectors.toMap( - entry -> new io.vertx.kafka.client.common.TopicPartition( - entry.getKey().topic(), entry.getKey().partition()), - entry -> new io.vertx.kafka.client.consumer.OffsetAndMetadata( - entry.getValue().offset(), entry.getValue().metadata()))); - - return consumer.commit(vertxOffset) - .map(vertxOffset.entrySet().stream() - .collect(Collectors.toMap( - entry -> new TopicPartition( - entry.getKey().getTopic(), - entry.getKey().getPartition()), - entry -> new OffsetAndMetadata( - entry.getValue().getOffset(), - entry.getValue().getMetadata())))); - } - - @Override - public Future close() { - return consumer.close(); - } - - @Override - public Future pause(Collection partitions) { - var vertxTopicPartitions = partitions.stream() - .map(topicPartition -> new io.vertx.kafka.client.common.TopicPartition( - topicPartition.topic(), topicPartition.partition())) - .collect(Collectors.toSet()); - - return consumer.pause(vertxTopicPartitions); - } - - @Override - public Future> poll(Duration timeout) { - return consumer.poll(timeout).map(kafkaConsumerRecords -> kafkaConsumerRecords.records()); - } - - @Override - public Future resume(Collection partitions) { - var vertxTopicPartitions = partitions.stream() - .map(topicPartition -> new io.vertx.kafka.client.common.TopicPartition( - topicPartition.topic(), topicPartition.partition())) - .collect(Collectors.toSet()); - - return consumer.resume(vertxTopicPartitions); - } - - @Override - public Future subscribe(Collection topics) { - return consumer.subscribe(new HashSet<>(topics)); - } - - @Override - public Future subscribe(Collection topics, ConsumerRebalanceListener listener) { - Handler> handler = partitions -> { - Set apachePartitions = new HashSet<>(); - for (io.vertx.kafka.client.common.TopicPartition vertxPartition : partitions) { - apachePartitions.add(new TopicPartition(vertxPartition.getTopic(), vertxPartition.getPartition())); - } - - listener.onPartitionsRevoked(apachePartitions); - }; - consumer = consumer.partitionsRevokedHandler(handler); - - return consumer.subscribe(new HashSet<>(topics)); - } - - @Override - public Consumer unwrap() { - return consumer.unwrap(); - } - - @Override - public ReactiveKafkaConsumer exceptionHandler(Handler handler) { - consumer = consumer.exceptionHandler(handler); - return this; - } -} diff --git a/data-plane/pom.xml b/data-plane/pom.xml index 5d770ef1f8..acb2ebb56c 100644 --- a/data-plane/pom.xml +++ b/data-plane/pom.xml @@ -74,8 +74,6 @@ core receiver dispatcher - receiver-vertx - dispatcher-vertx receiver-loom dispatcher-loom @@ -140,21 +138,6 @@ ${antlr.version} - - io.vertx - vertx-kafka-client - ${vertx.kafka.clients.version} - - - org.apache.kafka - kafka-clients - - - io.vertx - vertx-core - - - io.vertx vertx-opentelemetry diff --git a/data-plane/profiler/run.sh b/data-plane/profiler/run.sh index 46a61b3607..e8af059fc8 100755 --- a/data-plane/profiler/run.sh +++ b/data-plane/profiler/run.sh @@ -110,7 +110,7 @@ java \ -XX:+UnlockDiagnosticVMOptions \ -XX:+DebugNonSafepoints \ -Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \ - -jar "${PROJECT_ROOT_DIR}"/receiver-vertx/target/receiver-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" & + -jar "${PROJECT_ROOT_DIR}"/receiver-loom/target/receiver-loom-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" & receiver_pid=$! # Define expected env variables. @@ -126,7 +126,7 @@ java \ -XX:+UnlockDiagnosticVMOptions \ -XX:+DebugNonSafepoints \ -Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \ - -jar "${PROJECT_ROOT_DIR}"/dispatcher-vertx/target/dispatcher-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" & + -jar "${PROJECT_ROOT_DIR}"/dispatcher-loom/target/dispatcher-loom-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" & dispatcher_pid=$! # Download Sacura diff --git a/data-plane/receiver-vertx/pom.xml b/data-plane/receiver-vertx/pom.xml deleted file mode 100644 index b75e560e89..0000000000 --- a/data-plane/receiver-vertx/pom.xml +++ /dev/null @@ -1,153 +0,0 @@ - - - - 4.0.0 - - data-plane - dev.knative.eventing.kafka.broker - 1.0-SNAPSHOT - - - receiver-vertx - - receiver-vertx - - - - - dev.knative.eventing.kafka.broker - receiver - ${project.version} - - - dev.knative.eventing.kafka.broker - core - ${project.version} - - - - dev.knative.eventing.kafka.broker - core - tests - test-jar - ${project.version} - test - - - dev.knative.eventing.kafka.broker - receiver - tests - test-jar - ${project.version} - test - - - - io.vertx - vertx-junit5 - test - - - org.junit.jupiter - junit-jupiter - test - - - org.assertj - assertj-core - test - - - org.mockito - mockito-junit-jupiter - test - - - io.opentelemetry - opentelemetry-sdk-testing - test - - - - - - - com.google.cloud.tools - jib-maven-plugin - ${jib.version} - - - ${env.KO_DOCKER_REPO}/knative-kafka-broker-receiver:${env.TAG} - - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven.shade.plugin.version} - - true - - - - package - - shade - - - - - - - dev.knative.eventing.kafka.broker.receiververtx.Main - - - - - *:* - false - - - net.logstash.logback:logstash-logback-encoder - - ** - - - - org.apache.kafka:kafka-clients - - ** - - - - io.fabric8:kubernetes-client - - ** - - - - - - - - - - diff --git a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java deleted file mode 100644 index 5365e96414..0000000000 --- a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiververtx; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; - -public class Main { - public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { - dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new VertxProducerFactory<>()); - } -} diff --git a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/VertxKafkaProducer.java b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/VertxKafkaProducer.java deleted file mode 100644 index 9d8240d426..0000000000 --- a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/VertxKafkaProducer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiververtx; - -import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.kafka.client.producer.KafkaProducer; -import io.vertx.kafka.client.producer.KafkaProducerRecord; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; - -public class VertxKafkaProducer implements ReactiveKafkaProducer { - - private final KafkaProducer producer; - - public VertxKafkaProducer(Vertx v, Producer producer) { - this.producer = KafkaProducer.create(v, producer); - } - - @Override - public Future send(ProducerRecord record) { - return this.producer - .send(KafkaProducerRecord.create(record.topic(), record.value())) - .map(vertxRecordMetadata -> new RecordMetadata( - new TopicPartition(record.topic(), vertxRecordMetadata.getPartition()), - vertxRecordMetadata.getOffset(), - 0, - vertxRecordMetadata.getTimestamp(), - -1, - -1)); - } - - @Override - public Future close() { - return producer.close(); - } - - @Override - public Future flush() { - return producer.flush(); - } - - @Override - public Producer unwrap() { - return producer.unwrap(); - } -} diff --git a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/VertxProducerFactory.java b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/VertxProducerFactory.java deleted file mode 100644 index 13b66f275f..0000000000 --- a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/VertxProducerFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiververtx; - -import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; -import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; -import io.vertx.core.Vertx; -import java.util.Properties; -import org.apache.kafka.clients.producer.KafkaProducer; - -public class VertxProducerFactory implements ReactiveProducerFactory { - - @Override - public ReactiveKafkaProducer create(Vertx v, Properties configs) { - return new VertxKafkaProducer<>(v, new KafkaProducer<>(configs)); - } -} diff --git a/data-plane/receiver-vertx/src/test/java/dev/knative/eventing/kafka/broker/receiververtx/impl/ReceiverVerticleTracingVertxImplTest.java b/data-plane/receiver-vertx/src/test/java/dev/knative/eventing/kafka/broker/receiververtx/impl/ReceiverVerticleTracingVertxImplTest.java deleted file mode 100644 index fd5f5c462b..0000000000 --- a/data-plane/receiver-vertx/src/test/java/dev/knative/eventing/kafka/broker/receiververtx/impl/ReceiverVerticleTracingVertxImplTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiververtx.impl; - -import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; -import dev.knative.eventing.kafka.broker.receiver.impl.ReceiverVerticleTracingTest; -import dev.knative.eventing.kafka.broker.receiververtx.VertxKafkaProducer; -import io.cloudevents.CloudEvent; -import io.vertx.core.Vertx; -import org.apache.kafka.clients.producer.Producer; - -public class ReceiverVerticleTracingVertxImplTest extends ReceiverVerticleTracingTest { - - @Override - public ReactiveKafkaProducer createKafkaProducer( - Vertx vertx, Producer producer) { - return new VertxKafkaProducer<>(vertx, producer); - } -} diff --git a/data-plane/tests/pom.xml b/data-plane/tests/pom.xml index 8cdd785da6..d692cb841d 100644 --- a/data-plane/tests/pom.xml +++ b/data-plane/tests/pom.xml @@ -46,11 +46,6 @@ dispatcher ${project.version} - - dev.knative.eventing.kafka.broker - dispatcher-vertx - ${project.version} - dev.knative.eventing.kafka.broker dispatcher-loom diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/VertxDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/VertxDataPlaneTest.java deleted file mode 100644 index e6f093a225..0000000000 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/VertxDataPlaneTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.tests; - -import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory; -import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; -import dev.knative.eventing.kafka.broker.dispatchervertx.VertxConsumerFactory; -import dev.knative.eventing.kafka.broker.receiververtx.VertxProducerFactory; - -public class VertxDataPlaneTest extends AbstractDataPlaneTest { - - @Override - protected ReactiveProducerFactory getReactiveProducerFactory() { - return new VertxProducerFactory<>(); - } - - @Override - protected ReactiveConsumerFactory getReactiveConsumerFactory() { - return new VertxConsumerFactory<>(); - } -} diff --git a/hack/data-plane.sh b/hack/data-plane.sh index 9f94eae0de..0168b9881a 100755 --- a/hack/data-plane.sh +++ b/hack/data-plane.sh @@ -31,9 +31,6 @@ readonly SINK_TLS_CONFIG_DIR=${DATA_PLANE_CONFIG_DIR}/sink-tls readonly CHANNEL_DATA_PLANE_CONFIG_DIR=${DATA_PLANE_CONFIG_DIR}/channel readonly CHANNEL_TLS_CONFIG_DIR=${DATA_PLANE_CONFIG_DIR}/channel-tls -readonly RECEIVER_VERTX_DIRECTORY=receiver-vertx -readonly DISPATCHER_VERTX_DIRECTORY=dispatcher-vertx - readonly RECEIVER_LOOM_DIRECTORY=receiver-loom readonly DISPATCHER_LOOM_DIRECTORY=dispatcher-loom @@ -64,15 +61,9 @@ function receiver_build_push() { local receiver_sha="" - if [ $USE_LOOM == "true" ]; then - local receiver="${KNATIVE_KAFKA_BROKER_RECEIVER:-${KO_DOCKER_REPO}/knative-kafka-broker-receiver-loom}" - ./mvnw clean package jib:build -pl "${RECEIVER_LOOM_DIRECTORY}" -DskipTests || return $? - receiver_sha=$(cat "${RECEIVER_LOOM_DIRECTORY}/target/jib-image.digest") - else - local receiver="${KNATIVE_KAFKA_RECEIVER:-${KO_DOCKER_REPO}/knative-kafka-broker-receiver}" - ./mvnw clean package jib:build -pl "${RECEIVER_VERTX_DIRECTORY}" -DskipTests || return $? - receiver_sha=$(cat "${RECEIVER_VERTX_DIRECTORY}/target/jib-image.digest") - fi + local receiver="${KNATIVE_KAFKA_BROKER_RECEIVER:-${KO_DOCKER_REPO}/knative-kafka-broker-receiver-loom}" + ./mvnw clean package jib:build -pl "${RECEIVER_LOOM_DIRECTORY}" -DskipTests || return $? + receiver_sha=$(cat "${RECEIVER_LOOM_DIRECTORY}/target/jib-image.digest") export KNATIVE_KAFKA_RECEIVER_IMAGE="${receiver}@${receiver_sha}" @@ -84,16 +75,10 @@ function dispatcher_build_push() { local dispatcher_sha="" - if [ "${USE_LOOM}" == "true" ]; then - local dispatcher="${KNATIVE_KAFKA_BROKER_DISPATCHER:-${KO_DOCKER_REPO}/knative-kafka-broker-dispatcher-loom}" - ./mvnw clean package jib:build -pl "${DISPATCHER_LOOM_DIRECTORY}" -DskipTests || return $? - dispatcher_sha=$(cat "${DISPATCHER_LOOM_DIRECTORY}/target/jib-image.digest") - else - local dispatcher="${KNATIVE_KAFKA_BROKER_DISPATCHER:-${KO_DOCKER_REPO}/knative-kafka-broker-dispatcher}" - ./mvnw clean package jib:build -pl "${DISPATCHER_VERTX_DIRECTORY}" -DskipTests || return $? - dispatcher_sha=$(cat "${DISPATCHER_VERTX_DIRECTORY}/target/jib-image.digest") - fi - + local dispatcher="${KNATIVE_KAFKA_BROKER_DISPATCHER:-${KO_DOCKER_REPO}/knative-kafka-broker-dispatcher-loom}" + ./mvnw clean package jib:build -pl "${DISPATCHER_LOOM_DIRECTORY}" -DskipTests || return $? + dispatcher_sha=$(cat "${DISPATCHER_LOOM_DIRECTORY}/target/jib-image.digest") + export KNATIVE_KAFKA_DISPATCHER_IMAGE="${dispatcher}@${dispatcher_sha}" echo "Dispatcher image ${KNATIVE_KAFKA_DISPATCHER_IMAGE}" diff --git a/hack/verify-codegen.sh b/hack/verify-codegen.sh index 383b40c44b..409dc8d908 100755 --- a/hack/verify-codegen.sh +++ b/hack/verify-codegen.sh @@ -41,9 +41,7 @@ DIRS=( "/data-plane/contract/src" "/data-plane/core/src" "/data-plane/dispatcher/src" - "/data-plane/dispatcher-vertx/src" "/data-plane/receiver/src" - "/data-plane/receiver-vertx/src" "/data-plane/tests/src" "/control-plane/pkg/core/config" "/control-plane/pkg/apis"