From b0f9101b37daa61782f7e67afca7fd8adb5014be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 7 Jun 2019 02:40:15 +0200 Subject: [PATCH] Allow reading headers from Kafka Message headers A simpler part of #843, adds just the consumption part. This makes headers accessible in the consumer in case another application makes use of those, be it PHP or not. --- pkg/rdkafka/RdKafkaConsumer.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index d7016a946..241ee3841 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -169,6 +169,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); + // Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's. + // Note: Requires phprdkafka >= 3.1.0 + if (isset($kafkaMessage->headers)) { + $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers)); + } + return $message; default: throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);