From 34ed2ad5196ee263a2de2a637598503aae507c7c Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 17 Apr 2025 13:05:22 +0000 Subject: [PATCH 1/6] refactoring to reduce memory --- .../fq/libs/row_dispatcher/topic_session.cpp | 67 +++++++++++-------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 992f486f3b24..996fbf2c6146 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -100,7 +100,12 @@ class TTopicSession : public TActorBootstrapped { : Self(self) , LogPrefix(logPrefix) , HandlerSettings(handlerSettings) - , Settings(ev->Get()->Record) + , QueryId(ev->Get()->Record.GetQueryId()) + , EnabledLLVM(ev->Get()->Record.GetSource().GetEnabledLLVM()) + , StartingMessageTimestampMs(ev->Get()->Record.GetStartingMessageTimestampMs()) + , Predicate(ev->Get()->Record.GetSource().GetPredicate()) + , ConsumerName(ev->Get()->Record.GetSource().GetConsumerName()) + , UseSsl(ev->Get()->Record.GetSource().GetUseSsl()) , ReadActorId(ev->Sender) , Counters(counters) { @@ -108,15 +113,22 @@ class TTopicSession : public TActorBootstrapped { NextMessageOffset = *offset; InitialOffset = *offset; } - Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod)); - auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); + Y_UNUSED(TDuration::TryParse(ev->Get()->Record.GetSource().GetReconnectPeriod(), ReconnectPeriod)); + auto queryGroup = Counters->GetSubgroup("query_id", QueryId); auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true); RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true); + + const auto& source = ev->Get()->Record.GetSource(); + Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types"); + Columns.reserve(source.ColumnsSize()); + for (ui64 i = 0; i < source.ColumnsSize(); ++i) { + Columns.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)}); + } } ~TClientsInfo() { - Counters->RemoveSubgroup("query_id", Settings.GetQueryId()); + Counters->RemoveSubgroup("query_id", QueryId); } TActorId GetClientId() const override { @@ -128,24 +140,15 @@ class TTopicSession : public TActorBootstrapped { } TVector GetColumns() const override { - const auto& source = Settings.GetSource(); - Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types"); - - TVector Columns; - Columns.reserve(source.ColumnsSize()); - for (ui64 i = 0; i < source.ColumnsSize(); ++i) { - Columns.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)}); - } - return Columns; } const TString& GetWhereFilter() const override { - return Settings.GetSource().GetPredicate(); + return Predicate; } TPurecalcCompileSettings GetPurecalcSettings() const override { - return {.EnabledLLVM = Settings.GetSource().GetEnabledLLVM()}; + return {.EnabledLLVM = EnabledLLVM}; } void OnClientError(TStatus status) override { @@ -185,7 +188,13 @@ class TTopicSession : public TActorBootstrapped { TTopicSession& Self; const TString& LogPrefix; const ITopicFormatHandler::TSettings HandlerSettings; - const NFq::NRowDispatcherProto::TEvStartSession Settings; + const TString QueryId; + const bool EnabledLLVM; + const ui64 StartingMessageTimestampMs; + const TString Predicate; + TVector Columns; + const TString ConsumerName; + const bool UseSsl; const TActorId ReadActorId; TDuration ReconnectPeriod; @@ -281,9 +290,9 @@ class TTopicSession : public TActorBootstrapped { static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_SESSION"; private: - NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const; - NYql::ITopicClient& GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams); - NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const; + NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl) const; + NYql::ITopicClient& GetTopicClient(bool useSsl); + NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const TString& consumerName) const; void CreateTopicSession(); void CloseTopicSession(); void SubscribeOnNextEvent(); @@ -411,17 +420,17 @@ void TTopicSession::SubscribeOnNextEvent() { }); } -NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const { +NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) const { return PqGateway->GetTopicClientSettings() .Database(Database) .DiscoveryEndpoint(Endpoint) - .SslCredentials(NYdb::TSslCredentials(sourceParams.GetUseSsl())) + .SslCredentials(NYdb::TSslCredentials(useSsl)) .CredentialsProviderFactory(CredentialsProviderFactory); } -NYql::ITopicClient& TTopicSession::GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) { +NYql::ITopicClient& TTopicSession::GetTopicClient(bool useSsl) { if (!TopicClient) { - TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(sourceParams)); + TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(useSsl)); } return *TopicClient; } @@ -430,13 +439,13 @@ TInstant TTopicSession::GetMinStartingMessageTimestamp() const { auto result = TInstant::Max(); Y_ENSURE(!Clients.empty()); for (const auto& [actorId, info] : Clients) { - ui64 time = info->Settings.GetStartingMessageTimestampMs(); + ui64 time = info->StartingMessageTimestampMs; result = std::min(result, TInstant::MilliSeconds(time)); } return result; } -NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const { +NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const TString& consumerName) const { NYdb::NTopic::TTopicReadSettings topicReadSettings; topicReadSettings.Path(TopicPath); topicReadSettings.AppendPartitionIds(PartitionId); @@ -453,7 +462,7 @@ NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const N if (Config.GetWithoutConsumer()) { settings.WithoutConsumer(); } else { - settings.ConsumerName(sourceParams.GetConsumerName()); + settings.ConsumerName(consumerName); } return settings; } @@ -465,8 +474,8 @@ void TTopicSession::CreateTopicSession() { if (!ReadSession) { // Use any sourceParams. - const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second->Settings.GetSource(); - ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams)); + const auto& client = Clients.begin()->second; + ReadSession = GetTopicClient(client->UseSsl).CreateReadSession(GetReadSessionSettings(client->ConsumerName)); StartingMessageTimestamp = GetMinStartingMessageTimestamp(); SubscribeOnNextEvent(); } @@ -699,7 +708,7 @@ void TTopicSession::SendData(TClientsInfo& info) { void TTopicSession::StartClientSession(TClientsInfo& info) { if (ReadSession) { - auto offset = GetOffset(info.Settings); + auto offset = info.GetNextMessageOffset(); if (offset && offset <= LastMessageOffset) { LOG_ROW_DISPATCHER_INFO("New client has less offset (" << offset << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); Metrics.RestartSessionByOffsets->Inc(); From e2963ab9d6de729aedc7f9cc262dce55964a5e7a Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 25 Apr 2025 06:09:27 +0000 Subject: [PATCH 2/6] add const --- .../fq/libs/row_dispatcher/format_handler/format_handler.h | 2 +- .../row_dispatcher/format_handler/ut/format_handler_ut.cpp | 2 +- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index 8e0efb13dc94..e7454caa4aa0 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -15,7 +15,7 @@ class IClientDataConsumer : public TThrRefBase { using TPtr = TIntrusivePtr; public: - virtual TVector GetColumns() const = 0; + virtual const TVector GetColumns() const = 0; virtual const TString& GetWhereFilter() const = 0; virtual TPurecalcCompileSettings GetPurecalcSettings() const = 0; virtual NActors::TActorId GetClientId() const = 0; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index 9137efbfd93f..c19c7cb1d4e4 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -57,7 +57,7 @@ class TFormatHandlerFixture : public TBaseFixture { } public: - TVector GetColumns() const override { + const TVector GetColumns() const override { return Columns; } diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 996fbf2c6146..09de8a99bb4f 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -139,7 +139,7 @@ class TTopicSession : public TActorBootstrapped { return NextMessageOffset; } - TVector GetColumns() const override { + const TVector GetColumns() const override { return Columns; } @@ -192,7 +192,7 @@ class TTopicSession : public TActorBootstrapped { const bool EnabledLLVM; const ui64 StartingMessageTimestampMs; const TString Predicate; - TVector Columns; + const TVector Columns; const TString ConsumerName; const bool UseSsl; const TActorId ReadActorId; From e42f3d7db074739b6001822b37f1b8fe893eecbd Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 25 Apr 2025 06:36:46 +0000 Subject: [PATCH 3/6] init columns --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 09de8a99bb4f..ef000de7a819 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -104,6 +104,7 @@ class TTopicSession : public TActorBootstrapped { , EnabledLLVM(ev->Get()->Record.GetSource().GetEnabledLLVM()) , StartingMessageTimestampMs(ev->Get()->Record.GetStartingMessageTimestampMs()) , Predicate(ev->Get()->Record.GetSource().GetPredicate()) + , Columns(GetColumns(ev->Get()->Record.GetSource())) , ConsumerName(ev->Get()->Record.GetSource().GetConsumerName()) , UseSsl(ev->Get()->Record.GetSource().GetUseSsl()) , ReadActorId(ev->Sender) @@ -121,16 +122,21 @@ class TTopicSession : public TActorBootstrapped { const auto& source = ev->Get()->Record.GetSource(); Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types"); - Columns.reserve(source.ColumnsSize()); - for (ui64 i = 0; i < source.ColumnsSize(); ++i) { - Columns.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)}); - } } ~TClientsInfo() { Counters->RemoveSubgroup("query_id", QueryId); } + static TVector GetColumns(const NYql::NPq::NProto::TDqPqTopicSource& source) { + TVector result; + result.reserve(source.ColumnsSize()); + for (ui64 i = 0; i < source.ColumnsSize(); ++i) { + result.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)}); + } + return result; + } + TActorId GetClientId() const override { return ReadActorId; } From 14243142b2335d6ee2a9f55fdb995b001de8972f Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 7 May 2025 10:02:07 +0300 Subject: [PATCH 4/6] Update ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h Co-authored-by: yumkam --- ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index e7454caa4aa0..9317b9a0b4f5 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -15,7 +15,7 @@ class IClientDataConsumer : public TThrRefBase { using TPtr = TIntrusivePtr; public: - virtual const TVector GetColumns() const = 0; + virtual const TVector& GetColumns() const = 0; virtual const TString& GetWhereFilter() const = 0; virtual TPurecalcCompileSettings GetPurecalcSettings() const = 0; virtual NActors::TActorId GetClientId() const = 0; From dfa96c7768e82b9c8218b752b4bcb3e53dc11af5 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 7 May 2025 10:02:15 +0300 Subject: [PATCH 5/6] Update ydb/core/fq/libs/row_dispatcher/topic_session.cpp Co-authored-by: yumkam --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index ef000de7a819..16e1245297e3 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -145,7 +145,7 @@ class TTopicSession : public TActorBootstrapped { return NextMessageOffset; } - const TVector GetColumns() const override { + const TVector& GetColumns() const override { return Columns; } From 4d095f5cdf7e897963ef7f4bf1e1807287039541 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 7 May 2025 10:02:25 +0300 Subject: [PATCH 6/6] Update ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp Co-authored-by: yumkam --- .../libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index c19c7cb1d4e4..d1d75e520ce6 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -57,7 +57,7 @@ class TFormatHandlerFixture : public TBaseFixture { } public: - const TVector GetColumns() const override { + const TVector& GetColumns() const override { return Columns; }