From 3b1e682d9f7bf15ff139f6b470d0d95377b9c0e9 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 26 Nov 2025 13:37:32 +0000 Subject: [PATCH 01/11] enable_streaming_queries_counters feature flag added --- .../fq/libs/row_dispatcher/row_dispatcher.cpp | 21 ++++++++++++++----- .../fq/libs/row_dispatcher/topic_session.cpp | 21 +++++++++++++------ .../kqp/executer_actor/kqp_data_executer.cpp | 6 +++++- .../kqp/proxy_service/kqp_proxy_service.cpp | 2 +- ydb/core/protos/feature_flags.proto | 1 + ydb/tests/fq/streaming/conftest.py | 3 ++- 6 files changed, 40 insertions(+), 14 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index a7aea662d7c1..fbe01372d06c 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -5,6 +5,9 @@ #include "leader_election.h" #include "probes.h" +#include "ydb/core/base/appdata_fwd.h" +#include "ydb/core/base/feature_flags.h" + #include #include #include @@ -117,9 +120,13 @@ struct TQueryStatKeyHash { struct TAggQueryStat { TAggQueryStat() = default; - TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) + TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : QueryId(queryId) , SubGroup(counters) { + Cerr << "enableStreamingQueriesCounters " << enableStreamingQueriesCounters <(); + } for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); } @@ -411,6 +418,7 @@ class TRowDispatcher : public TActorBootstrapped { TMap> ConsumersByEventQueueId; THashMap TopicSessions; TMap ReadActorsInternalState; + bool EnableStreamingQueriesCounters = false; public: explicit TRowDispatcher( @@ -552,6 +560,7 @@ void TRowDispatcher::Bootstrap() { TlsActivationContext->ActorSystem(), SelfId()); } NodesTracker.Init(SelfId()); + EnableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters(); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) { @@ -665,7 +674,7 @@ void TRowDispatcher::UpdateMetrics() { TQueryStatKey statKey{consumer->QueryId, key.ReadGroup}; auto& stats = AggrStats.LastQueryStats.emplace( statKey, - TAggQueryStat(consumer->QueryId, Metrics.Counters, consumer->SourceParams)).first->second; + TAggQueryStat(consumer->QueryId, Metrics.Counters, consumer->SourceParams, EnableStreamingQueriesCounters)).first->second; stats.Add(partition.Stat, partition.FilteredBytes); partition.FilteredBytes = 0; } @@ -841,9 +850,11 @@ void TRowDispatcher::UpdateReadActorsInternalState() { void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", read group " << ev->Get()->Record.GetSource().GetReadGroup() << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " part id " << JoinSeq(',', ev->Get()->Record.GetPartitionIds()) << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie); - auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); - auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup())); - topicGroup->GetCounter("StartSession", true)->Inc(); + if (EnableStreamingQueriesCounters) { + auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); + auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup())); + topicGroup->GetCounter("StartSession", true)->Inc(); + } LWPROBE(StartSession, ev->Sender.ToString(), ev->Get()->Record.GetQueryId(), ev->Get()->Record.ByteSizeLong()); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index f00617fe1d4e..f89dcb4bf8d2 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -1,5 +1,8 @@ #include "topic_session.h" +#include "ydb/core/base/appdata_fwd.h" +#include "ydb/core/base/feature_flags.h" + #include #include #include @@ -23,8 +26,10 @@ namespace { //////////////////////////////////////////////////////////////////////////////// struct TTopicSessionMetrics { - void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroup, ui32 partitionId) { - TopicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); + void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroup, ui32 partitionId, bool enableStreamingQueriesCounters) { + TopicGroup = enableStreamingQueriesCounters + ? counters->GetSubgroup("topic", SanitizeLabel(topicPath)) + : MakeIntrusive<::NMonitoring::TDynamicCounters>(); ReadGroup = TopicGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); PartitionGroup = ReadGroup->GetSubgroup("partition", ToString(partitionId)); @@ -98,7 +103,7 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven struct TClientsInfo : public IClientDataConsumer { using TPtr = TIntrusivePtr; - TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup, TMaybe offset) + TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup, TMaybe offset, bool enableStreamingQueriesCounters) : Self(self) , LogPrefix(logPrefix) , HandlerSettings(handlerSettings) @@ -122,7 +127,9 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven for (const auto& sensor : ev->Get()->Record.GetSource().GetTaskSensorLabel()) { Counters = Counters->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); } - auto queryGroup = Counters->GetSubgroup("query_id", QueryId); + auto queryGroup = enableStreamingQueriesCounters + ? Counters->GetSubgroup("query_id", QueryId) + : MakeIntrusive<::NMonitoring::TDynamicCounters>(); auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true); RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true); @@ -295,6 +302,7 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven TTopicSessionMetrics Metrics; const ::NMonitoring::TDynamicCounterPtr Counters; const ::NMonitoring::TDynamicCounterPtr CountersRoot; + bool EnableStreamingQueriesCounters = false; public: TTopicSession( @@ -419,7 +427,8 @@ TTopicSession::TTopicSession( void TTopicSession::Bootstrap() { Become(&TTopicSession::StateFunc); - Metrics.Init(Counters, TopicPath, ReadGroup, PartitionId); + EnableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters(); + Metrics.Init(Counters, TopicPath, ReadGroup, PartitionId, EnableStreamingQueriesCounters); LogPrefix = LogPrefix + " " + SelfId().ToString() + " "; LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition << ", Timeout " << Config.GetTimeoutBeforeStartSession() << " sec"); @@ -783,7 +792,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { const TString& format = source.GetFormat(); ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : "raw"}; - auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset)}).first->second; + auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset, EnableStreamingQueriesCounters)}).first->second; auto formatIt = FormatHandlers.find(handlerSettings); if (formatIt == FormatHandlers.end()) { auto config = CreateFormatHandlerConfig(Config, FunctionRegistry, CompileServiceActorId, source.GetSkipJsonErrors()); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6e7c17681ef0..b289d289e3cf 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2814,13 +2814,17 @@ class TKqpDataExecuter : public TKqpExecuterBaseCounters->GetKqpCounters(); + if (AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + counters = counters->GetSubgroup("path", context->StreamingQueryPath); + } const auto& checkpointId = context->CheckpointId; CheckpointCoordinatorId = Register(MakeCheckpointCoordinator( ::NFq::TCoordinatorId(checkpointId, Generation), NYql::NDq::MakeCheckpointStorageID(), SelfId(), {}, - Counters->Counters->GetKqpCounters()->GetSubgroup("path", context->StreamingQueryPath), + counters, graphParams, stateLoadMode, streamingDisposition).Release()); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 33ea6f13f843..d5ec4421d37a 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1819,7 +1819,7 @@ class TKqpProxyService : public TActorBootstrapped { "cs", NKikimr::CreateYdbCredentialsProviderFactory, *FederatedQuerySetup->Driver, - Counters->GetKqpCounters()->GetSubgroup("subsystem", "storage_service")); + Counters->GetKqpCounters()->GetSubgroup("subsystem", "checkpoints_storage_service")); CheckpointStorageService = TActivationContext::Register(service.release()); TActivationContext::ActorSystem()->RegisterLocalService( diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 2af30de95010..5b331ec1d65a 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -241,4 +241,5 @@ message TFeatureFlags { optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = true]; optional bool EnableColumnTablesBackup = 216 [default = true]; optional bool EnableReplication = 217 [default = true]; + optional bool EnableStreamingQueriesCounters = 218 [default = false]; } diff --git a/ydb/tests/fq/streaming/conftest.py b/ydb/tests/fq/streaming/conftest.py index 871c81ccda8b..38cca5269026 100644 --- a/ydb/tests/fq/streaming/conftest.py +++ b/ydb/tests/fq/streaming/conftest.py @@ -45,7 +45,8 @@ def get_ydb_config(): erasure=Erasure.MIRROR_3_DC, extra_feature_flags={ "enable_external_data_sources": True, - "enable_streaming_queries": True + "enable_streaming_queries": True, + "enable_streaming_queries_counters": True }, query_service_config={"available_external_data_sources": ["Ydb"]}, table_service_config={}, From 16b5589081a24ad71272f6e0a48c73699fb876ec Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 26 Nov 2025 20:05:02 +0000 Subject: [PATCH 02/11] add sensors to source/sink --- ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 1 - ydb/core/kqp/compute_actor/kqp_compute_actor.cpp | 4 ++-- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 8 ++++++++ .../providers/pq/async_io/dq_pq_rd_read_actor.cpp | 4 +++- .../yql/providers/pq/async_io/dq_pq_read_actor.cpp | 14 +++++++++++--- .../providers/pq/async_io/dq_pq_write_actor.cpp | 12 ++++++++++-- ydb/tests/fq/yds/test_row_dispatcher.py | 2 +- 7 files changed, 35 insertions(+), 10 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index fbe01372d06c..810bc6364d4b 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -123,7 +123,6 @@ struct TAggQueryStat { TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : QueryId(queryId) , SubGroup(counters) { - Cerr << "enableStreamingQueriesCounters " << enableStreamingQueriesCounters <(); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index ee1fdd881a61..a541c55a1450 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -109,8 +109,8 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory); NYql::NDq::RegisterDQSolomonWriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory); - NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr); - NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr); + NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSourceTracker")); + NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSinkTracker")); } return factory; diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 14a9b61c38de..152d54f95501 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -2492,6 +2492,10 @@ void TKqpTasksGraph::BuildReadTasksFromSource(TStageInfo& stageInfo, const TVect FillReadTaskFromSource(task, sourceName, structuredToken, resourceSnapshot, nodeOffset++); + if (GetMeta().UserRequestContext && GetMeta().UserRequestContext->StreamingQueryPath) { + task.Meta.TaskParams.emplace("query_path", GetMeta().UserRequestContext->StreamingQueryPath); + } + tasksIds.push_back(task.Id); } @@ -2826,6 +2830,10 @@ void TKqpTasksGraph::BuildExternalSinks(const NKqpProto::TKqpSink& sink, TKqpTas } } + if (GetMeta().UserRequestContext && GetMeta().UserRequestContext->StreamingQueryPath) { + task.Meta.TaskParams.emplace("query_path", GetMeta().UserRequestContext->StreamingQueryPath); + } + auto& output = task.Outputs[sink.GetOutputIndex()]; output.Type = TTaskOutputType::Sink; output.SinkType = extSink.GetType(); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 52462acc3ac3..76950146545c 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -23,6 +23,8 @@ #include #include #include +#include "ydb/core/base/appdata_fwd.h" +#include "ydb/core/base/feature_flags.h" #include #include @@ -80,7 +82,7 @@ struct TRowDispatcherReadActorMetrics { explicit TRowDispatcherReadActorMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, const NPq::NProto::TDqPqTopicSource& sourceParams) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { - if (Counters) { + if (Counters && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { SubGroup = Counters->GetSubgroup("source", "RdPqRead"); } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index bba1ffd8d274..b756f06a0cbc 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -4,6 +4,9 @@ #include "dq_pq_read_actor_base.h" #include "probes.h" +#include "ydb/core/base/appdata_fwd.h" +#include "ydb/core/base/feature_flags.h" + #include #include #include @@ -137,7 +140,7 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const NPq::NProto::TDqPqTopicSource& sourceParams) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { - if (counters) { + if (counters && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { SubGroup = Counters->GetSubgroup("source", "PqRead"); } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); @@ -974,12 +977,17 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv TVector readTaskParamsMsg; ui32 topicPartitionsCount = ExtractPartitionsFromParams(readTaskParamsMsg, args.TaskParams, args.ReadRanges); + auto txId = args.TxId; + auto taskParamsIt = args.TaskParams.find("query_path"); + if (taskParamsIt != args.TaskParams.end()) { + txId = taskParamsIt->second; + } if (!settings.GetSharedReading()) { return CreateDqPqReadActor( std::move(settings), args.InputIndex, args.StatsLevel, - args.TxId, + txId, args.TaskId, args.SecureParams, std::move(readTaskParamsMsg), @@ -998,7 +1006,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv std::move(settings), args.InputIndex, args.StatsLevel, - args.TxId, + txId, args.TaskId, args.SecureParams, std::move(readTaskParamsMsg), diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 1c18fe0eb2e6..8278f5fcc230 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -1,6 +1,9 @@ #include "dq_pq_write_actor.h" #include "probes.h" +#include "ydb/core/base/appdata_fwd.h" +#include "ydb/core/base/feature_flags.h" + #include #include #include @@ -105,7 +108,7 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { - if (Counters) { + if (Counters && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { SubGroup = Counters->GetSubgroup("sink", "PqSink"); } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); @@ -569,12 +572,17 @@ void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver dri NPq::NProto::TDqPqTopicSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { + auto txId = args.TxId; + auto taskParamsIt = args.TaskParams.find("query_path"); + if (taskParamsIt != args.TaskParams.end()) { + txId = taskParamsIt->second; + } NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER)); return CreateDqPqWriteActor( std::move(settings), args.OutputIndex, args.StatsLevel, - args.TxId, + txId, args.TaskId, args.SecureParams, driver, diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 26043013a0c5..bd2ab4afc885 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -36,7 +36,7 @@ def __init__( @pytest.fixture def kikimr(request): kikimr_conf = StreamingOverKikimrConfig( - cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(COMPUTE_NODE_COUNT)} + cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(COMPUTE_NODE_COUNT, extra_feature_flags={"enable_streaming_queries_counters": True})} ) kikimr = StreamingOverKikimr(kikimr_conf) kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True From ee01522b9cd7f5b65d3d2d8c1af3b5df98f81508 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 27 Nov 2025 20:44:23 +0000 Subject: [PATCH 03/11] aggregated metrics --- .../fq/libs/row_dispatcher/row_dispatcher.cpp | 10 +++---- .../fq/libs/row_dispatcher/topic_session.cpp | 30 +++++++++---------- .../pq/async_io/dq_pq_rd_read_actor.cpp | 9 ++++-- .../pq/async_io/dq_pq_read_actor.cpp | 11 ++++--- .../pq/async_io/dq_pq_write_actor.cpp | 9 ++++-- ydb/tests/fq/yds/test_row_dispatcher.py | 1 + 6 files changed, 40 insertions(+), 30 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 810bc6364d4b..d48f40631834 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -123,14 +123,14 @@ struct TAggQueryStat { TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : QueryId(queryId) , SubGroup(counters) { - if (!enableStreamingQueriesCounters) { - SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); - } for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); } - auto queryGroup = SubGroup->GetSubgroup("query_id", queryId); - auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(sourceParams.GetReadGroup())); + auto topicGroup = SubGroup; + if (enableStreamingQueriesCounters) { + topicGroup = topicGroup->GetSubgroup("query_id", queryId); + topicGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(sourceParams.GetReadGroup())); + } MaxQueuedBytesCounter = topicGroup->GetCounter("MaxQueuedBytes"); AvgQueuedBytesCounter = topicGroup->GetCounter("AvgQueuedBytes"); MaxReadLagCounter = topicGroup->GetCounter("MaxReadLag"); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index f89dcb4bf8d2..4c5b279fb0b9 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -26,14 +26,16 @@ namespace { //////////////////////////////////////////////////////////////////////////////// struct TTopicSessionMetrics { - void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroup, ui32 partitionId, bool enableStreamingQueriesCounters) { - TopicGroup = enableStreamingQueriesCounters - ? counters->GetSubgroup("topic", SanitizeLabel(topicPath)) - : MakeIntrusive<::NMonitoring::TDynamicCounters>(); - ReadGroup = TopicGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); - PartitionGroup = ReadGroup->GetSubgroup("partition", ToString(partitionId)); - - AllSessionsDataRate = ReadGroup->GetCounter("AllSessionsDataRate", true); + void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroupName, ui32 partitionId, bool enableStreamingQueriesCounters) { + const auto topicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); + + auto readGroup = topicGroup; + PartitionGroup = topicGroup; + if (enableStreamingQueriesCounters) { + readGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(readGroupName)); + PartitionGroup = readGroup->GetSubgroup("partition", ToString(partitionId)); + } + AllSessionsDataRate = readGroup->GetCounter("AllSessionsDataRate", true); InFlyAsyncInputData = PartitionGroup->GetCounter("InFlyAsyncInputData"); InFlySubscribe = PartitionGroup->GetCounter("InFlySubscribe"); ReconnectRate = PartitionGroup->GetCounter("ReconnectRate", true); @@ -42,9 +44,6 @@ struct TTopicSessionMetrics { WaitEventTimeMs = PartitionGroup->GetHistogram("WaitEventTimeMs", NMonitoring::ExplicitHistogram({5, 20, 100, 500, 2000})); QueuedBytes = PartitionGroup->GetCounter("QueuedBytes"); } - - ::NMonitoring::TDynamicCounterPtr TopicGroup; - ::NMonitoring::TDynamicCounterPtr ReadGroup; ::NMonitoring::TDynamicCounterPtr PartitionGroup; ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; @@ -127,10 +126,11 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven for (const auto& sensor : ev->Get()->Record.GetSource().GetTaskSensorLabel()) { Counters = Counters->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); } - auto queryGroup = enableStreamingQueriesCounters - ? Counters->GetSubgroup("query_id", QueryId) - : MakeIntrusive<::NMonitoring::TDynamicCounters>(); - auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); + auto readSubGroup = Counters; + if (enableStreamingQueriesCounters) { + readSubGroup = readSubGroup->GetSubgroup("query_id", QueryId); + readSubGroup = readSubGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); + } FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true); RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 76950146545c..1cd814af038d 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -82,7 +82,7 @@ struct TRowDispatcherReadActorMetrics { explicit TRowDispatcherReadActorMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, const NPq::NProto::TDqPqTopicSource& sourceParams) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { - if (Counters && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + if (Counters) { SubGroup = Counters->GetSubgroup("source", "RdPqRead"); } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); @@ -90,8 +90,11 @@ struct TRowDispatcherReadActorMetrics { for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); } - auto source = SubGroup->GetSubgroup("tx_id", TxId); - auto task = source->GetSubgroup("task_id", ToString(taskId)); + auto task = SubGroup; + if (NKikimr::AppData() && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + auto source = SubGroup->GetSubgroup("tx_id", TxId); + task = source->GetSubgroup("task_id", ToString(taskId)); + } InFlyGetNextBatch = task->GetCounter("InFlyGetNextBatch"); InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); ReInit = task->GetCounter("ReInit", true); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index b756f06a0cbc..b469e6bcc7e4 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -140,17 +140,20 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const NPq::NProto::TDqPqTopicSource& sourceParams) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { - if (counters && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + if (counters) { SubGroup = Counters->GetSubgroup("source", "PqRead"); } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); } - auto source = SubGroup->GetSubgroup("tx_id", TxId); - auto task = source->GetSubgroup("task_id", ToString(taskId)); + auto source = SubGroup; + auto task = SubGroup; + if (NKikimr::AppData() && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + source = source->GetSubgroup("tx_id", TxId); + task = source->GetSubgroup("task_id", ToString(taskId)); + } InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); InFlySubscribe = task->GetCounter("InFlySubscribe"); AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 8278f5fcc230..b4ba9276f27a 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -108,13 +108,16 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { - if (Counters && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + if (Counters) { SubGroup = Counters->GetSubgroup("sink", "PqSink"); } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - auto sink = SubGroup->GetSubgroup("tx_id", TxId); - auto task = sink->GetSubgroup("task_id", ToString(taskId)); + auto task = SubGroup; + if (NKikimr::AppData() && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + auto sink = SubGroup->GetSubgroup("tx_id", TxId); + task = sink->GetSubgroup("task_id", ToString(taskId)); + } LastAckLatency = task->GetCounter("LastAckLatencyMs"); InFlyCheckpoints = task->GetCounter("InFlyCheckpoints"); InFlyData = task->GetCounter("InFlyData"); diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index bd2ab4afc885..4772a97e93bd 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -25,6 +25,7 @@ YDS_CONNECTION = "yds" COMPUTE_NODE_COUNT = 3 + class Param(object): def __init__( self, From 3e5795c3f20e4b4694eae5f31ff75de29f25f0dd Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 27 Nov 2025 23:44:40 +0300 Subject: [PATCH 04/11] Update ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp Co-authored-by: Pisarenko Grigoriy --- ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index d48f40631834..944cc9933830 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -5,8 +5,8 @@ #include "leader_election.h" #include "probes.h" -#include "ydb/core/base/appdata_fwd.h" -#include "ydb/core/base/feature_flags.h" +#include +#include #include #include From f98de1aab06b0c3d59be8a4cf330ac54e6ae214a Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 27 Nov 2025 23:44:47 +0300 Subject: [PATCH 05/11] Update ydb/core/fq/libs/row_dispatcher/topic_session.cpp Co-authored-by: Pisarenko Grigoriy --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 4c5b279fb0b9..714f402c7dd0 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -1,7 +1,7 @@ #include "topic_session.h" -#include "ydb/core/base/appdata_fwd.h" -#include "ydb/core/base/feature_flags.h" +#include +#include #include #include From bd037cb783da7489e909b8bc86770179fa0b2a9e Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 27 Nov 2025 23:44:59 +0300 Subject: [PATCH 06/11] Update ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp Co-authored-by: Pisarenko Grigoriy --- ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 1cd814af038d..f4d46020296b 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -23,8 +23,8 @@ #include #include #include -#include "ydb/core/base/appdata_fwd.h" -#include "ydb/core/base/feature_flags.h" +#include +#include #include #include From 1d660e5f3755a759a21935f1710a89d93578d879 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 27 Nov 2025 23:45:09 +0300 Subject: [PATCH 07/11] Update ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp Co-authored-by: Pisarenko Grigoriy --- ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index b469e6bcc7e4..6eefd3cce593 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -4,8 +4,8 @@ #include "dq_pq_read_actor_base.h" #include "probes.h" -#include "ydb/core/base/appdata_fwd.h" -#include "ydb/core/base/feature_flags.h" +#include +#include #include #include From 96c5b24855ec601c28d2d12e9a1e102e76b29664 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 27 Nov 2025 23:45:24 +0300 Subject: [PATCH 08/11] Update ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp Co-authored-by: Pisarenko Grigoriy --- ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index b4ba9276f27a..10711564955d 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -1,8 +1,8 @@ #include "dq_pq_write_actor.h" #include "probes.h" -#include "ydb/core/base/appdata_fwd.h" -#include "ydb/core/base/feature_flags.h" +#include +#include #include #include From 8718647165515cf30b19097f4413af5d85829305 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 28 Nov 2025 08:14:57 +0000 Subject: [PATCH 09/11] mv flag to Register --- ydb/core/fq/libs/init/init.cpp | 4 +-- .../fq/libs/row_dispatcher/topic_session.cpp | 14 ++++----- .../kqp/compute_actor/kqp_compute_actor.cpp | 5 ++-- .../pq/async_io/dq_pq_rd_read_actor.cpp | 24 ++++++++++----- .../pq/async_io/dq_pq_rd_read_actor.h | 3 +- .../pq/async_io/dq_pq_read_actor.cpp | 29 ++++++++++++------- .../providers/pq/async_io/dq_pq_read_actor.h | 3 +- .../pq/async_io/dq_pq_write_actor.cpp | 20 ++++++++----- .../providers/pq/async_io/dq_pq_write_actor.h | 3 +- .../pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 3 +- .../fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp | 1 + ydb/tests/fq/pq_async_io/ut_helpers.cpp | 1 + ydb/tests/fq/yds/test_row_dispatcher.py | 2 +- 13 files changed, 69 insertions(+), 43 deletions(-) diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 60dfee0e9209..f9943b0b377f 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -259,7 +259,7 @@ void Init( ); auto pqGateway = pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : NYql::CreatePqNativeGateway(std::move(pqServices)); RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, - yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), commonConfig.GetPqReconnectPeriod()); + yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), commonConfig.GetPqReconnectPeriod(), true); s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg, yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles()); @@ -267,7 +267,7 @@ void Init( httpGateway, s3HttpRetryPolicy); RegisterGenericProviderFactories(*asyncIoFactory, credentialsFactory, connectorClient); - RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, yqCounters->GetSubgroup("subsystem", "DqSinkTracker")); + RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, yqCounters->GetSubgroup("subsystem", "DqSinkTracker"), true); RegisterDQSolomonWriteActorFactory(*asyncIoFactory, credentialsFactory); RegisterDQSolomonReadActorFactory(*asyncIoFactory, credentialsFactory); } diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 714f402c7dd0..d60e7f3061cc 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -27,11 +27,10 @@ namespace { struct TTopicSessionMetrics { void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroupName, ui32 partitionId, bool enableStreamingQueriesCounters) { - const auto topicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); - - auto readGroup = topicGroup; - PartitionGroup = topicGroup; + auto readGroup = counters; + PartitionGroup = counters; if (enableStreamingQueriesCounters) { + const auto topicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); readGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(readGroupName)); PartitionGroup = readGroup->GetSubgroup("partition", ToString(partitionId)); } @@ -123,11 +122,12 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven InitialOffset = *offset; } Y_UNUSED(TDuration::TryParse(ev->Get()->Record.GetSource().GetReconnectPeriod(), ReconnectPeriod)); - for (const auto& sensor : ev->Get()->Record.GetSource().GetTaskSensorLabel()) { - Counters = Counters->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); - } + auto readSubGroup = Counters; if (enableStreamingQueriesCounters) { + for (const auto& sensor : ev->Get()->Record.GetSource().GetTaskSensorLabel()) { + Counters = Counters->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } readSubGroup = readSubGroup->GetSubgroup("query_id", QueryId); readSubGroup = readSubGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index a541c55a1450..47853ac35f60 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -109,8 +109,9 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory); NYql::NDq::RegisterDQSolomonWriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory); - NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSourceTracker")); - NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSinkTracker")); + bool enableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters(); + NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSourceTracker"), {}, enableStreamingQueriesCounters); + NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSinkTracker"), enableStreamingQueriesCounters); } return factory; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index f4d46020296b..6d3767213fef 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -79,7 +79,7 @@ LWTRACE_USING(DQ_PQ_PROVIDER); } // namespace struct TRowDispatcherReadActorMetrics { - explicit TRowDispatcherReadActorMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, const NPq::NProto::TDqPqTopicSource& sourceParams) + explicit TRowDispatcherReadActorMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, const NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { if (Counters) { @@ -87,11 +87,12 @@ struct TRowDispatcherReadActorMetrics { } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { - SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); - } + auto task = SubGroup; - if (NKikimr::AppData() && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + if (enableStreamingQueriesCounters) { + for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { + SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } auto source = SubGroup->GetSubgroup("tx_id", TxId); task = source->GetSubgroup("task_id", ToString(taskId)); } @@ -323,6 +324,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: // Set on Parent ui64 NextGeneration = 0; ui64 NextEventQueueId = 0; + bool EnableStreamingQueriesCounters = false; TMap> LastUsedPartitionDistribution; TMap> LastReceivedPartitionDistribution; @@ -346,6 +348,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters, TDqPqRdReadActor* parent = nullptr, const TString& cluster = {}); @@ -532,6 +535,7 @@ TDqPqRdReadActor::TDqPqRdReadActor( const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters, TDqPqRdReadActor* parent, const TString& cluster) : TActor(&TDqPqRdReadActor::StateFunc) @@ -540,13 +544,14 @@ TDqPqRdReadActor::TDqPqRdReadActor( , Cluster(cluster) , Token(token) , LocalRowDispatcherActorId(localRowDispatcherActorId) - , Metrics(txId, taskId, counters, SourceParams) + , Metrics(txId, taskId, counters, SourceParams, enableStreamingQueriesCounters) , PqGateway(pqGateway) , HolderFactory(holderFactory) , TypeEnv(typeEnv) , Driver(std::move(driver)) , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , MaxBufferSize(bufferSize) + , EnableStreamingQueriesCounters(enableStreamingQueriesCounters) { SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()) << ", partitions: " << JoinSeq(',', GetPartitionsToRead()) << ", skip json errors: " << SourceParams.GetSkipJsonErrors()); @@ -1541,6 +1546,7 @@ void TDqPqRdReadActor::StartCluster(ui32 clusterIndex) { {}, MaxBufferSize, PqGateway, + EnableStreamingQueriesCounters, this, TString(Clusters[clusterIndex].Info.Name)); Clusters[clusterIndex].Child = actor; @@ -1581,7 +1587,8 @@ std::pair CreateDqPqRdReadActor( const NKikimr::NMiniKQL::THolderFactory& holderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, - const IPqGateway::TPtr& pqGateway) + const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters) { const TString& tokenName = settings.GetToken().GetName(); const TString token = secureParams.Value(tokenName, TString()); @@ -1603,7 +1610,8 @@ std::pair CreateDqPqRdReadActor( token, counters, bufferSize, - pqGateway); + pqGateway, + enableStreamingQueriesCounters); return {actor, actor}; } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h index e470baa257b9..536830269a68 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h @@ -39,6 +39,7 @@ std::pair CreateDqPqRdReadActor( const NKikimr::NMiniKQL::THolderFactory& holderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, - const IPqGateway::TPtr& pqGateway); + const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 6eefd3cce593..2fcd6ec2b3f7 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -137,7 +137,8 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, - const NPq::NProto::TDqPqTopicSource& sourceParams) + const NPq::NProto::TDqPqTopicSource& sourceParams, + bool enableStreamingQueriesCounters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { if (counters) { @@ -145,12 +146,13 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { - SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); - } + auto source = SubGroup; auto task = SubGroup; - if (NKikimr::AppData() && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + if (enableStreamingQueriesCounters) { + for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { + SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } source = source->GetSubgroup("tx_id", TxId); task = source->GetSubgroup("task_id", ToString(taskId)); } @@ -213,10 +215,11 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, const IPqGateway::TPtr& pqGateway, - ui32 topicPartitionsCount) + ui32 topicPartitionsCount, + bool enableStreamingQueriesCounters) : TActor(&TDqPqReadActor::StateFunc) , TDqPqReadActorBase(inputIndex, taskId, this->SelfId(), txId, std::move(sourceParams), std::move(readParams), computeActorId) - , Metrics(txId, taskId, counters, SourceParams) + , Metrics(txId, taskId, counters, SourceParams, enableStreamingQueriesCounters) , BufferSize(bufferSize) , HolderFactory(holderFactory) , Driver(std::move(driver)) @@ -938,6 +941,7 @@ std::pair CreateDqPqReadActor( const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, ui32 topicPartitionsCount, + bool enableStreamingQueriesCounters, i64 bufferSize ) { @@ -959,15 +963,16 @@ std::pair CreateDqPqReadActor( counters, bufferSize, pqGateway, - topicPartitionsCount + topicPartitionsCount, + enableStreamingQueriesCounters ); return {actor, actor}; } -void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod) { +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod, bool enableStreamingQueriesCounters) { factory.RegisterSource("PqSource", - [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod]( + [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod, enableStreamingQueriesCounters]( NPq::NProto::TDqPqTopicSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { @@ -1001,6 +1006,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv counters ? counters : args.TaskCounters, pqGateway, topicPartitionsCount, + enableStreamingQueriesCounters, PQReadDefaultFreeSpace); } @@ -1020,7 +1026,8 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv args.HolderFactory, counters ? counters : args.TaskCounters, PQReadDefaultFreeSpace, - pqGateway); + pqGateway, + enableStreamingQueriesCounters); }); } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index d6c2fb2c732b..f06576d84b9d 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -38,9 +38,10 @@ std::pair CreateDqPqReadActor( const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, ui32 topicPartitionsCount, + bool enableStreamingQueriesCounters, i64 bufferSize = PQReadDefaultFreeSpace ); -void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {}); +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {}, bool enableStreamingQueriesCounters = true); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 10711564955d..b54bacdc4054 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -105,7 +105,7 @@ TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) { class TDqPqWriteActor : public NActors::TActor, public IDqComputeActorAsyncOutput, TTopicEventProcessor { struct TMetrics { - TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters) + TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, bool enableStreamingQueriesCounters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { if (Counters) { @@ -114,7 +114,7 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } auto task = SubGroup; - if (NKikimr::AppData() && NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + if (enableStreamingQueriesCounters) { auto sink = SubGroup->GetSubgroup("tx_id", TxId); task = sink->GetSubgroup("task_id", ToString(taskId)); } @@ -163,11 +163,12 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu IDqComputeActorAsyncOutput::ICallbacks* callbacks, const ::NMonitoring::TDynamicCounterPtr& counters, i64 freeSpace, - const IPqGateway::TPtr& pqGateway) + const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters) : TActor(&TDqPqWriteActor::StateFunc) , OutputIndex(outputIndex) , TxId(txId) - , Metrics(txId, taskId, counters) + , Metrics(txId, taskId, counters, enableStreamingQueriesCounters) , SinkParams(std::move(sinkParams)) , Driver(std::move(driver)) , CredentialsProviderFactory(credentialsProviderFactory) @@ -548,6 +549,7 @@ std::pair CreateDqPqWriteActor( IDqComputeActorAsyncOutput::ICallbacks* callbacks, const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, + bool enableStreamingQueriesCounters, i64 freeSpace) { const TString& tokenName = settings.GetToken().GetName(); @@ -565,13 +567,14 @@ std::pair CreateDqPqWriteActor( callbacks, counters, freeSpace, - pqGateway); + pqGateway, + enableStreamingQueriesCounters); return {actor, actor}; } -void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) { +void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, bool enableStreamingQueriesCounters) { factory.RegisterSink("PqSink", - [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway]( + [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, enableStreamingQueriesCounters]( NPq::NProto::TDqPqTopicSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { @@ -592,7 +595,8 @@ void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver dri credentialsFactory, args.Callback, counters ? counters : args.TaskCounters, - pqGateway + pqGateway, + enableStreamingQueriesCounters ); }); } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h index 3e5e17c80a03..40a904cc4586 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h @@ -33,8 +33,9 @@ std::pair CreateDqPqWriteActor( IDqComputeActorAsyncOutput::ICallbacks* callbacks, const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, + bool enableStreamingQueriesCounters, i64 freeSpace = DqPqDefaultFreeSpace); -void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>()); +void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool enableStreamingQueriesCounters = true); } // namespace NYql::NDq diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 757ff49e9821..ccfb4329479c 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -74,7 +74,8 @@ class TFixture : public TPqIoTestFixture { actor.GetHolderFactory(), MakeIntrusive(), freeSpace, - CreateMockPqGateway({.Runtime = CaSetup->Runtime.get()}) + CreateMockPqGateway({.Runtime = CaSetup->Runtime.get()}), + true ); actor.InitAsyncInput(dqAsyncInput, dqAsyncInputAsActor); diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp index 26a83c2776ec..731961440611 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp @@ -57,6 +57,7 @@ class TFixture : public TPqIoTestFixture { MakeIntrusive(), CreatePqNativeGateway(std::move(pqServices)), 1, + true, freeSpace ); diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.cpp b/ydb/tests/fq/pq_async_io/ut_helpers.cpp index 7098cafe40fd..937bb429ff31 100644 --- a/ydb/tests/fq/pq_async_io/ut_helpers.cpp +++ b/ydb/tests/fq/pq_async_io/ut_helpers.cpp @@ -99,6 +99,7 @@ void TPqIoTestFixture::InitAsyncOutput( &actor.GetAsyncOutputCallbacks(), MakeIntrusive(), CreatePqNativeGateway(std::move(pqServices)), + true, freeSpace); actor.InitAsyncOutput(dqAsyncOutput, dqAsyncOutputAsActor); diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 4772a97e93bd..df4ce2b9428e 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -37,7 +37,7 @@ def __init__( @pytest.fixture def kikimr(request): kikimr_conf = StreamingOverKikimrConfig( - cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(COMPUTE_NODE_COUNT, extra_feature_flags={"enable_streaming_queries_counters": True})} + cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(COMPUTE_NODE_COUNT)} ) kikimr = StreamingOverKikimr(kikimr_conf) kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True From 6aa9aceeb36d2774a16164d32e1b100033560d8e Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 28 Nov 2025 11:26:58 +0000 Subject: [PATCH 10/11] add flag to contructor --- ydb/core/fq/libs/init/init.cpp | 3 ++- .../fq/libs/row_dispatcher/actors_factory.cpp | 6 +++-- .../fq/libs/row_dispatcher/actors_factory.h | 3 ++- .../fq/libs/row_dispatcher/row_dispatcher.cpp | 27 +++++++++++-------- .../fq/libs/row_dispatcher/row_dispatcher.h | 3 ++- .../row_dispatcher/row_dispatcher_service.cpp | 6 +++-- .../row_dispatcher/row_dispatcher_service.h | 3 ++- .../fq/libs/row_dispatcher/topic_session.cpp | 13 +++++---- .../fq/libs/row_dispatcher/topic_session.h | 3 ++- .../row_dispatcher/ut/row_dispatcher_ut.cpp | 3 ++- .../row_dispatcher/ut/topic_session_ut.cpp | 3 ++- .../kqp/proxy_service/kqp_proxy_service.cpp | 4 ++- 12 files changed, 49 insertions(+), 28 deletions(-) diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index f9943b0b377f..dda272f45621 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -236,7 +236,8 @@ void Init( yqSharedResources->UserSpaceYdbDriver, appData->Mon, appData->Counters, - MakeNodesManagerId()); + MakeNodesManagerId(), + true); actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release()); } diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index ca69d6bbaef3..e7cb52cb5607 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -23,7 +23,8 @@ struct TActorFactory : public IActorFactory { const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) const override { + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) const override { auto actorPtr = NFq::NewTopicSession( readGroup, @@ -40,7 +41,8 @@ struct TActorFactory : public IActorFactory { counters, countersRoot, pqGateway, - maxBufferSize + maxBufferSize, + enableStreamingQueriesCounters ); return NActors::TActivationContext::Register(actorPtr.release(), {}, NActors::TMailboxType::HTSwap, Max()); } diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index 1d1814953baa..08c12ca0b28d 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -27,7 +27,8 @@ struct IActorFactory : public TThrRefBase { const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) const = 0; + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) const = 0; }; IActorFactory::TPtr CreateActorFactory(); diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 944cc9933830..d742e509514e 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -123,13 +123,13 @@ struct TAggQueryStat { TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : QueryId(queryId) , SubGroup(counters) { - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { - SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); - } auto topicGroup = SubGroup; if (enableStreamingQueriesCounters) { - topicGroup = topicGroup->GetSubgroup("query_id", queryId); - topicGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(sourceParams.GetReadGroup())); + for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { + SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } + SubGroup = SubGroup->GetSubgroup("query_id", queryId); + topicGroup = SubGroup->GetSubgroup("read_group", SanitizeLabel(sourceParams.GetReadGroup())); } MaxQueuedBytesCounter = topicGroup->GetCounter("MaxQueuedBytes"); AvgQueuedBytesCounter = topicGroup->GetCounter("AvgQueuedBytes"); @@ -432,7 +432,8 @@ class TRowDispatcher : public TActorBootstrapped { const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring = nullptr, - NActors::TActorId nodesManagerId = {} + NActors::TActorId nodesManagerId = {}, + bool enableStreamingQueriesCounters = false ); void Bootstrap(); @@ -516,7 +517,8 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring, - NActors::TActorId nodesManagerId) + NActors::TActorId nodesManagerId, + bool enableStreamingQueriesCounters) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) , CredentialsFactory(credentialsFactory) @@ -532,6 +534,7 @@ TRowDispatcher::TRowDispatcher( , Driver(driver) , Monitoring(monitoring) , NodesManagerId(nodesManagerId) + , EnableStreamingQueriesCounters(enableStreamingQueriesCounters) { Y_ENSURE(!Tenant.empty()); } @@ -559,7 +562,6 @@ void TRowDispatcher::Bootstrap() { TlsActivationContext->ActorSystem(), SelfId()); } NodesTracker.Init(SelfId()); - EnableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters(); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) { @@ -906,7 +908,8 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { Counters, CountersRoot, PqGateway, - MaxSessionBufferSizeBytes + MaxSessionBufferSizeBytes, + EnableStreamingQueriesCounters ); TSessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId]; sessionInfo.Consumers[ev->Sender] = consumerInfo; @@ -1313,7 +1316,8 @@ std::unique_ptr NewRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring, - NActors::TActorId nodesManagerId) + NActors::TActorId nodesManagerId, + bool enableStreamingQueriesCounters) { return std::unique_ptr(new TRowDispatcher( config, @@ -1327,7 +1331,8 @@ std::unique_ptr NewRowDispatcher( pqGateway, driver, monitoring, - nodesManagerId)); + nodesManagerId, + enableStreamingQueriesCounters)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h index 66f021c65760..88b942f3b684 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h @@ -31,6 +31,7 @@ std::unique_ptr NewRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring = nullptr, - NActors::TActorId nodesManagerId = {}); + NActors::TActorId nodesManagerId = {}, + bool enableStreamingQueriesCounters = false); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp index f4447ed681e5..0c99fa3ffece 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp @@ -20,7 +20,8 @@ std::unique_ptr NewRowDispatcherService( NYdb::TDriver driver, NActors::TMon* monitoring, ::NMonitoring::TDynamicCounterPtr countersRoot, - NActors::TActorId nodesManagerId) + NActors::TActorId nodesManagerId, + bool enableStreamingQueriesCounters) { return NewRowDispatcher( config, @@ -34,7 +35,8 @@ std::unique_ptr NewRowDispatcherService( pqGateway, driver, monitoring, - nodesManagerId); + nodesManagerId, + enableStreamingQueriesCounters); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h index da5b19f96d8c..0ebb57c3b0b7 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h @@ -28,6 +28,7 @@ std::unique_ptr NewRowDispatcherService( NYdb::TDriver driver, NActors::TMon* monitoring = nullptr, ::NMonitoring::TDynamicCounterPtr countersRoot = MakeIntrusive<::NMonitoring::TDynamicCounters>(), - NActors::TActorId nodesManagerId = {}); + NActors::TActorId nodesManagerId = {}, + bool enableStreamingQueriesCounters = false); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index d60e7f3061cc..7f04981ab4ff 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -320,7 +320,8 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize); + ui64 maxBufferSize, + bool enableStreamingQueriesCounters); void Bootstrap(); void PassAway() override; @@ -405,7 +406,8 @@ TTopicSession::TTopicSession( const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) : ReadGroup(readGroup) , TopicPath(topicPath) , TopicPathPartition(TStringBuilder() << topicPath << "/" << partitionId) @@ -423,11 +425,11 @@ TTopicSession::TTopicSession( , LogPrefix("TopicSession") , Counters(counters) , CountersRoot(countersRoot) + , EnableStreamingQueriesCounters(enableStreamingQueriesCounters) {} void TTopicSession::Bootstrap() { Become(&TTopicSession::StateFunc); - EnableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters(); Metrics.Init(Counters, TopicPath, ReadGroup, PartitionId, EnableStreamingQueriesCounters); LogPrefix = LogPrefix + " " + SelfId().ToString() + " "; LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition @@ -1038,8 +1040,9 @@ std::unique_ptr NewTopicSession( const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) { - return std::unique_ptr(new TTopicSession(readGroup, topicPath, endpoint, database, config, functionRegistry, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize)); + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) { + return std::unique_ptr(new TTopicSession(readGroup, topicPath, endpoint, database, config, functionRegistry, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize, enableStreamingQueriesCounters)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index f71d741ad694..de498fff03c6 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -25,6 +25,7 @@ std::unique_ptr NewTopicSession( const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize); + ui64 maxBufferSize, + bool enableStreamingQueriesCounters); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index 5d6b1f80bf26..dc069c4d2086 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -43,7 +43,8 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/, - ui64 /*maxBufferSize*/) const override { + ui64 /*maxBufferSize*/, + bool /*enableStreamingQueriesCounters*/) const override { auto actorId = Runtime.AllocateEdgeActor(); ActorIds.push(actorId); return actorId; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 11eedb13148a..1879d0218f89 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -90,7 +90,8 @@ class TFixture : public NTests::TBaseFixture { MakeIntrusive(), MakeIntrusive(), !MockTopicSession ? CreatePqNativeGateway(pqServices) : MockPqGateway, - 16000000 + 16000000, + true ).release()); Runtime.EnableScheduleForActor(TopicSession); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index d5ec4421d37a..a71d7c21c259 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1802,7 +1802,9 @@ class TKqpProxyService : public TActorBootstrapped { FederatedQuerySetup->PqGateway, *FederatedQuerySetup->Driver, AppData()->Mon, - Counters->GetKqpCounters()); + Counters->GetKqpCounters(), + {}, + AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()); RowDispatcherService = TActivationContext::Register(rowDispatcher.release()); TActivationContext::ActorSystem()->RegisterLocalService( From 2fe08fb4771d4a7a6add3f97249c4f7ef16eb7de Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 28 Nov 2025 12:29:19 +0000 Subject: [PATCH 11/11] try to fix build --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 43a263f4ad55..41b2fc5382ea 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -27,14 +27,14 @@ namespace { struct TTopicSessionMetrics { void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroupName, ui32 partitionId, bool enableStreamingQueriesCounters) { - auto readGroup = counters; + ReadGroup = counters; PartitionGroup = counters; if (enableStreamingQueriesCounters) { const auto topicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); - readGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(readGroupName)); - PartitionGroup = readGroup->GetSubgroup("partition", ToString(partitionId)); + ReadGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(readGroupName)); + PartitionGroup = ReadGroup->GetSubgroup("partition", ToString(partitionId)); } - AllSessionsDataRate = readGroup->GetCounter("AllSessionsDataRate", true); + AllSessionsDataRate = ReadGroup->GetCounter("AllSessionsDataRate", true); InFlyAsyncInputData = PartitionGroup->GetCounter("InFlyAsyncInputData"); InFlySubscribe = PartitionGroup->GetCounter("InFlySubscribe"); ReconnectRate = PartitionGroup->GetCounter("ReconnectRate", true); @@ -44,6 +44,7 @@ struct TTopicSessionMetrics { QueuedBytes = PartitionGroup->GetCounter("QueuedBytes"); } ::NMonitoring::TDynamicCounterPtr PartitionGroup; + ::NMonitoring::TDynamicCounterPtr ReadGroup; ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; ::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;