[release-1.0] Properly handle events without the data field (#1500)
* Properly handle events without data field

A valid CloudEvent in the CE binary protocol binding for Kafka
may consist of headers only.

KafkaConsumer doesn't call the deserializer if the value
is null.

That means we get a record with a null value even though
the record is a valid CloudEvent.

This patch properly handles events without a data field by
creating the CloudEvent object from the record headers when
the above conditions apply.

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

* Make code simpler, handle exceptions, change method name

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com>
knative-prow-robot and pierDipi committed Nov 17, 2021
1 parent 2c5e702 commit f4a3bf0
Showing 4 changed files with 130 additions and 35 deletions.
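For context on the mechanism this commit relies on: in the CloudEvents binary protocol binding for Kafka, the event attributes travel as ce_-prefixed record headers and the record value carries only the optional data field, so a record with CE headers and a null value is still a valid event. Below is a minimal producer-side sketch of such a headers-only event; the topic name, broker address, attribute values, and class/helper names are hypothetical and not part of this commit.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;

public class HeadersOnlyCloudEventProducer {

    public static void main(final String[] args) {
        final var props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (final var producer = new KafkaProducer<String, byte[]>(props)) {
            // Binary content mode: the required CE attributes are ce_-prefixed headers.
            final List<Header> headers = List.of(
                header("ce_specversion", "1.0"),
                header("ce_id", "my-id"),
                header("ce_source", "/my/source"),
                header("ce_type", "my.type")
            );
            // Null key and null value: still a valid CloudEvent, but on the
            // consumer side KafkaConsumer skips the value deserializer.
            producer.send(new ProducerRecord<>("my-topic", null, null, null, headers));
        }
    }

    private static Header header(final String key, final String value) {
        return new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8));
    }
}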
@@ -21,20 +21,22 @@
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
import io.cloudevents.CloudEvent;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

/**
@@ -47,6 +49,8 @@ public class RecordDispatcherImpl implements RecordDispatcher {

private static final Logger logger = LoggerFactory.getLogger(RecordDispatcherImpl.class);

private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

private final Filter filter;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> subscriberSender;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> dlsSender;
@@ -92,8 +96,6 @@ public RecordDispatcherImpl(
*/
@Override
public Future<Void> dispatch(KafkaConsumerRecord<Object, CloudEvent> record) {
Promise<Void> promise = Promise.promise();

/*
That's pretty much what happens here:
@@ -116,9 +118,23 @@ public Future<Void> dispatch(KafkaConsumerRecord<Object, CloudEvent> record) {
+->end<--+
*/

onRecordReceived(record, promise);

return promise.future();
try {
Promise<Void> promise = Promise.promise();
onRecordReceived(maybeDeserializeValueFromHeaders(record), promise);
return promise.future();
} catch (final Exception ex) {
// This is a fatal exception that shouldn't happen in normal cases.
//
// It might happen if folks send bad records to a topic that is
// managed by our system.
//
// So discard the record if we can't deal with it, so that we
// can make progress in the partition.
logError("Exception occurred, discarding the record", record, ex);
recordDispatcherListener.recordReceived(record);
recordDispatcherListener.recordDiscarded(record);
return Future.failedFuture(ex);
}
}

private void onRecordReceived(final KafkaConsumerRecord<Object, CloudEvent> record, Promise<Void> finalProm) {
@@ -192,6 +208,23 @@ private void onDeadLetterSinkFailure(final KafkaConsumerRecord<Object, CloudEvent>
finalProm.complete();
}

private static KafkaConsumerRecord<Object, CloudEvent> maybeDeserializeValueFromHeaders(KafkaConsumerRecord<Object, CloudEvent> record) {
if (record.value() != null) {
return record;
}
// A valid CloudEvent in the CE binary protocol binding for Kafka
// may consist of headers only.
//
// KafkaConsumer doesn't call the deserializer if the value
// is null.
//
// That means that we get a record with a null value and some CE
// headers even though the record is a valid CloudEvent.
logDebug("Value is null", record);
final var value = cloudEventDeserializer.deserialize(record.record().topic(), record.record().headers(), null);
return new KafkaConsumerRecordImpl<>(KafkaConsumerRecordUtils.copyRecordAssigningValue(record.record(), value));
}
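A minimal sketch of the header-only deserialization path that maybeDeserializeValueFromHeaders relies on, assuming the upstream io.cloudevents.kafka.CloudEventDeserializer (which the dispatcher's own CloudEventDeserializer builds on); the topic name, attribute values, and class/helper names are hypothetical:

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class HeadersOnlyDeserializeSketch {

    public static void main(final String[] args) {
        final var headers = new RecordHeaders()
            .add("ce_specversion", bytes("1.0"))
            .add("ce_id", bytes("my-id"))
            .add("ce_source", bytes("/my/source"))
            .add("ce_type", bytes("my.type"));

        // Value is null: the event is rebuilt purely from the ce_ headers.
        final CloudEvent event = new CloudEventDeserializer()
            .deserialize("my-topic", headers, null);

        System.out.println(event.getId());   // my-id
        System.out.println(event.getData()); // null: no data field
    }

    private static byte[] bytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}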

private static Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> composeSenderAndSinkHandler(
CloudEventSender sender, ResponseHandler sinkHandler, String senderType) {
return rec -> sender.send(rec.value())
@@ -40,6 +40,7 @@
import java.util.stream.Stream;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
import static dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils.copyRecordAssigningValue;
import static io.cloudevents.kafka.PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION;

/**
@@ -174,20 +175,7 @@ private ConsumerRecord<Object, CloudEvent> validRecord(final ConsumerRecord<Object, CloudEvent>
}

// Copy consumer record and set value to a valid CloudEvent.
return new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
value.build(),
record.headers(),
record.leaderEpoch()
);
return copyRecordAssigningValue(record, value.build());
}

private static void setKey(CloudEventBuilder value, final Object key) {
@@ -0,0 +1,43 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import io.cloudevents.CloudEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class KafkaConsumerRecordUtils {

private KafkaConsumerRecordUtils() {
}

public static <T> ConsumerRecord<T, CloudEvent> copyRecordAssigningValue(final ConsumerRecord<T, CloudEvent> record,
final CloudEvent value) {
return new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
value,
record.headers(),
record.leaderEpoch()
);
}
}
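A note on the design: ConsumerRecord offers no setters and no copy constructor, so swapping in a deserialized value requires this field-by-field copy; the helper keeps that boilerplate in one place for both call sites in this commit.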
@@ -57,6 +57,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -117,35 +118,40 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) throws

/*
  1: event sent by the source to the Broker
  2: event sent by Trigger 1 in the response
  3: event sent by Trigger 2 in the response

  [ASCII flow diagram, abridged: the HTTPClient sends event 1 to the
  Receiver, which writes it to Kafka; the Dispatcher delivers event 1
  to Trigger 1, whose response (event 2) flows back through Kafka to
  Trigger 2; Trigger 2's response (event 3) flows back through Kafka
  to Trigger 1 and Trigger 3.]
*/
@Test
@Timeout(timeUnit = TimeUnit.MINUTES, value = 1)
public void execute(final Vertx vertx, final VertxTestContext context) {
public void execute(final Vertx vertx, final VertxTestContext context) throws InterruptedException {

final var checkpoints = context.checkpoint(3);
final var checkpoints = context.checkpoint(4);

// event sent by the source to the Broker (see 1 in diagram)
final var expectedRequestEvent = CloudEventBuilder.v1()
@@ -158,7 +164,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
.build();

// event sent in the response by the Callable service (see 2 in diagram)
final var expectedResponseEvent = CloudEventBuilder.v03()
final var expectedResponseEventService2 = CloudEventBuilder.v03()
.withId(UUID.randomUUID().toString())
.withDataSchema(URI.create("/api/data-schema-ce-2"))
.withSubject("subject-ce-2")
@@ -167,6 +173,20 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
.withType(TYPE_CE_2)
.build();

// event sent in the response by Callable service 2 (see 3 in diagram)
final var expectedResponseEventService1 = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withDataSchema(URI.create("/api/data-schema-ce-3"))
.withSource(URI.create("/api/rossi"))
.withSubject("subject-ce-3")
.withType(TYPE_CE_1)
.build();

final var service1ExpectedEventsIterator = List.of(
expectedRequestEvent,
expectedResponseEventService1
).iterator();

final var resource = DataPlaneContract.Resource.newBuilder()
.addTopics(TOPIC)
.setIngress(DataPlaneContract.Ingress.newBuilder().setPath(format("/%s/%s", BROKER_NAMESPACE, BROKER_NAME)))
@@ -207,9 +227,13 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS)
.accept(DataPlaneContract.Contract.newBuilder().addResources(resource).build());

await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertThat(vertx.deploymentIDs())
await()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(vertx.deploymentIDs())
.hasSize(resource.getEgressesCount() + NUM_RESOURCES + NUM_SYSTEM_VERTICLES));

Thread.sleep(2000); // Give consumers time to start

// start service
vertx.createHttpServer()
.exceptionHandler(context::failNow)
@@ -221,22 +245,29 @@ public void execute(final Vertx vertx, final VertxTestContext context) {

// service 1 receives event sent by the HTTPClient
if (request.path().equals(PATH_SERVICE_1)) {
final var expectedEvent = service1ExpectedEventsIterator.next();
context.verify(() -> {
assertThat(event).isEqualTo(expectedRequestEvent);
assertThat(event).isEqualTo(expectedEvent);
checkpoints.flag(); // 2
});

// write the event to the response; it will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEvent);
if (service1ExpectedEventsIterator.hasNext()) {
// write the event to the response; it will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEventService2);
}
}

// service 2 receives event in the response
if (request.path().equals(PATH_SERVICE_2)) {
context.verify(() -> {
assertThat(event).isEqualTo(expectedResponseEvent);
assertThat(event).isEqualTo(expectedResponseEventService2);
checkpoints.flag(); // 3
});

// write the event to the response; it will be delivered to service 1 and service 3
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEventService1);
}

if (request.path().equals(PATH_SERVICE_3)) {