Need to handle errors of different data format in one kafka topic #483

@leo-cai-timeplus

Describe what's wrong

First, create one Kafka topic, then create several external streams on that topic with different data formats and ingest data through each of them. Running a SELECT query against these external streams can then crash the server with a Fatal error (segmentation fault).

How to reproduce

  1. Create a Kafka topic 'foo'.
  2. Run the following SQL:
CREATE OR REPLACE FORMAT SCHEMA foo AS '
    syntax = "proto3";
    message Foo {
        string str = 1;
        int32  i = 2;
    }
' TYPE Protobuf;
DROP STREAM IF EXISTS ext_s;
CREATE EXTERNAL STREAM ext_s (
    str string,
    i int32
) SETTINGS type='kafka', data_format='ProtobufSingle', brokers='stream-store:9092', topic='foo', format_schema='foo:Foo';
DROP STREAM IF EXISTS ext;
CREATE EXTERNAL STREAM ext (
    str string,
    i int32
) SETTINGS type='kafka', data_format='Protobuf', brokers='stream-store:9092', topic='foo', format_schema='foo:Foo';
INSERT INTO ext VALUES ('aaa', 1), ('bbb', 2);
INSERT INTO ext_s VALUES ('aaa', 1);
SELECT * FROM ext_s settings format_schema='foo:Foo', seek_to='earliest';
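
The two streams above differ only in `data_format`. In ClickHouse, where these format names come from, `Protobuf` writes each message with a varint length prefix, while `ProtobufSingle` writes one bare message with no prefix. Assuming Proton follows the same convention, the topic ends up holding payloads with two different framings. The Python sketch below hand-encodes the `Foo` message to show why a reader expecting one framing cannot parse the other. The helper names are hypothetical, and how Proton maps INSERT rows to Kafka messages is not stated in this report.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)  # more bytes follow
        else:
            out.append(low)
            return bytes(out)


def encode_foo(s: str, i: int) -> bytes:
    """Encode message Foo { string str = 1; int32 i = 2; } by hand."""
    raw = s.encode("utf-8")
    # Tag byte = (field_number << 3) | wire_type
    # field 1, wire type 2 (length-delimited) -> 0x0A
    # field 2, wire type 0 (varint)           -> 0x10
    return b"\x0a" + encode_varint(len(raw)) + raw + b"\x10" + encode_varint(i)


def frame_single(msg: bytes) -> bytes:
    """Assumed ProtobufSingle framing: the bare message, no length prefix."""
    return msg


def frame_delimited(msgs: list[bytes]) -> bytes:
    """Assumed Protobuf framing: each message preceded by its varint length."""
    return b"".join(encode_varint(len(m)) + m for m in msgs)


def first_tag(payload: bytes) -> tuple[int, int]:
    """Interpret the first byte as a tag and return (field_number, wire_type)."""
    return payload[0] >> 3, payload[0] & 0x07


single = frame_single(encode_foo("aaa", 1))
delimited = frame_delimited([encode_foo("aaa", 1), encode_foo("bbb", 2)])

print(single.hex())          # bare message
print(delimited.hex())       # same messages, each behind a length byte
print(first_tag(single))     # a valid tag for field 1
print(first_tag(delimited))  # the length byte misread as a tag
```

Read as a bare message, the length-prefixed payload starts with byte `0x07`, which decodes to field number 0 and wire type 7. Neither is valid in protobuf, so a `ProtobufSingle` reader has to detect and reject such input rather than keep parsing.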

Error message and/or stacktrace

2024.01.12 03:10:05.966213 [ 419 ] {} <Fatal> BaseDaemon: ########################################
2024.01.12 03:10:05.966314 [ 419 ] {} <Fatal> BaseDaemon: (version 1.3.31, build id: 95BDBDEAB46863A7E119EDFEC2C39EB60F08AFB2) (from thread 404) (query_id: c4357dd7-0614-4aee-9bf1-ad60056f3414) (query: SELECT * FROM ext_s settings format_schema='foo:Foo', seek_to='earliest';) Received signal Segmentation fault (11)
2024.01.12 03:10:05.966358 [ 419 ] {} <Fatal> BaseDaemon: Address: NULL pointer. Access: read. Address not mapped to object.
2024.01.12 03:10:05.966394 [ 419 ] {} <Fatal> BaseDaemon: Stack trace: 0x16a3d5a8 0x16a7e220 0x16a3ba2e 0x169dfc65 0x1698c715 0x1698c450 0x169bdb67 0x1a715ed1 0x1a715dd5 0x1a74436f 0x1a716f52 0x1a715af2 0x1698c715 0x1698c450 0x169b6077 0x169a8d9d 0x169aa607 0x100da2af 0x100dccee 0x7f8bb6d97609 0x7f8bb6cac353
2024.01.12 03:10:05.966475 [ 419 ] {} <Fatal> BaseDaemon: 3. DB::ProtobufReader::endMessage(bool) @ 0x16a3d5a8 in /usr/bin/proton
2024.01.12 03:10:05.966500 [ 419 ] {} <Fatal> BaseDaemon: 4. DB::(anonymous namespace)::ProtobufSerializerMessage::readRow(unsigned long) @ 0x16a7e220 in /usr/bin/proton
2024.01.12 03:10:05.966546 [ 419 ] {} <Fatal> BaseDaemon: 5. DB::ProtobufRowInputFormat::readRow(std::__1::vector<COW<DB::IColumn>::mutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::mutable_ptr<DB::IColumn>>>&, DB::RowReadExtension&) @ 0x16a3ba2e in /usr/bin/proton
2024.01.12 03:10:05.966573 [ 419 ] {} <Fatal> BaseDaemon: 6. DB::IRowInputFormat::generate() @ 0x169dfc65 in /usr/bin/proton
2024.01.12 03:10:05.966601 [ 419 ] {} <Fatal> BaseDaemon: 7. DB::ISource::tryGenerate() @ 0x1698c715 in /usr/bin/proton
2024.01.12 03:10:05.966618 [ 419 ] {} <Fatal> BaseDaemon: 8. DB::ISource::work() @ 0x1698c450 in /usr/bin/proton
2024.01.12 03:10:05.966636 [ 419 ] {} <Fatal> BaseDaemon: 9. DB::StreamingFormatExecutor::execute() @ 0x169bdb67 in /usr/bin/proton
2024.01.12 03:10:05.966659 [ 419 ] {} <Fatal> BaseDaemon: 10. DB::KafkaSource::parseFormat(rd_kafka_message_s const*) @ 0x1a715ed1 in /usr/bin/proton
2024.01.12 03:10:05.966687 [ 419 ] {} <Fatal> BaseDaemon: 11. DB::KafkaSource::parseMessage(void*, unsigned long, void*) @ 0x1a715dd5 in /usr/bin/proton
2024.01.12 03:10:05.966712 [ 419 ] {} <Fatal> BaseDaemon: 12. klog::KafkaWALSimpleConsumer::consume(void (*)(void*, unsigned long, void*), void*, unsigned int, int, klog::KafkaWALContext const&) const @ 0x1a74436f in /usr/bin/proton
2024.01.12 03:10:05.966730 [ 419 ] {} <Fatal> BaseDaemon: 13. DB::KafkaSource::readAndProcess() @ 0x1a716f52 in /usr/bin/proton
2024.01.12 03:10:05.966746 [ 419 ] {} <Fatal> BaseDaemon: 14. DB::KafkaSource::generate() @ 0x1a715af2 in /usr/bin/proton
2024.01.12 03:10:05.966762 [ 419 ] {} <Fatal> BaseDaemon: 15. DB::ISource::tryGenerate() @ 0x1698c715 in /usr/bin/proton
2024.01.12 03:10:05.966776 [ 419 ] {} <Fatal> BaseDaemon: 16. DB::ISource::work() @ 0x1698c450 in /usr/bin/proton
2024.01.12 03:10:05.966793 [ 419 ] {} <Fatal> BaseDaemon: 17. DB::ExecutionThreadContext::executeTask() @ 0x169b6077 in /usr/bin/proton
2024.01.12 03:10:05.966812 [ 419 ] {} <Fatal> BaseDaemon: 18. DB::PipelineExecutor::executeStepImpl(unsigned long, std::__1::atomic<bool>*) @ 0x169a8d9d in /usr/bin/proton
2024.01.12 03:10:05.966855 [ 419 ] {} <Fatal> BaseDaemon: 19. void std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long, DB::ExecuteMode)::$_0>(DB::PipelineExecutor::executeImpl(unsigned long, DB::ExecuteMode)::$_0&&)::'lambda'(), void ()>>(std::__1::__function::__policy_storage const*) @ 0x169aa607 in /usr/bin/proton
2024.01.12 03:10:05.966882 [ 419 ] {} <Fatal> BaseDaemon: 20. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x100da2af in /usr/bin/proton
2024.01.12 03:10:05.966914 [ 419 ] {} <Fatal> BaseDaemon: 21. void* std::__1::__thread_proxy[abi:v15000]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::'lambda0'()>>(void*) @ 0x100dccee in /usr/bin/proton
2024.01.12 03:10:05.966930 [ 419 ] {} <Fatal> BaseDaemon: 22. ? @ 0x7f8bb6d97609 in ?
2024.01.12 03:10:05.966956 [ 419 ] {} <Fatal> BaseDaemon: 23. __clone @ 0x7f8bb6cac353 in ?
2024.01.12 03:10:05.966969 [ 419 ] {} <Fatal> BaseDaemon: Integrity check of the executable skipped because the reference checksum could not be read.

Additional context

Errors should be reported, or messages with the wrong format should be skipped.

A Fatal crash is not acceptable.
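
The requested behaviour can be sketched as follows. This is a minimal Python illustration, not Proton's implementation: a bounds-checked decoder for `Foo` raises a dedicated error on malformed input, and the consume loop either skips and records the bad message or re-raises it as a normal query error. All names (`MalformedMessage`, `decode_foo`, `consume`, `on_error`) are hypothetical. For brevity the decoder rejects unknown fields and does not handle negative `int32` values, unlike a full protobuf parser.

```python
class MalformedMessage(ValueError):
    """Raised when a payload cannot be parsed as a Foo message."""


def decode_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode a varint at buf[pos:], returning (value, next_position)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise MalformedMessage("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7
        if shift >= 70:  # a 64-bit varint never needs more than 10 bytes
            raise MalformedMessage("varint too long")


def decode_foo(buf: bytes) -> tuple[str, int]:
    """Decode Foo { string str = 1; int32 i = 2; } with bounds checks."""
    s, i, pos = "", 0, 0
    while pos < len(buf):
        tag, pos = decode_varint(buf, pos)
        field, wire = tag >> 3, tag & 0x07
        if field == 1 and wire == 2:
            n, pos = decode_varint(buf, pos)
            if pos + n > len(buf):
                raise MalformedMessage("string runs past end of message")
            try:
                s = buf[pos:pos + n].decode("utf-8")
            except UnicodeDecodeError as exc:
                raise MalformedMessage("invalid UTF-8 in string") from exc
            pos += n
        elif field == 2 and wire == 0:
            i, pos = decode_varint(buf, pos)
        else:
            # Simplification: anything not in the schema is treated as an error.
            raise MalformedMessage(f"unexpected tag: field={field}, wire={wire}")
    return s, i


def consume(payloads: list[bytes], on_error: str = "skip"):
    """Parse each payload; skip and record bad ones, or re-raise if asked."""
    rows, errors = [], []
    for offset, payload in enumerate(payloads):
        try:
            rows.append(decode_foo(payload))
        except MalformedMessage as exc:
            if on_error == "raise":
                raise  # surfaces as an ordinary error, not a crash
            errors.append((offset, str(exc)))
    return rows, errors


payloads = [
    bytes.fromhex("0a036161611001"),    # bare Foo('aaa', 1)
    bytes.fromhex("070a036161611001"),  # same message behind a length byte
]
rows, errors = consume(payloads)
print(rows)
print(errors)
```

With `on_error="skip"` the query keeps running and the caller can log the offsets of rejected messages. With `on_error="raise"` the first bad message ends the query with an exception.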

Labels

bug: Something isn't working
