Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Kafka API module #1272

Merged
merged 8 commits into from
Jun 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ include::example$messages/MessageExamples.java[tag=message]

As you can see in the previous snippet, `Messages` can also have _metadata_.
_Metadata_ is a way to extend messages with additional data.
It can be metadata related to the message broker ({javadoc-base}/apidocs/io/smallrye/reactive/messaging/kafka/KafkaMessageMetadata.html[`KafkaMessageMetadata`] for instance), or contain operational data (such as tracing metadata), or business-related data.
It can be metadata related to the message broker ({javadoc-base}/apidocs/io/smallrye/reactive/messaging/kafka/api/KafkaMessageMetadata.html[`KafkaMessageMetadata`] for instance), or contain operational data (such as tracing metadata), or business-related data.

NOTE: When retrieving metadata, you get an `Optional` as it may not be present.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package outbound;

import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package outbound;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.jms.JmsProperties;
import io.smallrye.reactive.messaging.jms.JmsPropertiesBuilder;
import io.smallrye.reactive.messaging.jms.OutgoingJmsMessageMetadata;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

public class JmsOutboundMetadataExample {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package inbound;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.Record;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

@ApplicationScoped
public class KafkaDeadLetterExample {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Instant;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.time.Instant;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

public class KafkaMetadataExample {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package outbound;

import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

public class KafkaOutboundDynamicTopicExample {

public Message<Double> metadata(Message<Double> incoming) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package outbound;

import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ include::example$inbound/Converters.java[tags=code]

=== Inbound Metadata

Messages coming from Kafka contains an instance of {javadoc-base}/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordMetadata.html[IncomingKafkaRecordMetadata<K, T>] in the metadata.
Messages coming from Kafka contains an instance of {javadoc-base}/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordMetadata.html[IncomingKafkaRecordMetadata<K, T>] in the metadata.
`K` is the type of the record's key.
`T` is the type of the record's value.
It provides the key, topic, partitions, headers and so on:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ If you need more control on the written records, use `OutgoingKafkaRecordMetadat

=== Outbound Metadata

When sending `Messages`, you can add an instance of {javadoc-base}/apidocs/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecordMetadata.html[`OutgoingKafkaRecordMetadata`] to influence how the message is going to written to Kafka.
When sending `Messages`, you can add an instance of {javadoc-base}/apidocs/io/smallrye/reactive/messaging/kafka/api/OutgoingKafkaRecordMetadata.html[`OutgoingKafkaRecordMetadata`] to influence how the message is going to written to Kafka.
For example, you can add Kafka headers, configure the record key...

[source, java]
Expand Down
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@

<jboss-log-manager.version>2.1.18.Final</jboss-log-manager.version>

<kafka.version>2.8.0</kafka.version>

<opentelemetry.version>1.3.0</opentelemetry.version>
<opentelemetry-semver.version>1.3.0-alpha</opentelemetry-semver.version>

Expand All @@ -100,6 +102,7 @@
<module>smallrye-reactive-messaging-provider</module>
<module>smallrye-reactive-messaging-in-memory</module>
<module>smallrye-reactive-messaging-kafka</module>
<module>smallrye-reactive-messaging-kafka-api</module>
<module>smallrye-reactive-messaging-mqtt</module>
<module>smallrye-reactive-messaging-camel</module>
<module>smallrye-reactive-messaging-amqp</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public void testFromAmqpToAppWithNoParent() {
container = weld.initialize();
MyAppReceivingData bean = container.getBeanManager().createInstance().select(MyAppReceivingData.class).get();

await().until(() -> isAmqpConnectorReady(container));

AtomicInteger count = new AtomicInteger();

usage.produce("no-parent-stuff", 10, count::getAndIncrement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public void testFromAmqpToAppToAmqp() {

container = weld.initialize();

await().until(() -> isAmqpConnectorReady(container));

AtomicInteger count = new AtomicInteger();
List<SpanContext> producedSpanContexts = new CopyOnWriteArrayList<>();
usage.produce("parent-topic", 10, () -> AmqpMessage.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public void testFromAmqpToAppWithParentSpan() {
container = weld.initialize();
MyAppReceivingData bean = container.getBeanManager().createInstance().select(MyAppReceivingData.class).get();

await().until(() -> isAmqpConnectorReady(container));

AtomicInteger count = new AtomicInteger();
List<SpanContext> producedSpanContexts = new CopyOnWriteArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public void testFromAppToAmqp() {

container = weld.initialize();

await().until(() -> isAmqpConnectorReady(container));

await().until(() -> payloads.size() >= 10);
assertThat(payloads).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

Expand Down
29 changes: 29 additions & 0 deletions smallrye-reactive-messaging-kafka-api/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging</artifactId>
<version>3.6.0-SNAPSHOT</version>
</parent>

<artifactId>smallrye-reactive-messaging-kafka-api</artifactId>

<name>SmallRye Reactive Messaging : Connector :: Kafka User API</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package io.smallrye.reactive.messaging.kafka.api;

import java.time.Instant;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;

/**
* Contains information about messages received from a channel backed by Kafka.
* Generally this will be created by the framework, and users should not construct instances of this class.
*
* @param <K> The Kafka record key type
* @param <T> The payload type
*/
public class IncomingKafkaRecordMetadata<K, T> implements KafkaMessageMetadata<K> {

private final ConsumerRecord<K, T> record;
private volatile Headers headers;

/**
* Constructor
*
* @param record the underlying record received from Kafka
*/
public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record) {
this.record = record;
}

/**
* {@inheritDoc}
*/
@Override
public String getTopic() {
return record.topic();
}

/**
* {@inheritDoc}
*/
@Override
public K getKey() {
return record.key();
}

/**
* {@inheritDoc}
*/
@Override
public int getPartition() {
return record.partition();
}

/**
* {@inheritDoc}
*/
@Override
public Instant getTimestamp() {
return Instant.ofEpochMilli(record.timestamp());
}

/**
* Get the timestamp type
*
* @return the timestamp type
*/
public TimestampType getTimestampType() {
return record.timestampType();
}

/**
* Get the offset
*
* @return the offset
*/
public long getOffset() {
return record.offset();
}

/**
* {@inheritDoc}
*/
@Override
public Headers getHeaders() {
if (headers == null) {
synchronized (this) {
if (headers == null) {
Headers headers = new RecordHeaders();
if (record.headers() != null) {
for (Header header : record.headers()) {
headers.add(new RecordHeader(header.key(), header.value()));
}
}
this.headers = headers;
}
}
}
return headers;
}

/**
* Get the underlying Kafka ConsumerRecord
*
* @return the underlying Kafka ConsumerRecord
*/
public ConsumerRecord<K, T> getRecord() {
return record;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.smallrye.reactive.messaging.kafka.api;

import java.time.Instant;

import org.apache.kafka.common.header.Headers;

/**
* Common interface for
*
* @param <K> the Kafka record key type
*/
public interface KafkaMessageMetadata<K> {

/**
* Get the topic
*
* @return the name of the topic
*/
String getTopic();

/**
* Get the key
*
* @return the key
*/
K getKey();

/**
* Get the timestamp
*
* @return the timestamp
*/
Instant getTimestamp();

/**
* Get the Kafka headers
*
* @return the Kafka headers
*/
Headers getHeaders();

/**
* Get the partition
*
* @return the partition
*/
int getPartition();

}