diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index 8e0efb13dc94..9317b9a0b4f5 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -15,7 +15,7 @@ class IClientDataConsumer : public TThrRefBase { using TPtr = TIntrusivePtr; public: - virtual TVector GetColumns() const = 0; + virtual const TVector& GetColumns() const = 0; virtual const TString& GetWhereFilter() const = 0; virtual TPurecalcCompileSettings GetPurecalcSettings() const = 0; virtual NActors::TActorId GetClientId() const = 0; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index 9137efbfd93f..d1d75e520ce6 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -57,7 +57,7 @@ class TFormatHandlerFixture : public TBaseFixture { } public: - TVector GetColumns() const override { + const TVector& GetColumns() const override { return Columns; } diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 992f486f3b24..16e1245297e3 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -100,7 +100,13 @@ class TTopicSession : public TActorBootstrapped { : Self(self) , LogPrefix(logPrefix) , HandlerSettings(handlerSettings) - , Settings(ev->Get()->Record) + , QueryId(ev->Get()->Record.GetQueryId()) + , EnabledLLVM(ev->Get()->Record.GetSource().GetEnabledLLVM()) + , StartingMessageTimestampMs(ev->Get()->Record.GetStartingMessageTimestampMs()) + , Predicate(ev->Get()->Record.GetSource().GetPredicate()) + , Columns(GetColumns(ev->Get()->Record.GetSource())) + , ConsumerName(ev->Get()->Record.GetSource().GetConsumerName()) + , UseSsl(ev->Get()->Record.GetSource().GetUseSsl()) , ReadActorId(ev->Sender) , Counters(counters) { @@ -108,15 +114,27 @@ class TTopicSession : public TActorBootstrapped { NextMessageOffset = *offset; InitialOffset = *offset; } - Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod)); - auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); + Y_UNUSED(TDuration::TryParse(ev->Get()->Record.GetSource().GetReconnectPeriod(), ReconnectPeriod)); + auto queryGroup = Counters->GetSubgroup("query_id", QueryId); auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true); RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true); + + const auto& source = ev->Get()->Record.GetSource(); + Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types"); } ~TClientsInfo() { - Counters->RemoveSubgroup("query_id", Settings.GetQueryId()); + Counters->RemoveSubgroup("query_id", QueryId); + } + + static TVector GetColumns(const NYql::NPq::NProto::TDqPqTopicSource& source) { + TVector result; + result.reserve(source.ColumnsSize()); + for (ui64 i = 0; i < source.ColumnsSize(); ++i) { + result.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)}); + } + return result; } TActorId GetClientId() const override { @@ -127,25 +145,16 @@ class TTopicSession : public TActorBootstrapped { return NextMessageOffset; } - TVector GetColumns() const override { - const auto& source = Settings.GetSource(); - Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types"); - - TVector Columns; - Columns.reserve(source.ColumnsSize()); - for (ui64 i = 0; i < source.ColumnsSize(); ++i) { - Columns.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)}); - } - + const TVector& GetColumns() const override { return Columns; } const TString& GetWhereFilter() const override { - return Settings.GetSource().GetPredicate(); + return Predicate; } TPurecalcCompileSettings GetPurecalcSettings() const override { - return {.EnabledLLVM = Settings.GetSource().GetEnabledLLVM()}; + return {.EnabledLLVM = EnabledLLVM}; } void OnClientError(TStatus status) override { @@ -185,7 +194,13 @@ class TTopicSession : public TActorBootstrapped { TTopicSession& Self; const TString& LogPrefix; const ITopicFormatHandler::TSettings HandlerSettings; - const NFq::NRowDispatcherProto::TEvStartSession Settings; + const TString QueryId; + const bool EnabledLLVM; + const ui64 StartingMessageTimestampMs; + const TString Predicate; + const TVector Columns; + const TString ConsumerName; + const bool UseSsl; const TActorId ReadActorId; TDuration ReconnectPeriod; @@ -281,9 +296,9 @@ class TTopicSession : public TActorBootstrapped { static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_SESSION"; private: - NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const; - NYql::ITopicClient& GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams); - NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const; + NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl) const; + NYql::ITopicClient& GetTopicClient(bool useSsl); + NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const TString& consumerName) const; void CreateTopicSession(); void CloseTopicSession(); void SubscribeOnNextEvent(); @@ -411,17 +426,17 @@ void TTopicSession::SubscribeOnNextEvent() { }); } -NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const { +NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) const { return PqGateway->GetTopicClientSettings() .Database(Database) .DiscoveryEndpoint(Endpoint) - .SslCredentials(NYdb::TSslCredentials(sourceParams.GetUseSsl())) + .SslCredentials(NYdb::TSslCredentials(useSsl)) .CredentialsProviderFactory(CredentialsProviderFactory); } -NYql::ITopicClient& TTopicSession::GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) { +NYql::ITopicClient& TTopicSession::GetTopicClient(bool useSsl) { if (!TopicClient) { - TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(sourceParams)); + TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(useSsl)); } return *TopicClient; } @@ -430,13 +445,13 @@ TInstant TTopicSession::GetMinStartingMessageTimestamp() const { auto result = TInstant::Max(); Y_ENSURE(!Clients.empty()); for (const auto& [actorId, info] : Clients) { - ui64 time = info->Settings.GetStartingMessageTimestampMs(); + ui64 time = info->StartingMessageTimestampMs; result = std::min(result, TInstant::MilliSeconds(time)); } return result; } -NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const { +NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const TString& consumerName) const { NYdb::NTopic::TTopicReadSettings topicReadSettings; topicReadSettings.Path(TopicPath); topicReadSettings.AppendPartitionIds(PartitionId); @@ -453,7 +468,7 @@ NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const N if (Config.GetWithoutConsumer()) { settings.WithoutConsumer(); } else { - settings.ConsumerName(sourceParams.GetConsumerName()); + settings.ConsumerName(consumerName); } return settings; } @@ -465,8 +480,8 @@ void TTopicSession::CreateTopicSession() { if (!ReadSession) { // Use any sourceParams. - const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second->Settings.GetSource(); - ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams)); + const auto& client = Clients.begin()->second; + ReadSession = GetTopicClient(client->UseSsl).CreateReadSession(GetReadSessionSettings(client->ConsumerName)); StartingMessageTimestamp = GetMinStartingMessageTimestamp(); SubscribeOnNextEvent(); } @@ -699,7 +714,7 @@ void TTopicSession::SendData(TClientsInfo& info) { void TTopicSession::StartClientSession(TClientsInfo& info) { if (ReadSession) { - auto offset = GetOffset(info.Settings); + auto offset = info.GetNextMessageOffset(); if (offset && offset <= LastMessageOffset) { LOG_ROW_DISPATCHER_INFO("New client has less offset (" << offset << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); Metrics.RestartSessionByOffsets->Inc();