From 33efb40c4cada2667db1b8c99c8bcc043e61a37e Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Sun, 21 Jan 2024 15:15:37 -0800 Subject: [PATCH 1/2] bugfix: handle format errors in KafkaSource --- src/Formats/ProtobufReader.cpp | 12 ++++++++++-- src/Formats/ProtobufReader.h | 2 ++ src/Storages/ExternalStream/Kafka/KafkaSource.cpp | 14 +++++++++++++- src/Storages/ExternalStream/Kafka/KafkaSource.h | 1 + 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/Formats/ProtobufReader.cpp b/src/Formats/ProtobufReader.cpp index 9ead722e2f6..c69625b4a6e 100644 --- a/src/Formats/ProtobufReader.cpp +++ b/src/Formats/ProtobufReader.cpp @@ -3,7 +3,6 @@ #if USE_PROTOBUF # include - namespace DB { namespace ErrorCodes @@ -437,12 +436,21 @@ void ProtobufReader::ignoreGroup() ErrorCodes::UNKNOWN_PROTOBUF_FORMAT); } - +/// proton: starts void ProtobufReader::setReadBuffer(ReadBuffer & buf) { in.swap(buf); + /// reset states cursor = 0; + current_message_level = 0; + current_message_end = 0; + parent_message_ends.clear(); + field_number = 0; + next_field_number = 0; + field_end = 0; } +/// proton: ends + } #endif diff --git a/src/Formats/ProtobufReader.h b/src/Formats/ProtobufReader.h index c66543a0438..ceef4b25c35 100644 --- a/src/Formats/ProtobufReader.h +++ b/src/Formats/ProtobufReader.h @@ -33,7 +33,9 @@ class ProtobufReader void readStringAndAppend(PaddedPODArray & str); bool eof() const { return in.eof(); } + /// proton: starts void setReadBuffer(ReadBuffer & buf); + /// proton: ends private: void readBinary(void * data, size_t size); diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index af1dd4fe8fa..7e857ea44a4 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -145,6 +145,12 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) external_stream_counter->addToReadBytes(kmessage->len); external_stream_counter->addToReadCounts(new_rows); + if (format_error) + { + LOG_ERROR(log, "Failed to parse message at {}: {}", kmessage->offset, format_error.value()); + format_error.reset(); + } + if (!new_rows) return; @@ -238,7 +244,13 @@ void KafkaSource::initFormatExecutor(const Kafka * kafka) = FormatFactory::instance().getInputFormat(data_format, read_buffer, non_virtual_header, query_context, max_block_size); format_executor = std::make_unique( - non_virtual_header, std::move(input_format), [](const MutableColumns &, Exception &) -> size_t { return 0; }); + non_virtual_header, + std::move(input_format), + [this](const MutableColumns &, Exception & ex) -> size_t + { + format_error = ex.what(); + return 0; + }); auto converting_dag = ActionsDAG::makeConvertingActions( non_virtual_header.cloneEmpty().getColumnsWithTypeAndName(), diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h index b0502cadfd2..5a2683ab12b 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h @@ -82,6 +82,7 @@ class KafkaSource final : public ISource bool request_virtual_columns = false; + std::optional format_error; std::vector result_chunks; std::vector::iterator iter; MutableColumns current_batch; From 4d492cef567b7a471bd758ef2ad98ef3da502c22 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Sun, 21 Jan 2024 15:22:56 -0800 Subject: [PATCH 2/2] KafkaSource should use format_schema from the ExternalStreamSettings --- src/Storages/ExternalStream/Kafka/KafkaSource.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index 7e857ea44a4..aa9e0e7215d 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -154,7 +154,7 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) if (!new_rows) return; - auto result_block = non_virtual_header.cloneWithColumns(format_executor->getResultColumns()); + auto result_block = non_virtual_header.cloneWithColumns(format_executor->getResultColumns()); convert_non_virtual_to_physical_action->execute(result_block); MutableColumns new_data(result_block.mutateColumns()); @@ -240,8 +240,13 @@ void KafkaSource::initFormatExecutor(const Kafka * kafka) { const auto & data_format = kafka->dataFormat(); - auto input_format - = FormatFactory::instance().getInputFormat(data_format, read_buffer, non_virtual_header, query_context, max_block_size); + auto input_format = FormatFactory::instance().getInputFormat( + data_format, + read_buffer, + non_virtual_header, + query_context, + max_block_size, + kafka->getFormatSettings(query_context)); format_executor = std::make_unique( non_virtual_header,