kafka: fetch record converter (envoyproxy#25300)
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
Signed-off-by: Ryan Northey <ryan@synca.io>
adamkotwasinski authored and phlax committed Aug 25, 2023
1 parent a6b1416 commit 684c3f2
Showing 13 changed files with 464 additions and 28 deletions.
1 change: 1 addition & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
@@ -224,6 +224,7 @@ envoy_cc_library(
deps = [
":librdkafka_utils_impl_lib",
":upstream_kafka_consumer_lib",
"//contrib/kafka/filters/network/source:kafka_types_lib",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
],
@@ -75,6 +75,7 @@ envoy_cc_library(
tags = ["skip_on_windows"],
deps = [
"//contrib/kafka/filters/network/source:kafka_response_parser_lib",
"//contrib/kafka/filters/network/source:serialization_lib",
"//contrib/kafka/filters/network/source/mesh:inbound_record_lib",
],
)
@@ -1,20 +1,194 @@
#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h"

#include "contrib/kafka/filters/network/source/serialization.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() {
CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl);
}

std::vector<FetchableTopicResponse>
FetchRecordConverterImpl::convert(const InboundRecordsMap& arg) const {

// Compute record batches.
std::map<KafkaPartition, Bytes> record_batches;
for (const auto& partition_and_records : arg) {
const KafkaPartition& kp = partition_and_records.first;
const std::vector<InboundRecordSharedPtr>& partition_records = partition_and_records.second;
const Bytes batch = renderRecordBatch(partition_records);
record_batches[kp] = batch;
}

// Transform our maps into the Kafka structs.
std::map<std::string, std::vector<FetchResponseResponsePartitionData>> topic_to_frrpd;
for (const auto& record_batch : record_batches) {
const std::string& topic_name = record_batch.first.first;
const int32_t partition = record_batch.first.second;

std::vector<FetchResponseResponsePartitionData>& frrpds = topic_to_frrpd[topic_name];
const int16_t error_code = 0;
const int64_t high_watermark = 0;
const auto frrpd = FetchResponseResponsePartitionData{partition, error_code, high_watermark,
absl::make_optional(record_batch.second)};

frrpds.push_back(frrpd);
}

std::vector<FetchableTopicResponse> result;
for (const auto& partition_and_records : topic_to_frrpd) {
const std::string& topic_name = partition_and_records.first;
const auto ftr = FetchableTopicResponse{topic_name, partition_and_records.second};
result.push_back(ftr);
}
return result;
}
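For orientation, a minimal usage sketch of the converter (illustrative only, not part of this commit), assuming InboundRecordsMap is a std::map from KafkaPartition, i.e. a (topic, partition) pair, to std::vector<InboundRecordSharedPtr>, which is what the loops above rely on:

InboundRecordsMap records;
records[{"mytopic", 0}].push_back(
    std::make_shared<InboundRecord>("mytopic", 0, 0, absl::nullopt, absl::nullopt));
const std::vector<FetchableTopicResponse> responses =
    FetchRecordConverterImpl::getDefaultInstance().convert(records);
// Expect one FetchableTopicResponse for "mytopic" containing a single rendered record batch.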

// Magic format introduced around Kafka 1.0.0 and still used with Kafka 3.3.
constexpr int8_t MAGIC = 2;

Bytes FetchRecordConverterImpl::renderRecordBatch(
const std::vector<InboundRecordSharedPtr>& records) const {

Bytes result = {};

// Base offset (bytes 0..7).
const int64_t base_offset = htobe64(0);
const unsigned char* base_offset_b = reinterpret_cast<const unsigned char*>(&base_offset);
result.insert(result.end(), base_offset_b, base_offset_b + sizeof(base_offset));

// Batch length placeholder (bytes 8..11).
result.insert(result.end(), {0, 0, 0, 0});

// All other attributes (spans partitionLeaderEpoch .. baseSequence) (bytes 12..56).
const std::vector zeros(45, 0);
result.insert(result.end(), zeros.begin(), zeros.end());

// Last offset delta.
// -1 means we always claim that we are at the beginning of the partition.
const int32_t last_offset_delta = htobe32(-1);
const unsigned char* last_offset_delta_bytes =
reinterpret_cast<const unsigned char*>(&last_offset_delta);
const auto last_offset_delta_pos = result.begin() + 8 + 4 + 11;
std::copy(last_offset_delta_bytes, last_offset_delta_bytes + sizeof(last_offset_delta),
last_offset_delta_pos);

// Records (count) (bytes 57..60).
const int32_t record_count = htobe32(records.size());
const unsigned char* record_count_b = reinterpret_cast<const unsigned char*>(&record_count);
result.insert(result.end(), record_count_b, record_count_b + sizeof(record_count));

// Records (data) (bytes 61+).
for (const auto& record : records) {
appendRecord(*record, result);
}

// Set batch length.
const int32_t batch_len = htobe32(result.size() - (sizeof(base_offset) + sizeof(batch_len)));
const unsigned char* batch_len_bytes = reinterpret_cast<const unsigned char*>(&batch_len);
std::copy(batch_len_bytes, batch_len_bytes + sizeof(batch_len),
result.begin() + sizeof(base_offset));

// Set magic.
const uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t);
result[magic_offset] = MAGIC;

// Compute and set CRC.
const uint32_t crc_offset = magic_offset + 1;
const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t);
const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t));
const Bytes crc = renderCrc32c(crc_data_start, crc_data_len);
std::copy(crc.begin(), crc.end(), result.begin() + crc_offset);

return result;
}
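To make the byte arithmetic above easier to follow, here is an illustrative summary (not part of the commit) of the offsets the code writes to, matching the v2 record-batch layout referenced in the header file:

constexpr uint32_t BASE_OFFSET_OFFSET = 0;             // int64, always 0 here.
constexpr uint32_t BATCH_LENGTH_OFFSET = 8;            // int32, patched in after rendering.
constexpr uint32_t PARTITION_LEADER_EPOCH_OFFSET = 12; // int32, zeroed.
constexpr uint32_t MAGIC_OFFSET = 16;                  // int8, set to 2.
constexpr uint32_t CRC_OFFSET = 17;                    // int32, CRC32C over bytes 21 .. end.
constexpr uint32_t ATTRIBUTES_OFFSET = 21;             // int16, zeroed.
constexpr uint32_t LAST_OFFSET_DELTA_OFFSET = 23;      // int32, set to -1.
// Bytes 27 .. 56: baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence (zeroed).
constexpr uint32_t RECORD_COUNT_OFFSET = 57;           // int32.
constexpr uint32_t RECORDS_OFFSET = 61;                // Individual records follow.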

void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const {

Bytes tmp = {};
// This is not precise maths, as we could be over-reserving a little due to var-length fields.
tmp.reserve(sizeof(int8_t) + sizeof(int64_t) + sizeof(int32_t) + record.dataLengthEstimate());

// attributes: int8
const int8_t attributes = 0;
tmp.push_back(static_cast<unsigned char>(attributes));

// timestampDelta: varlong
const int64_t timestamp_delta = 0;
VarlenUtils::writeVarlong(timestamp_delta, tmp);

// offsetDelta: varint
const int32_t offset_delta = record.offset_;
VarlenUtils::writeVarint(offset_delta, tmp);

// Impl note: compared to requests/responses, records serialize byte arrays as varint length +
// bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h.

// keyLength: varint
// key: byte[]
const NullableBytes& key = record.key_;
if (key.has_value()) {
VarlenUtils::writeVarint(key->size(), tmp);
tmp.insert(tmp.end(), key->begin(), key->end());
} else {
VarlenUtils::writeVarint(-1, tmp);
}

// valueLen: varint
// value: byte[]
const NullableBytes& value = record.value_;
if (value.has_value()) {
VarlenUtils::writeVarint(value->size(), tmp);
tmp.insert(tmp.end(), value->begin(), value->end());
} else {
VarlenUtils::writeVarint(-1, tmp);
}

// TODO (adam.kotwasinski) Headers are not supported yet.
const int32_t header_count = 0;
VarlenUtils::writeVarint(header_count, tmp);

// Put tmp's length into 'out'.
VarlenUtils::writeVarint(tmp.size(), out);

// Put tmp's contents into 'out'.
out.insert(out.end(), tmp.begin(), tmp.end());
}
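Worked example (illustrative, not part of this commit): with the logic above, a record at offset 0 with a null key and the one-byte value "v" is appended to 'out' as eight bytes:

// 0x0E  - length: varint(7), covering the 7 payload bytes below
// 0x00  - attributes
// 0x00  - timestampDelta: varlong(0)
// 0x00  - offsetDelta: varint(0)
// 0x01  - keyLength: varint(-1), i.e. null key
// 0x02  - valueLen: varint(1)
// 0x76  - value: 'v'
// 0x00  - headerCount: varint(0)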

// XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's
// implementation or generate it.
// https://github.com/confluentinc/librdkafka/blob/v1.8.0/src/crc32c.c#L1
uint32_t FetchRecordConverterImpl::computeCrc32c(const unsigned char* data, const size_t len) {
uint32_t crc = 0xFFFFFFFF;
for (size_t i = 0; i < len; i++) {
char ch = data[i];
for (size_t j = 0; j < 8; j++) {
uint32_t b = (ch ^ crc) & 1;
crc >>= 1;
if (b) {
crc = crc ^ 0x82F63B78;
}
ch >>= 1;
}
}
return ~crc;
}
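The loop above is the standard bit-by-bit, reflected CRC-32C (Castagnoli) computation. As an illustrative, gtest-style sanity check (not part of this diff), it should reproduce the well-known CRC-32C check value of 0xE3069283 for the ASCII string "123456789":

const unsigned char check_input[] = "123456789";
EXPECT_EQ(0xE3069283, FetchRecordConverterImpl::computeCrc32cForTest(check_input, 9));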

uint32_t FetchRecordConverterImpl::computeCrc32cForTest(const unsigned char* data,
const size_t len) {
return computeCrc32c(data, len);
}

Bytes FetchRecordConverterImpl::renderCrc32c(const unsigned char* data, const size_t len) const {
uint32_t crc = htobe32(computeCrc32c(data, len));
Bytes result;
unsigned char* raw = reinterpret_cast<unsigned char*>(&crc);
result.insert(result.end(), raw, raw + sizeof(crc));
return result;
}

} // namespace Mesh
@@ -38,6 +38,24 @@ class FetchRecordConverterImpl : public FetchRecordConverter {

// Default singleton accessor.
static const FetchRecordConverter& getDefaultInstance();

static uint32_t computeCrc32cForTest(const unsigned char* data, const size_t len);

private:
// Helper function: transform records from a partition into a record batch.
// See: https://kafka.apache.org/33/documentation.html#recordbatch
Bytes renderRecordBatch(const std::vector<InboundRecordSharedPtr>& records) const;

// Helper function: append record to output array.
// See: https://kafka.apache.org/33/documentation.html#record
// https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L164
void appendRecord(const InboundRecord& record, Bytes& out) const;

// Helper function: render CRC32C bytes from given input.
Bytes renderCrc32c(const unsigned char* data, const size_t len) const;

// Helper function: compute CRC32C.
static uint32_t computeCrc32c(const unsigned char* data, const size_t len);
};

} // namespace Mesh
17 changes: 14 additions & 3 deletions contrib/kafka/filters/network/source/mesh/inbound_record.h
@@ -4,6 +4,7 @@
#include <string>

#include "absl/strings/str_cat.h"
#include "contrib/kafka/filters/network/source/kafka_types.h"

namespace Envoy {
namespace Extensions {
@@ -20,10 +21,20 @@ struct InboundRecord {
const int32_t partition_;
const int64_t offset_;

const NullableBytes key_;
const NullableBytes value_;

InboundRecord(const std::string& topic, const int32_t partition, const int64_t offset,
const NullableBytes& key, const NullableBytes& value)
: topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {};

// Estimates how many bytes this record would take.
uint32_t dataLengthEstimate() const {
uint32_t result = 15; // Upper bound for the key-length, value-length and header-count varints (at most 5 bytes each).
result += key_ ? key_->size() : 0;
result += value_ ? value_->size() : 0;
return result;
}
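For example, a record with a 3-byte key and a 7-byte value yields an estimate of 15 + 3 + 7 = 25 bytes; since each of the three varints occupies at most 5 bytes, the estimate can only err on the high side (illustrative note, not part of the commit).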

// Used in logging.
std::string toString() const {
@@ -1,5 +1,6 @@
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h"

#include "contrib/kafka/filters/network/source/kafka_types.h"
#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h"

namespace Envoy {
@@ -99,12 +100,27 @@ void RichKafkaConsumer::runWorkerLoop() {
ENVOY_LOG(debug, "Worker thread for consumer [{}] finished", topic_);
}

// Helper method: converts a raw (pointer, length) pair into NullableBytes.
static NullableBytes toBytes(const void* data, const size_t size) {
const unsigned char* as_char = static_cast<const unsigned char*>(data);
if (data) {
Bytes bytes(as_char, as_char + size);
return {bytes};
} else {
return absl::nullopt;
}
}

// Helper method: translates a librdkafka message into our InboundRecord, so librdkafka types do not propagate any further.
static InboundRecordSharedPtr transform(RdKafkaMessagePtr arg) {
const auto topic = arg->topic_name();
const auto partition = arg->partition();
const auto offset = arg->offset();

const NullableBytes key = toBytes(arg->key_pointer(), arg->key_len());
const NullableBytes value = toBytes(arg->payload(), arg->len());

return std::make_shared<InboundRecord>(topic, partition, offset, key, value);
}

std::vector<InboundRecordSharedPtr> RichKafkaConsumer::receiveRecordBatch() {
49 changes: 49 additions & 0 deletions contrib/kafka/filters/network/source/serialization.cc
@@ -220,6 +220,55 @@ NullableBytes NullableCompactBytesDeserializer::get() const {
}
}

namespace VarlenUtils {

uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst) {
uint32_t value = arg;

uint32_t elements_with_1 = 0;
// As long as there are bits set on indexes 8 or higher (counting from 1).
while ((value & ~(0x7f)) != 0) {
// Save next 7-bit batch with highest bit set.
const uint8_t el = (value & 0x7f) | 0x80;
dst.push_back(el);
value >>= 7;
elements_with_1++;
}

// After the loop has finished, we are certain that bit 8 = 0, so we can just add the final element.
const uint8_t el = value;
dst.push_back(el);

return elements_with_1 + 1;
}
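Worked example (illustrative, not from the diff): 300 is 0b1_0010_1100, so the encoder first emits the low seven bits with the continuation bit set (0xAC), then the remaining bits (0x02):

Bytes dst;
const uint32_t bytes_written = writeUnsignedVarint(300, dst); // dst == {0xAC, 0x02}, bytes_written == 2.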

uint32_t writeVarint(const int32_t arg, Bytes& dst) {
uint32_t zz = (static_cast<uint32_t>(arg) << 1) ^ (arg >> 31); // Zig-zag.
return writeUnsignedVarint(zz, dst);
}

uint32_t writeVarlong(const int64_t arg, Bytes& dst) {
uint64_t value = (static_cast<uint64_t>(arg) << 1) ^ (arg >> 63); // Zig-zag.

uint32_t elements_with_1 = 0;
// As long as there are bits set on indexes 8 or higher (counting from 1).
while ((value & ~(0x7f)) != 0) {
// Save next 7-bit batch with highest bit set.
const uint8_t el = (value & 0x7f) | 0x80;
dst.push_back(el);
value >>= 7;
elements_with_1++;
}

// After the loop has finished, we are certain that bit 8 = 0, so we can just add the final element.
const uint8_t el = value;
dst.push_back(el);

return elements_with_1 + 1;
}
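The signed variants rely on zig-zag encoding, which maps values with small magnitude onto small unsigned integers before the 7-bit encoding above is applied. Illustrative examples (not from the diff):

Bytes dst;
writeVarint(0, dst);   // Appends 0x00 (zig-zag: 0 -> 0).
writeVarint(-1, dst);  // Appends 0x01 (zig-zag: -1 -> 1).
writeVarint(1, dst);   // Appends 0x02 (zig-zag: 1 -> 2).
writeVarlong(-2, dst); // Appends 0x03 (zig-zag: -2 -> 3).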

} // namespace VarlenUtils

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions